PySpark — Columnar Read Optimization

Subham Khandelwal
3 min readOct 12, 2022

--

Parquet, ORC etc are very popular Columnar File formats used in Big Data for data storage and retrieval. These are popular choice for fast analytics workloads.

To check about different columnar file format, checkout — http://urlit.me/t43HI

We are going to see how we can proactively optimize our data read operations from those columnar files.

For our example we are going to consider Parquet file format, which are popularly used for their Big Data Data Warehousing use cases.

Without delay, lets jump into action mode. Our Parquet example file (118M of size).

Sales Parquet File

As usual we will create out Python decorator for performance measurement.

# Lets create a simple Python decorator - {get_time} to get the execution timings
# If you dont know about Python decorators - check out :
https://www.geeksforgeeks.org/decorators-in-python/
import time
def get_time(func):
def inner_get_time() -> str:
start_time = time.time()
func()
end_time = time.time()
return (f"Execution time: {(end_time - start_time)*1000} ms")
print(inner_get_time())
Python decorator for Performance Measure

We are going to use “noop” format for performance benchmarking. Keep a note of the timings.

Method 1 — Lets read the data without specifying any schema. This will allow spark to read the schema on the fly.

# Now lets read the dataset without specifying the schemadf_sales = spark \
.read \
.format("parquet") \
.load("dataset/sales.parquet")
df_sales.printSchema()

@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Reading without specifying Schema

Method 2 — Reading data with schema specified

# Now we specify the schema before reading_schema = "transacted_at STRING, trx_id STRING, retailer_id STRING, description STRING, amount STRING, city_id STRING"df_sales = spark \
.read \
.schema(_schema) \
.format("parquet") \
.load("dataset/sales.parquet")
df_sales.printSchema()

@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Reading with Schema specified

Method 3 — Reading data with only required columns, not all columns

# Now if we only query the required columns_required_schema = "transacted_at STRING, trx_id STRING, amount STRING"df_sales = spark \
.read \
.schema(_required_schema) \
.format("parquet") \
.load("dataset/sales.parquet")
df_sales.printSchema()

@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Reading with only required column

Method 4 — Reading data with required columns using select()

# If we read the partial schema again but this time with select_schema = "transacted_at STRING, trx_id STRING, retailer_id STRING, description STRING, amount STRING, city_id STRING"df_sales = spark \
.read \
.schema(_schema) \
.parquet("dataset/sales.parquet") \
.select("transacted_at", "trx_id", "amount")
df_sales.printSchema()

@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Reading with select

Method 5 — Reading required columns using drop() to remove un-necessary columns

# We can also use drop to remove the un-wanted columns_schema = "transacted_at STRING, trx_id STRING, retailer_id STRING, description STRING, amount STRING, city_id STRING"df_sales = spark \
.read \
.schema(_schema) \
.parquet("dataset/sales.parquet") \
.drop("retailer_id", "description", "city_id")
df_sales.printSchema()

@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Reading with drop

Note: The above result will change with the number of columns and size of dataset. With increase, the timing difference will also vary significantly.

Conclusion — As demonstrated, if we specify schema during columnar reads with required columns only, the read time can be cut-off by huge margin. We can use specify schema directly or use select/drop for the same.

--

--