PySpark — Dynamic Partition Overwrite

Overwrite only the affected partitions without touching the rest of the data

Subham Khandelwal
5 min read · Nov 2, 2022

INSERT OVERWRITE is a very useful concept: it overwrites only a few partitions rather than the whole partitioned output. We have seen this implemented in Hive, Impala, etc. But can we implement the same in Apache Spark?

Yes, we can implement the same functionality in Spark (version 2.3.0 and above) with a small configuration change, using the write mode “overwrite” itself.


As usual, we will follow along with an example to test this implementation. To implement this functionality we will change the following Spark parameter:

spark.sql.sources.partitionOverwriteMode — Check out Spark documentation https://spark.apache.org/docs/3.0.0/configuration.html
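For reference, this mode can be set for the whole session or for a single write. Below is a minimal sketch of both options, reusing the dataframe df and output path from the example that follows; per the Spark configuration documentation, the writer-level option key partitionOverwriteMode takes precedence over the session setting.

# Session level: applies to every subsequent partitioned overwrite
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Per-write level: the writer option takes precedence over the session conf
df.write \
    .format("parquet") \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .save("dataset/orders_partitioned")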

Test 1: We overwrite the partitions with the default value of partitionOverwriteMode, i.e. STATIC

Let's create our initial example dataset

# Example dataset
from pyspark.sql.functions import to_date

_data = [
    ["ORD1001", "P003", 70, "01-21-2022"],
    ["ORD1004", "P033", 12, "01-24-2022"],
    ["ORD1005", "P036", 10, "01-20-2022"],
    ["ORD1002", "P016", 2, "01-10-2022"],
    ["ORD1003", "P012", 6, "01-10-2022"],
]
_cols = ["order_id", "prod_id", "qty", "order_date"]

# Create the dataframe
df = spark.createDataFrame(data=_data, schema=_cols)

# Cast the order date from String to Date
df = df.withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
df.printSchema()
df.show()
Initial dataset

Current/Default configuration for spark.sql.sources.partitionOverwriteMode

# Check the mode for Partition Overwrite
spark.conf.get("spark.sql.sources.partitionOverwriteMode")
Default conf set to STATIC

Repartition and save the initial dataset; we have 4 partitions on order_date

# Let's repartition the data on order_date and write
df.repartition("order_date") \
    .write \
    .format("parquet") \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .save("dataset/orders_partitioned")
Initial dataset partitioned
Validate Initial dataset
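To validate, we can read the partitioned output back and count the rows per partition — a quick sketch, assuming the same output path as above:

# Read the partitioned output back and verify rows per partition
check_df = spark.read.parquet("dataset/orders_partitioned")
check_df.groupBy("order_date").count().orderBy("order_date").show()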

Generate the new delta dataset for overwrite

# Let's create our delta dataset for the overwrite
_data = [
    ["ORD1010", "P053", 78, "01-24-2022"],
    ["ORD1011", "P076", 21, "01-20-2022"],
]
_cols = ["order_id", "prod_id", "qty", "order_date"]

# Create the delta dataframe
delta_df = spark.createDataFrame(data=_data, schema=_cols)

# Cast the order date from String to Date
delta_df = delta_df.withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
delta_df.printSchema()
delta_df.show()
Delta dataset

Now, we overwrite with the delta data using the default configuration

# Let's write to the same location for the partitioned orders
delta_df.repartition("order_date") \
    .write \
    .format("parquet") \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .save("dataset/orders_partitioned")
Final Data Partitioned
Final data validation

From the above test we conclude that the data is completely overwritten: all existing partitions from the initial dataset are replaced, and only the delta dataset remains.
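A quick way to confirm this is to read the location back; with STATIC mode the overwrite replaces everything under the path, so only the two delta rows remain:

# After a STATIC overwrite only the delta rows remain
spark.read.parquet("dataset/orders_partitioned").orderBy("order_id").show()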

Now, if you are new to Spark or PySpark, or want to learn more — I teach Big Data, Spark, Data Engineering & Data Warehousing on my YouTube channel — Ease With Data

YouTube — Tutorials

Test 2: We overwrite the partitions with partitionOverwriteMode set to DYNAMIC

We follow the same steps as above, but this time we set the configuration value to DYNAMIC

# Setting the partitionOverwriteMode as DYNAMIC
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.get("spark.sql.sources.partitionOverwriteMode")
Setting conf value to DYNAMIC

Initial dataset

# Example dataset
from pyspark.sql.functions import to_date

_data = [
    ["ORD1001", "P003", 70, "01-21-2022"],
    ["ORD1004", "P033", 12, "01-24-2022"],
    ["ORD1005", "P036", 10, "01-20-2022"],
    ["ORD1002", "P016", 2, "01-10-2022"],
    ["ORD1003", "P012", 6, "01-10-2022"],
]
_cols = ["order_id", "prod_id", "qty", "order_date"]

# Create the dataframe
df = spark.createDataFrame(data=_data, schema=_cols)

# Cast the order date from String to Date
df = df.withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
df.printSchema()
df.show()
Initial Dataset

Write the partitioned Initial dataset

# Let's repartition the data on order_date and write
df.repartition("order_date") \
    .write \
    .format("parquet") \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .save("dataset/orders_partitioned")
Initial Partitioned dataset
Validate Initial data
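As an alternative validation, we can list the partition folders written to disk — a small sketch that assumes the output path is on the local filesystem:

import os

# List the order_date=... partition directories (local filesystem assumed)
print(sorted(d for d in os.listdir("dataset/orders_partitioned") if d.startswith("order_date=")))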

We create the delta dataset again (note that ORD1011 now falls in the 01-10-2022 partition)

# Let's create our delta dataset for the overwrite
_data = [
    ["ORD1010", "P053", 78, "01-24-2022"],
    ["ORD1011", "P076", 21, "01-10-2022"],
]
_cols = ["order_id", "prod_id", "qty", "order_date"]

# Create the delta dataframe
delta_df = spark.createDataFrame(data=_data, schema=_cols)

# Cast the order date from String to Date
delta_df = delta_df.withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
delta_df.printSchema()
delta_df.show()
Delta Dataset

Now, we write the dataset with the same mode “overwrite”

# Let's write to the same location for the partitioned orders
delta_df.repartition("order_date") \
    .write \
    .format("parquet") \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .save("dataset/orders_partitioned")
Final partitioned dataset
Final data validation

As is evident from the above test, only the partitions present in the delta dataset are overwritten, rather than the whole dataset.
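Reading the location back confirms this: the untouched partitions (order_date 01-21-2022 and 01-20-2022) still hold the initial rows, while only the 01-24-2022 and 01-10-2022 partitions were replaced by the delta rows.

# Untouched partitions survive a DYNAMIC overwrite
spark.read.parquet("dataset/orders_partitioned") \
    .orderBy("order_date", "order_id") \
    .show()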

Note: This will only work with partitioned datasets.

Conclusion: We can achieve INSERT OVERWRITE functionality in Spark 2.3.0+ by changing the partition overwrite configuration parameter (spark.sql.sources.partitionOverwriteMode) to dynamic.

Check out the iPython Notebook on GitHub — https://github.com/subhamkharwal/ease-with-apache-spark/blob/master/24_Partition_Overwrite.ipynb

Check out my personal blog — https://urlit.me/blog/

Check out the PySpark Medium Series — https://subhamkharwal.medium.com/learnbigdata101-spark-series-940160ff4d30
