This project demonstrates the implementation of Slowly Changing Dimensions (SCD) Type 2 for a retail store customer dimension, using PySpark. SCD Type 2 retains the complete history of changes, making it the most commonly used approach in the industry.
- Directory Structure in HDFS
- Prepare Source and Target
- Define Variables
- Define Schemas
- Create Source dataFrame
- Write Enhanced Dataframe to Target
- Create Target DataFrame
- View Source and Target DataFrames
- Handle Surrogate Keys
- Add New Data
- Read Updated Source DataFrame
- Create Active and Inactive DataFrames
- Perform Join Operation
- Column Renaming and Hashing
- Perform Join and Create Action Column
- Filter Records by Action
- Merge DataFrames
- Key Points
- Conclusion
- License
- Main Folder:
- Subfolders:
- Subfolders:
- Source Folder: Add the
file to the source. - Target Folder: Create an empty target folder.
=[ "email", "phone", "address", "city", "state", "zipcode"]
=["effective_date", "end_date", "active_flag"]
Columns to Show History:
Example Values:
1st Jan 2013
,31st Dec 2017
31st Dec 2017
,31st Dec 9999
- Source Schema:
customers_source_schema = "customerid long, firstname string, lastname string, email string, phone string, address string, city string, state string, zipcode long"
- Target Schema:
customers_target_schema = "customerid long, firstname string, lastname string, email string, phone string, address string, city string, state string, zipcode long, customer_skey long, effective_date date, end_date date, active_flag boolean"
- Read and enhance the
:customers_source_df = \ .format("csv") \ .option("header",True) \ .schema(customers_source_schema) \ .load(source_url) enhanced_customers_source_df = \ .format("csv") \ .option("header",True) \ .schema(customers_source_schema) \ .load(source_url) \ .withColumn("customer_skey", row_number().over(window_def)) \ .withColumn("effective_date", date_format(current_date(), DATE_FORMAT)) \ .withColumn("end_date", date_format(lit(future_date), DATE_FORMAT)) \ .withColumn("active_flag", lit(True))
- Write the enhanced source dataframe
to the target folder:enhanced_customers_source_df.write.mode('overwrite') \ .option("header",True) \ .option("delimiter",",") \ .csv(destination_url)
- Create target dataframe
from the enhanced source data:customers_target_df = \ .format("csv") \ .option("header",True) \ .schema(customers_target_schema) \ .load(destination_url)
- Compare Columns between source and target dataframes:
- Aggregate function to find maximum surrogate key:
max_sk = customers_target_df.agg({"customer_skey": "max"}).collect()[0][0]
- Add new data to mock the changes in the landing area due to ingestion of incremental data.
- Add
to the source folder. - Delete the old
- Observe Updates, Inserts, and Deletes in the new source data
- Active Customers:
active_customers_target_df = customers_target_df.filter(col("active_flag") == True)
- Inactive Customers:
inactive_customers_target_df = customers_target_df.filter(col("active_flag") == False)
- Join active customers with the updated source data:
merged_df = active_customers_target_df.join(customers_source_df, "customerid", "full_outer")
- Column Renaming Function:
def column_renamer(df, suffix, append): if append: new_column_names = list(map(lambda x: x+suffix, df.columns)) else: new_column_names = list(map(lambda x: x.replace(suffix,""), df.columns)) return df.toDF(*new_column_names)
- Get Hash Function:
def get_hash(df, keys_list): columns = [col(column) for column in keys_list] if columns: return df.withColumn("hash_md5", md5(concat_ws("", *columns))) else: return df.withColumn("hash_md5", md5(lit("1")))
- Apply Functions:
active_customers_target_df_hash = column_renamer(get_hash(active_customers_target_df, slowly_changing_cols), suffix="_target", append=True) customers_source_df_hash = column_renamer(get_hash(customers_source_df, slowly_changing_cols), suffix="_source", append=True)
- Join and create the Action column to indicate INSERT, DELETE, UPDATE, and NO-CHANGE:
merged_df = active_customers_target_df_hash.join(customers_source_df_hash, col("customerid_source") == col("customerid_target") , "full_outer") \ .withColumn("Action", when(col("hash_md5_source") == col("hash_md5_target") , 'NOCHANGE')\ .when(col("customerid_source").isNull(), 'DELETE')\ .when(col("customerid_target").isNull(), 'INSERT')\ .otherwise('UPDATE'))
- Unchanged Records:
unchanged_records = column_renamer(merged_df.filter(col("action") == 'NOCHANGE'), suffix="_target", append=False).select(active_customers_target_df.columns)
- Inserted Records:
insert_records = column_renamer(merged_df.filter(col("action") == 'INSERT'), suffix="_source", append=False) \ .select(customers_source_df.columns)\ .withColumn("row_number",row_number().over(window_def))\ .withColumn("customer_skey",col("row_number") + max_sk)\ .withColumn("effective_date",date_format(current_date(),DATE_FORMAT))\ .withColumn("end_date",date_format(lit(future_date),DATE_FORMAT))\ .withColumn("active_flag", lit(True))\ .drop("row_number")
- Evaluate Maximum Customers Surrogate Key from Inserted Records:
max_sk = insert_records.agg({"customer_skey": "max"}).collect()[0][0]
- Updated Records:
update_records = column_renamer(merged_df.filter(col("action") == 'UPDATE'), suffix="_target", append=False)\ .select(active_customers_target_df.columns)\ .withColumn("end_date", date_format(current_date(),DATE_FORMAT))\ .withColumn("active_flag", lit(False))\ .unionByName( column_renamer(merged_df.filter(col("action") == 'UPDATE'), suffix="_source", append=False)\ .select(customers_source_df.columns)\ .withColumn("effective_date",date_format(current_date(),DATE_FORMAT))\ .withColumn("end_date",date_format(lit(future_date),DATE_FORMAT))\ .withColumn("row_number",row_number().over(window_def))\ .withColumn("customer_skey",col("row_number")+ max_sk)\ .withColumn("active_flag", lit(True))\ .drop("row_number") )
- Evaluate Maximum Customers Surrogate Key from Updated Records:
max_sk = update_records.agg({"customer_skey": "max"}).collect()[0][0]
- Deleted Records:
delete_records = column_renamer(merged_df.filter(col("action") == 'DELETE'), suffix="_target", append=False)\ .select(active_customers_target_df.columns)\ .withColumn("end_date", date_format(current_date(),DATE_FORMAT))\ .withColumn("active_flag", lit(False))
- Final union of all records:
resultant_df = inactive_customers_target_df \ .unionByName(unchanged_records)\ .unionByName(insert_records)\ .unionByName(update_records)\ .unionByName(delete_records)
- Focus on active records.
- Mark deleted records as inactive in the target.
- Use hash approach for updates.
- Handle 4 scenarios: INSERT, DELETE, UPDATE, NO-CHANGE.
- This implementation ensures that the customer dimension retains complete history, providing accurate and historical insights for the retail store.
This project is licensed under the MIT License. You are free to use, modify, and distribute this software in any way you see fit, provided that this license notice appears in all copies of the software.