PySpark — Upsert or SCD1 with Dynamic Overwrite
Upsert, also known as Incremental Update or Slowly Changing Dimension Type 1 (SCD1), is a data modelling concept that updates existing records and inserts new records based on identified keys from an incremental/delta feed.
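For example, if the history already holds order ORD1001 with quantity 70 and the delta feed carries ORD1001 with a new quantity plus a brand-new order, an SCD1 load updates ORD1001 in place and inserts the new order; the old quantity is not preserved anywhere.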
To implement this in PySpark on a partitioned dataset, we will use Dynamic Partition Overwrite. Let's jump into action.
To know more about Dynamic Partition Overwrite, check out my previous article — http://urlit.me/75B6B
First we will change the Partition Overwrite configuration to DYNAMIC.
# Set the mode to dynamic to work on Upsert
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Check the mode for Partition Overwrite
print(spark.conf.get("spark.sql.sources.partitionOverwriteMode"))
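A quick note on why this setting matters: in the default STATIC mode, an overwrite save truncates all existing partitions before writing; in DYNAMIC mode, Spark replaces only the partitions for which the incoming DataFrame actually has rows, which is exactly the behaviour an upsert needs.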
Create Orders History Dataset
# Create the Full history dataset
from pyspark.sql.functions import to_date
_data = [
["ORD1001", "P003", 70, "01-21-2022", "01-30-2022"],
["ORD1004", "P033", 12, "01-24-2022", "01-30-2022"],
["ORD1005", "P036", 10, "01-20-2022", "01-30-2022"],
["ORD1002", "P016", 2, "01-10-2022", "01-30-2022"],
["ORD1003", "P012", 6, "01-10-2022", "01-30-2022"],
]
_cols = ["order_id", "prod_id", "qty"…
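The snippet above is truncated. Below is a minimal sketch of how the rest of the flow could look; the remaining column names (order_date, insert_date), the variable names (df_hist, df_delta, df_upsert), the sample delta rows, and the output path /tmp/orders_history are all assumptions for illustration, not from the original.

# Assumed column list; order_date and insert_date are inferred from the sample rows
_cols = ["order_id", "prod_id", "qty", "order_date", "insert_date"]

# Build the history DataFrame and cast the MM-dd-yyyy strings to dates
df_hist = (
    spark.createDataFrame(data=_data, schema=_cols)
    .withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
    .withColumn("insert_date", to_date("insert_date", "MM-dd-yyyy"))
)

# Write the full history partitioned by order_date (hypothetical path)
df_hist.write.mode("overwrite").partitionBy("order_date").parquet("/tmp/orders_history")

# Hypothetical delta feed: ORD1002 changes quantity, ORD1011 is brand new
_delta = [
    ["ORD1002", "P016", 5, "01-10-2022", "01-31-2022"],
    ["ORD1011", "P076", 21, "01-20-2022", "01-31-2022"],
]
df_delta = (
    spark.createDataFrame(data=_delta, schema=_cols)
    .withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
    .withColumn("insert_date", to_date("insert_date", "MM-dd-yyyy"))
)

# SCD1 merge restricted to the partitions the delta touches:
# keep the existing rows of those partitions that are NOT updated, then union the delta
affected_partitions = df_delta.select("order_date").distinct()
kept_rows = (
    df_hist
    .join(affected_partitions, on="order_date", how="inner")
    .join(df_delta.select("order_id"), on="order_id", how="left_anti")
)
df_upsert = kept_rows.unionByName(df_delta)

# With partitionOverwriteMode=dynamic, this rewrites only the order_date
# partitions present in df_upsert; every other partition stays untouched
df_upsert.write.mode("overwrite").partitionBy("order_date").parquet("/tmp/orders_history")

The left anti join is what keeps untouched rows in the affected partitions alive (for instance ORD1003, which shares the 01-10-2022 partition with the updated ORD1002), making this a true upsert rather than a blind partition swap.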