PySpark — Structured Streaming Read from Sockets

Subham Khandelwal
4 min read · Jan 4, 2023

Spark Structured Streaming is gaining popularity thanks to its many out-of-the-box features. It builds on the existing Spark DataFrame APIs, so streaming computations are written much like batch ones.


Spark executes small micro-batches in a loop to achieve the streaming use case. A background task generates an execution plan similar to the one used for batch execution and keeps re-running it on each new micro-batch.

Still have doubts? Check out PySpark — The Basics of Structured Streaming for a quick refresher on Spark Structured Streaming basics.

To begin, we will read data from a socket in real time using Spark Structured Streaming and count the occurrences of each word in the incoming strings. It is a very basic example, but it is enough to understand the core concepts of Spark Streaming.

As usual, we start by creating a SparkSession.

# Create the Spark Session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Streaming Socket Word Count") \
    .master("local[*]") \
    .getOrCreate()

spark
Spark Session

Now, to read the streaming data, create a streaming DataFrame (streaming_df). spark.readStream returns a DataStreamReader, and calling load() on it returns the DataFrame.

# Create the streaming dataframe to read from socket
# Sockets are not recommended for production applications; use them only for debugging and testing

streaming_df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", "9999") \
    .load()

Check the schema

# Check the schema
streaming_df.printSchema()
Schema for the streaming data frame

As the socket source only delivers string data, the DataFrame has a single column, value, of type string.

Split the string and explode to create a dataframe with words

# Let's split the strings on spaces and explode the resulting array into a word column
words_df = streaming_df.selectExpr("explode(split(value, ' ')) as word")

# Check the schema
words_df.printSchema()
Words Data frame
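To see what the split-and-explode step does to each incoming line, here is a minimal plain-Python sketch. It is only an analogy: split_and_explode and drop_empty are hypothetical helper names, not Spark functions, and Spark's split takes a regular expression rather than a literal string. For a single-space pattern the behaviour should match, including the empty-string "words" produced by consecutive spaces, but it is worth verifying on your Spark version.

```python
def split_and_explode(lines):
    """Mimic explode(split(value, ' ')): one output row per token, per input line."""
    words = []
    for value in lines:
        # Splitting on a single space keeps empty tokens between repeated spaces
        for word in value.split(" "):
            words.append(word)
    return words


def drop_empty(words):
    """Remove the empty tokens so they are not counted as words."""
    return [word for word in words if word != ""]


rows = split_and_explode(["hello  world", "spark"])
print(rows)
print(drop_empty(rows))
```

If the empty tokens are unwanted in the streaming job, one option is to filter them before aggregating, for example with words_df.where("word != ''").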

Now, we can aggregate the words dataframe to find the count of individual words.

# Now let's aggregate words_df to find the word counts
from pyspark.sql.functions import count

# Reduce the shuffle partitions to 4, as we don't want to run through the default 200 partitions
spark.conf.set("spark.sql.shuffle.partitions", 4)

# Generate aggregated dataframe for word count
agg_words_df = words_df \
    .groupBy("word") \
    .agg(count("word").alias("count"))

# Print the schema to validate
agg_words_df.printSchema()
Aggregated Words Data Frame

Before we start the application, open a terminal and start a listener on port 9999 so that we can send data, for example with ncat -l 9999.

Attach port
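If ncat is not available, a small Python script can play the same role, since the socket source connects to the given host and port as a client and reads newline-terminated text. The sketch below uses only the standard library; start_line_server is a hypothetical helper name introduced here for illustration, and unlike an interactive ncat session it sends a fixed list of lines and then closes the connection.

```python
import socket
import threading


def start_line_server(lines, host="localhost", port=9999):
    """Listen on host:port and send each line to the first client that connects.

    Returns the bound port and the background thread doing the sending.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)

    def _serve():
        try:
            # Block until one client connects, then send every line
            conn, _ = server.accept()
            with conn:
                for line in lines:
                    conn.sendall((line + "\n").encode("utf-8"))
        finally:
            server.close()

    thread = threading.Thread(target=_serve, daemon=True)
    thread.start()
    return server.getsockname()[1], thread
```

To use it with the example in this post, call start_line_server(["hello world", "hello spark"], port=9999) before starting the streaming query, and keep the Python process alive (for instance by joining the returned thread) until the lines have been sent.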

Finally, we choose the console sink and set outputMode to complete.

# Write the output to the console sink to check the results
# "complete" prints the entire result table after every micro-batch
writing_df = agg_words_df.writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()

# Start the streaming application to run until the following happens
# 1. Exception in the running program
# 2. Manual Interruption

writing_df.awaitTermination()
Complete mode

What if we choose update as the outputMode? Then only the rows whose counts changed in the latest micro-batch are printed.

Update mode

Append mode will not work here. Spark throws an error because append output mode is not supported for streaming aggregations without a watermark.
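The difference between complete and update mode can be illustrated with a toy model in plain Python. This is a sketch of the observable output only, not of how Spark implements state or micro-batches; simulate_word_count is a hypothetical function name, and each inner list stands for the lines that arrive in one micro-batch.

```python
from collections import Counter


def simulate_word_count(batches, output_mode):
    """Return what a console sink would show after each micro-batch (toy model)."""
    if output_mode not in ("complete", "update"):
        raise ValueError("this sketch only models 'complete' and 'update'")

    totals = Counter()  # running counts kept across micro-batches
    emitted = []
    for lines in batches:
        # Count the words that arrived in this micro-batch only
        batch_counts = Counter(word for line in lines for word in line.split(" "))
        totals.update(batch_counts)
        if output_mode == "complete":
            # Complete: the whole result table, every time
            rows = dict(totals)
        else:
            # Update: only the words whose count changed in this micro-batch
            rows = {word: totals[word] for word in batch_counts}
        emitted.append(rows)
    return emitted


batches = [["hello world"], ["hello spark"]]
print(simulate_word_count(batches, "complete"))
print(simulate_word_count(batches, "update"))
```

In the second micro-batch, complete mode still shows world with a count of 1, while update mode omits it because its count did not change.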

To confirm that Spark runs small micro-batches in the background, check out the Spark UI.

Spark UI

It's evident that we can write streaming applications with only minor changes to batch execution code. We will cover more complex and important use cases in upcoming posts.

If ncat is not installed, install it on Debian/Ubuntu-based Linux with: apt-get install -y ncat

Check out Structured Streaming basics: https://urlit.me/blog/pyspark-the-basics-of-structured-streaming/

Check out the iPython Notebook on Github: https://github.com/subhamkharwal/ease-with-apache-spark/blob/master/31_spark%20streaming_read_from_socket.ipynb

Check out the docker images to get started quickly: https://github.com/subhamkharwal/docker-images

Check out my personal blog: https://urlit.me/blog/

Check out the PySpark Medium Series: https://medium.com/@subhamkharwal/learnbigdata101-spark-series-940160ff4d30
