PySpark — Spark Streaming Error and Exception Handling

Understand how to handle errors and exceptions in Spark Structured Streaming

Subham Khandelwal
7 min read · Mar 20, 2024

We all dream of writing picture-perfect code, but what if the data itself is bad or malformed, or a dependent service (say, JDBC) is down while we are processing it? Perfect code is still not perfect if it is not ready to handle such scenarios, i.e. to capture the data that failed due to errors or exceptions so it can be re-processed later.

Today we will understand and handle such errors and exceptions in Spark Structured Streaming code.


If you are new to Spark or PySpark, or want to learn more: I teach Big Data, Spark, Data Engineering & Data Warehousing on my YouTube channel, Ease With Data. To improve your PySpark skills, follow the Spark Streaming with PySpark playlist.

Use Case 📃

We are reading device data JSON payloads with the structure shown below from Kafka in Spark Structured Streaming. Our goal is to flatten and explode the data and save it in a Postgres table using JDBC.

{
  "eventId": "e3cb26d3-41b2-49a2-84f3-0156ed8d7502",
  "eventOffset": 10001,
  "eventPublisher": "device",
  "customerId": "CI00103",
  "data": {
    "devices": [
      {
        "deviceId": "D001",
        "temperature": 15,
        "measure": "C",
        "status": "ERROR"
      },
      {
        "deviceId": "D002",
        "temperature": 16,
        "measure": "C",
        "status": "SUCCESS"
      }
    ]
  },
  "eventTime": "2023-01-05 11:13:53.643364"
}

Issues ❌

We can run into the following two failure scenarios, which we need to address to make sure the streaming application is not killed and we don't lose any data:

  1. Error: a situation where we get a malformed event and the data is not flattened properly. For example, a plain string like hello world in place of a JSON payload, or a JSON payload where the devices array is empty: "data": { "devices": [] }
  2. Exception: a situation where the code itself runs into an issue, for example the JDBC/Postgres service is not available to write the data.

Resolution ✅

Let's create our Spark session with the required jars to read data from Kafka and write to Postgres (JDBC).

# Create the Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Handling errors and Exceptions")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    .config("spark.jars", "/home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar")
    .config("spark.sql.shuffle.partitions", 8)
    .master("local[*]")
    .getOrCreate()
)

spark
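If you prefer not to point spark.jars at a local driver file, the Postgres JDBC driver can also be pulled from Maven through spark.jars.packages. A sketch of that alternative, assuming the same driver version 42.2.20 used above:

# Alternative: resolve both connectors from Maven instead of a local jar path
spark = (
    SparkSession
    .builder
    .appName("Handling errors and Exceptions")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.postgresql:postgresql:42.2.20"
    )
    .config("spark.sql.shuffle.partitions", 8)
    .master("local[*]")
    .getOrCreate()
)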

Next we need to read the JSON payloads from Kafka topic device-data into a DataFrame kafka_df.

# Create the kafka_df to read from kafka

kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ed-kafka:29092")
    .option("subscribe", "device-data")
    .option("startingOffsets", "earliest")
    .load()
)
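For reference, the Kafka source always exposes the same fixed set of columns, with the actual payload arriving as binary in the value column. A quick sanity check (expected output shown as comments):

# Inspect the schema of the Kafka source DataFrame
kafka_df.printSchema()

# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)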

Now, Spark Streaming provides a foreachBatch method which lets us pass each micro batch through a Python function. For our use case we will set a processing-time trigger of 10 seconds, so every 10s the micro batch of kafka_df will be passed to our parent Python function device_data_output for processing.

Not sure about foreachBatch in Spark Streaming? Check out this YouTube video. Also follow this playlist in case you want to learn Spark Streaming with PySpark.

# Running foreachBatch

(kafka_df
    .writeStream
    .foreachBatch(device_data_output)
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", "checkpoint_dir_kafka")
    .start()
    .awaitTermination())

Since we are ready to pass our micro batches to the parent Python function device_data_output, let's implement that function (it must be defined before the writeStream cell above is run).

# Handle Error and Exception and write to JDBC 
from pyspark.sql.functions import lit

def device_data_output(kafka_df, batch_id):
    print("Batch id: " + str(batch_id))
    try:
        # Get the Flattened and Error DataFrames
        flattened_df, error_df_raw = flatten_data(kafka_df)

        # Add the batchid column to the Error DataFrame
        error_df = error_df_raw.withColumn("batchid", lit(batch_id))

        # Write Flattened DataFrame to JDBC
        postgres(flattened_df, "device_data")

        # Write Error DataFrame to JDBC
        postgres(error_df, "device_data_error")

        # Display both DataFrames for confirmation
        flattened_df.show()
        error_df.show()

    except Exception as e:
        print(e)
        # Capture the whole micro batch as Parquet for re-processing
        kafka_df.write.format("parquet").mode("append").save("data/output/device_data_error.parquet")

Let's understand the above code with the following steps:

  1. Print the batch_id of the micro batch that is being processed.
  2. Pass the micro batch data to another Python function flatten_data in order to segregate the data into flattened_df and error_df. The flattened_df dataframe holds the correct, properly flattened data and the error_df dataframe holds the raw error data (columns: key, value, eventtimestamp, batchid).
  3. Add a batchid column to the error_df dataframe in order to identify the batch of error data.
  4. Write the flattened/correct data to the JDBC Postgres table device_data.
  5. Capture the raw error data in the JDBC Postgres table device_data_error.
  6. Wrap the whole code in a try-except block to make sure that if there is any exception during execution, we capture the whole micro batch in Parquet format at an output location for re-processing (a re-processing sketch follows this list).
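Once the underlying issue is fixed (for example, Postgres is back up), the Parquet data captured in step 6 can be replayed through the same function as a one-off batch job. A minimal sketch, where -1 is just a hypothetical marker batch id for replayed data:

# Re-process micro batches captured during exceptions (one-off batch job)
recovered_df = spark.read.parquet("data/output/device_data_error.parquet")

# Reuse the same foreachBatch function; -1 marks the replayed batch
device_data_output(recovered_df, -1)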

And finally, the flatten_data Python function to flatten, explode and segregate the data into correct and error dataframes.

# Defined logic for handling the error records
from pyspark.sql import DataFrame
from pyspark.sql.functions import from_json, col, expr, explode, current_timestamp, lit, size
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

def flatten_data(df):

    # Convert binary value column to string
    kafka_json_df = df.withColumn("value", expr("cast(value as string)"))

    # Define Schema
    json_schema = StructType([
        StructField('customerId', StringType(), True),
        StructField('data', StructType([
            StructField('devices', ArrayType(StructType([
                StructField('deviceId', StringType(), True),
                StructField('measure', StringType(), True),
                StructField('status', StringType(), True),
                StructField('temperature', LongType(), True)
            ]), True), True)
        ]), True),
        StructField('eventId', StringType(), True),
        StructField('eventOffset', LongType(), True),
        StructField('eventPublisher', StringType(), True),
        StructField('eventTime', StringType(), True)
    ])

    # Expand JSON from value column using the Schema
    json_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema))

    # Filter out the error data
    error_df = json_df.select("key", "value").withColumn("eventtimestamp", lit(current_timestamp())) \
        .where("values_json.customerId is null or size(values_json.data.devices) = 0")

    # Filter out the correct flattened data
    streaming_df = json_df.where("values_json.customerId is not null and size(values_json.data.devices) > 0") \
        .selectExpr("values_json.*")

    # Explode the devices array of the correct data
    exploded_df = streaming_df.withColumn("data_devices", explode("data.devices"))

    # Flatten data
    flattened_df = (
        exploded_df
        .drop("data")
        .withColumn("deviceId", col("data_devices.deviceId"))
        .withColumn("measure", col("data_devices.measure"))
        .withColumn("status", col("data_devices.status"))
        .withColumn("temperature", col("data_devices.temperature"))
        .drop("data_devices")
    )

    # Return both Flattened & Error DataFrames
    return flattened_df, error_df

The above code segregates the micro batch data into flattened and error dataframes based on the filter condition values_json.customerId is null or size(values_json.data.devices) = 0. This checks whether the payload was parsed properly against the JSON schema (from_json returns a null struct for malformed input, so customerId ends up null) and whether the devices array actually contains data.
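To see why this condition catches both error cases, here is a small batch-mode sketch (not part of the streaming job, and assuming json_schema is available in scope) that runs from_json over one malformed string and one payload with an empty devices array:

# Quick batch check: from_json yields a null struct for malformed payloads
sample_df = spark.createDataFrame(
    [("hello world",), ('{"customerId": "CI00103", "data": {"devices": []}}',)],
    ["value"]
)

(sample_df
    .withColumn("values_json", from_json(col("value"), json_schema))
    .selectExpr("value", "values_json.customerId", "size(values_json.data.devices) as device_count")
    .show(truncate=False))

# 'hello world' parses to a null struct, so customerId is null
# the second row parses fine but device_count is 0, so it also lands in error_df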

Want to understand how the flattening and exploding code works? Check out this YouTube video.

The only missing piece is the reusable postgres Python function that writes a dataframe to a JDBC Postgres table.

# Function to write the dataframe to JDBC (Postgres)

def postgres(df, table_name):
    (
        df.write
        .mode("append")
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", "jdbc:postgresql://postgres-db-1:5432/sqlpad")
        .option("dbtable", table_name)
        .option("user", "sqlpad")
        .option("password", "sqlpad")
        .save()
    )

Execution 🛠️

Our code is ready, so let's post some data to Kafka and see whether the code handles the errors and exceptions properly. Since we print both flattened_df and error_df, their contents are shown in the executions below.
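If you want to reproduce these cases yourself, the test events can be published with any Kafka producer. A minimal sketch using the kafka-python package (an assumption on my side; the kafka-console-producer CLI works just as well), pointed at the same broker and topic used above (adjust the address if producing from outside the Docker network):

# Publish test payloads to the device-data topic (assumes kafka-python is installed)
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="ed-kafka:29092")

# Case 2: malformed string instead of JSON
producer.send("device-data", value=b"hello world")

# Case 3: JSON payload with an empty devices array (illustrative eventId)
producer.send("device-data", value=b'{"eventId": "e1", "customerId": "CI00103", "data": {"devices": []}}')

producer.flush()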

Case 1: Correct JSON payload with proper structure

Correct data is posted to Kafka, flattened properly and inserted into the Postgres device_data table.

Case 2: Malformed string data “hello world” sent to Kafka

The string hello world is sent to Kafka; it cannot be flattened, so it is pushed to the error dataframe and inserted into the Postgres error table device_data_error.

Case 3: Malformed JSON payload with device data missing

A JSON payload with an empty devices array is sent to Kafka; the data is not flattened, is pushed to the error dataframe and inserted into the Postgres error table device_data_error.

Case 4: Exception scenario with the Postgres service turned off and data posted to Kafka

With the Postgres service turned off, correct data is posted to Kafka. Spark Streaming prints the exception, the micro batch is captured at the Parquet output location for re-processing, and the streaming query keeps running after the exception.

And thus we cover all the scenarios and handle errors and exceptions in our Spark Streaming code.

If you want to see this in action, check out this YouTube video.


Check out the complete Spark Streaming with PySpark series on YouTube: https://youtube.com/playlist?list=PL2IsFZBGM_IEtp2fF5xxZCS9CYBSHV2WW&si=4rF9V-Px9EJTiIiU

Conclusion 💬

Now that we can handle errors and exceptions in our code, no data is lost. We can easily re-process the error/exception data, with minor fixes if required.

Make sure to Like, Subscribe and Share with your network❤️

References🔖

Complete code on GitHub: https://github.com/subhamkharwal/spark-streaming-with-pyspark/blob/master/06_handling_errors_and_exceptions.ipynb

PySpark Zero to Hero series on YouTube: https://youtube.com/playlist?list=PL2IsFZBGM_IHCl9zhRVC1EXTomkEp_1zm&si=Q664l-TFXf4wj1We

Spark Streaming with PySpark on YouTube: https://youtube.com/playlist?list=PL2IsFZBGM_IEtp2fF5xxZCS9CYBSHV2WW&si=4rF9V-Px9EJTiIiU

Ease With Data YouTube channel: https://www.youtube.com/@easewithdata

Connect with me: https://topmate.io/subham_khandelwal

PySpark Medium series: https://subhamkharwal.medium.com/learnbigdata101-spark-series-940160ff4d30
