Lucene search
K

📄 Vertex AI Experiments 1.132.x Predictable Bucket Naming

🗓️ 10 Mar 2026 00:00:00Reported by indoushkaType 
packetstorm
 packetstorm
🔗 packetstorm.news👁 110 Views

Predictable Vertex AI bucket names enable cross-tenant remote code execution and model theft.

Related
Code
=============================================================================================================================================
    | # Title     : Vertex AI Experiments 1.21.0 to 1.132.x Predictable Bucket Naming Leading to Cross‑Tenant RCE and Model Theft               |
    | # Author    : indoushka                                                                                                                   |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits)                                                            |
    | # Vendor    : https://cloud.google.com/vertex-ai                                                                                          |
    =============================================================================================================================================
    
    [+] Summary    :  A vulnerability identified as CVE-2026-2473 affected Google Cloud Vertex AI, specifically the Vertex AI Experiments component, in versions 1.21.0 through 1.132.x (fixed in 1.133.0 and later).
                      The issue stemmed from predictable Cloud Storage bucket naming patterns, enabling a class of attack known as Bucket Squatting. 
    				  Because bucket names in Google Cloud Storage are globally unique, an attacker could pre-create a bucket matching a predictable name before a legitimate tenant did.
    
    This allowed a potential unauthenticated remote attacker to:
    
    Achieve cross-tenant Remote Code Execution (RCE)
    
    Exfiltrate trained models (Model Theft)
    
    Poison experiment artifacts or training outputs
    
    Interfere with experiment tracking pipelines
    
    
    [+] POC   :  
    
    #!/usr/bin/env python3
    
    import google.cloud.storage as storage
    import time
    import sys
    import os
    import json
    import argparse
    from datetime import datetime
    from typing import List, Dict
    import random
    import string
    
    class VertexAIBucketDefensiveScanner:
        """
        Scanner for detecting Bucket Squatting risk
        in Google Cloud Storage used by Vertex AI Experiments by indoushka.
        """
    
        def __init__(self, project_id: str, credentials_path: str):
            self.project_id = project_id
            self.findings = []
    
            try:
                self.client = storage.Client.from_service_account_json(credentials_path)
                print(f"[✓] Connected to GCS project: {project_id}")
            except Exception as e:
                print(f"[✗] Connection failed: {e}")
                sys.exit(1)
    
            self.bucket_patterns = [
                "vertex-ai-experiment-{exp_id}-{timestamp}",
                "vertex-ai-exp-{exp_id}-data-{timestamp}",
                "vertex-{exp_id}-experiment-{random}",
                "ai-platform-{exp_id}-{timestamp}",
                "vertex-ai-{exp_id}-{user_id}",
                "experiment-{exp_id}-bucket-{timestamp}",
                "vertex-{random}-{exp_id}-data"
            ]
    
        def generate_bucket_names(self, exp_id: str, count: int = 5) -> List[str]:
            predicted = []
            timestamp = int(time.time())
    
            for pattern in self.bucket_patterns:
                for i in range(count):
                    random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
                    user_id = f"user-{random.randint(1000, 9999)}"
    
                    name = pattern.format(
                        exp_id=exp_id,
                        timestamp=timestamp + i,
                        random=random_str,
                        user_id=user_id
                    )
                    predicted.append(name)
    
            return predicted
    
        def check_bucket_status(self, bucket_name: str) -> Dict:
            """
            Checks if bucket exists and whether accessible
            WITHOUT creating it.
            """
    
            bucket = self.client.bucket(bucket_name)
    
            try:
                exists = bucket.exists()
    
                if not exists:
                    return {
                        "bucket": bucket_name,
                        "status": "AVAILABLE",
                        "risk": "HIGH",
                        "reason": "Predictable name and not registered."
                    }
                try:
                    bucket.reload()
                    owner = bucket.project_number
    
                    if owner != self.client.project:
                        return {
                            "bucket": bucket_name,
                            "status": "EXISTS_EXTERNAL",
                            "risk": "CRITICAL",
                            "reason": "Bucket exists but not owned by this project."
                        }
    
                    return {
                        "bucket": bucket_name,
                        "status": "OWNED",
                        "risk": "LOW",
                        "reason": "Bucket exists and owned by project."
                    }
    
                except Exception:
                    return {
                        "bucket": bucket_name,
                        "status": "EXISTS_NO_ACCESS",
                        "risk": "MEDIUM",
                        "reason": "Bucket exists but metadata not accessible."
                    }
    
            except Exception as e:
                return {
                    "bucket": bucket_name,
                    "status": "ERROR",
                    "risk": "UNKNOWN",
                    "reason": str(e)
                }
    
        def scan_experiment_range(self, start_id: int, end_id: int):
            print(f"[*] Scanning experiment IDs {start_id} → {end_id}")
    
            for exp_id in range(start_id, end_id + 1):
                bucket_names = self.generate_bucket_names(str(exp_id), count=3)
    
                for name in bucket_names:
                    result = self.check_bucket_status(name)
                    self.findings.append(result)
    
                    print(f"[{result['risk']}] {name} → {result['status']}")
    
                    time.sleep(0.2)
    
        def generate_report(self) -> Dict:
            summary = {
                "CRITICAL": len([f for f in self.findings if f["risk"] == "CRITICAL"]),
                "HIGH": len([f for f in self.findings if f["risk"] == "HIGH"]),
                "MEDIUM": len([f for f in self.findings if f["risk"] == "MEDIUM"]),
                "LOW": len([f for f in self.findings if f["risk"] == "LOW"]),
            }
    
            return {
                "timestamp": datetime.now().isoformat(),
                "project_id": self.project_id,
                "summary": summary,
                "total_checked": len(self.findings),
                "findings": self.findings
            }
    
    
    def main():
        parser = argparse.ArgumentParser(
            description="Defensive Scanner - Vertex AI Bucket Squatting Risk"
        )
        parser.add_argument('--project', required=True)
        parser.add_argument('--creds', required=True)
        parser.add_argument('--start', type=int, default=1000)
        parser.add_argument('--end', type=int, default=1010)
    
        args = parser.parse_args()
    
        scanner = VertexAIBucketDefensiveScanner(
            project_id=args.project,
            credentials_path=args.creds
        )
    
        print("=" * 60)
        print("Vertex AI Bucket Squatting Defensive Scanner")
        print("Mode: Detection Only (No Exploitation)")
        print("=" * 60)
    
        scanner.scan_experiment_range(args.start, args.end)
    
        report = scanner.generate_report()
    
        with open("defensive_scan_report.json", "w") as f:
            json.dump(report, f, indent=2)
    
        print("\n[✓] Report saved to defensive_scan_report.json")
    
    
    if __name__ == "__main__":
        exit(main())
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

10 Mar 2026 00:00Current
5.8Medium risk
Vulners AI Score5.8
CVSS 47.7
EPSS0.00313
SSVC
110