PySpark — Structured Streaming Read from Files

Subham Khandelwal
3 min readJan 5


Often Streaming use case needs to read data from files/folders and process for downstream requirements. And to elaborate this, lets consider the following use case.

A popular thermostat company takes reading from their customer devices to understand device usage. The device data JSON files are generated and dumped in an input folder. The notification team requires the data to be flattened in real-time and written in output in JSON format for sending out notifications and reporting accordingly.

Representation Image

A sample JSON input file contains the following data points

Sample JSON file data format

Now, to design the real time streaming pipeline to ingest the file and flatten, lets create the SparkSession

# Create the Spark Session
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Streaming Process Files") \
.config("spark.streaming.stopGracefullyOnShutdown", True) \
.master("local[*]") \


Create the DataFrameStreamReader dataframe

# To allow automatic schemaInference while reading
spark.conf.set("spark.sql.streaming.schemaInference", True)

# Create the streaming_df to read from input directory
streaming_df = spark.readStream\
.format("json") \
.option("cleanSource", "archive") \
.option("sourceArchiveDir", "data/archive/device_data/") \
.option("maxFilesPerTrigger", 1) \

Few points to note:

  1. Option cleanSource — It can archive or delete the source file after processing. Values can be archive, delete and default is off.
  2. Option sourceArchiveDir — Archive directory if the cleanSource option is set to archive.
  3. Option maxFilesPerTrigger — Sets how many files to process in a single go.
  4. We need to set spark.sql.streaming.schemaInference to True to allow streaming schemaInference.

Now, if we need to check the Schema, just replace the readStream to read for debugging

# To the schema of the data, place a sample json file and change readStream to read 
Schema structure

Since the data is for multiple devices in list/array we need to explode the dataframe before flattening

# Lets explode the data as devices contains list/array of device reading
from pyspark.sql.functions import explode, col

exploded_df = streaming_df \
.select("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime", "data") \
.withColumn("devices", explode("data.devices")) \

Flattening dataframe

# Flatten the exploded df
flattened_df = exploded_df \
.selectExpr("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime",
"devices.deviceId as deviceId", "devices.measure as measure",
"devices.status as status", "devices.temperature as temperature")

Lets write the streaming data to output folder in JSON format

# Write the output to console sink to check the output
writing_df = flattened_df.writeStream \
.format("json") \
.option("path", "data/output/device_data") \
.option("checkpointLocation","checkpoint_dir") \
.outputMode("append") \

# Start the streaming application to run until the following happens
# 1. Exception in the running program
# 2. Manual Interruption


As soon we put a new file in input directory, same is processed in real-time. See in action:

Output files generated

Lets check the output

# Check the data at the output location

out_df ="data/output/device_data/")
Flattened output data

Check the Spark UI, to see the micro bach executions

Spark UI

Checkout Structured Streaming basics —

Checkout the iPython Notebook on Github —

Checkout the docker images to quickly start —

Checkout my Personal blog —

Checkout the PySpark Medium Series —



Subham Khandelwal

⚒️ Senior Data Engineer with 10+ YOE | 📽️ YouTube channel: | 📞 TopMate :