PySpark — Columnar Read Optimization
--
Parquet, ORC etc are very popular Columnar File formats used in Big Data for data storage and retrieval. These are popular choice for fast analytics workloads.
To check about different columnar file format, checkout — http://urlit.me/t43HI
We are going to see how we can proactively optimize our data read operations from those columnar files.
For our example we are going to consider Parquet file format, which are popularly used for their Big Data Data Warehousing use cases.
Without delay, lets jump into action mode. Our Parquet example file (118M of size).
As usual we will create out Python decorator for performance measurement.
# Lets create a simple Python decorator - {get_time} to get the execution timings
# If you dont know about Python decorators - check out : https://www.geeksforgeeks.org/decorators-in-python/
import timedef get_time(func):
def inner_get_time() -> str:
start_time = time.time()
func()
end_time = time.time()
return (f"Execution time: {(end_time - start_time)*1000} ms")
print(inner_get_time())
We are going to use “noop” format for performance benchmarking. Keep a note of the timings.
Method 1 — Lets read the data without specifying any schema. This will allow spark to read the schema on the fly.
# Now lets read the dataset without specifying the schemadf_sales = spark \
.read \
.format("parquet") \
.load("dataset/sales.parquet")df_sales.printSchema()
@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Method 2 — Reading data with schema specified
# Now we specify the schema before reading_schema = "transacted_at STRING, trx_id STRING, retailer_id STRING, description STRING, amount STRING, city_id STRING"df_sales = spark \
.read \
.schema(_schema) \
.format("parquet") \
.load("dataset/sales.parquet")df_sales.printSchema()
@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Method 3 — Reading data with only required columns, not all columns
# Now if we only query the required columns_required_schema = "transacted_at STRING, trx_id STRING, amount STRING"df_sales = spark \
.read \
.schema(_required_schema) \
.format("parquet") \
.load("dataset/sales.parquet")df_sales.printSchema()
@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Method 4 — Reading data with required columns using select()
# If we read the partial schema again but this time with select_schema = "transacted_at STRING, trx_id STRING, retailer_id STRING, description STRING, amount STRING, city_id STRING"df_sales = spark \
.read \
.schema(_schema) \
.parquet("dataset/sales.parquet") \
.select("transacted_at", "trx_id", "amount")df_sales.printSchema()
@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Method 5 — Reading required columns using drop() to remove un-necessary columns
# We can also use drop to remove the un-wanted columns_schema = "transacted_at STRING, trx_id STRING, retailer_id STRING, description STRING, amount STRING, city_id STRING"df_sales = spark \
.read \
.schema(_schema) \
.parquet("dataset/sales.parquet") \
.drop("retailer_id", "description", "city_id")df_sales.printSchema()
@get_time
def x(): df_sales.write.format("noop").mode("overwrite").save()
Note: The above result will change with the number of columns and size of dataset. With increase, the timing difference will also vary significantly.
Conclusion — As demonstrated, if we specify schema during columnar reads with required columns only, the read time can be cut-off by huge margin. We can use specify schema directly or use select/drop for the same.
Checkout iPython Notebook on Github — https://github.com/subhamkharwal/ease-with-apache-spark/blob/master/10_columnar_data_optimization.ipynb
Checkout PySpark Series on Medium — https://subhamkharwal.medium.com/learnbigdata101-spark-series-940160ff4d30
Wish to Buy me a Coffee: Buy Subham a Coffee