Sunday, April 18, 2021

SCD Type 2 Implementation in PySpark

#Consider the following data as the existing data in the target table.

dat= [[1, 50, "2019-02-01", "2019-02-02", 0],

  [1, 75, "2019-02-02", None,1],

  [2, 200, "2019-02-01", "2019-02-01", 0],

  [2, 60, "2019-02-01", "2019-02-01", 0],

  [2, 500, "2019-02-01", None, 1],

  [3, 175, "2019-02-01", None, 1],

  [4, 50, "2019-02-02", "2019-02-02", 0],

  [4, 300, "2019-02-02", None, 1],

  [5, 500, "2019-02-02", None, 1]]

  

header=["pk", "amount", "StartDate", "endDate", "active"]

df_existing_data=spark.createDataFrame(data=dat,schema=header)

#Create the SCD Type 2 target table, partitioned by the active flag.

df_existing_data.write.format("parquet").mode("overwrite").partitionBy("active").saveAsTable("default.tab_amount")

df_existing_data.show()
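As an aside, the sample DataFrame above relies on schema inference. If you prefer explicit types, a minimal sketch like the one below could be used instead (the name amount_schema is just for illustration, and the date columns are kept as strings to match the sample data):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

#Declare the schema explicitly instead of letting Spark infer it from the Python lists.
amount_schema = StructType([
    StructField("pk", IntegerType(), False),
    StructField("amount", IntegerType(), True),
    StructField("StartDate", StringType(), True),
    StructField("endDate", StringType(), True),
    StructField("active", IntegerType(), True)])

df_existing_data = spark.createDataFrame(data=dat, schema=amount_schema)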















#Extract the currently active records from the SCD Type 2 target table.

df_active_data=spark.sql("select * from default.tab_amount where active=1")

df_active_data.createOrReplaceTempView("ExistingActiveData")

df_active_data.show()
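The same filter can also be written with the DataFrame API instead of SQL; a rough equivalent (df_active_data_api is an illustrative name):

from pyspark.sql import functions as F

#DataFrame API equivalent: read the target table and keep only the open records.
df_active_data_api = spark.table("default.tab_amount").filter(F.col("active") == 1)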









#Consider the following as the latest data arriving from the source system.

dat= [

  [1, 75],

  [2, 500],

  [3, 200],

  [4, 350],

  [5, 500],

  [6, 800],

  [7, 1500]]

  

header=["pk", "amount"]

df_newdata=spark.createDataFrame(data=dat,schema=header)

df_newdata.createOrReplaceTempView("NewData")

df_newdata.show()
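In a real pipeline the incoming records would normally be read from a file or a staging table rather than hard-coded; the sketch below is only illustrative and the path is a made-up placeholder:

#Hypothetical example of loading the incoming data from a landing folder.
df_newdata_from_file = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/landing/amounts/latest/"))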












#Extract brand-new records to insert, with start date as today and end date as NULL.

df_NewRecords=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND left anti join ExistingActiveData EAD on EAD.pk=ND.pk")

df_NewRecords.show()
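For comparison, the same left anti join can be expressed with the DataFrame API; a sketch under the same column names (df_NewRecords_api is an illustrative name):

from pyspark.sql import functions as F

#Keys present in the source but missing from the active target rows become brand-new records.
df_NewRecords_api = (df_newdata
    .join(df_active_data, on="pk", how="left_anti")
    .withColumn("startDate", F.current_date())
    .withColumn("endDate", F.lit(None).cast("string"))
    .withColumn("active", F.lit(1)))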









#Extract the changed records from the source as new active rows, with start date as today and end date as NULL.

df_UpdatedActive=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND inner join ExistingActiveData EAD on EAD.pk=ND.pk where EAD.amount<>ND.amount")

df_UpdatedActive.createOrReplaceTempView("UpdatedActive")

df_UpdatedActive.show()
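The equivalent DataFrame API join, for reference (df_UpdatedActive_api is an illustrative name):

from pyspark.sql import functions as F

#Keys present on both sides whose amount differs become the new active versions.
df_UpdatedActive_api = (df_newdata.alias("ND")
    .join(df_active_data.alias("EAD"), F.col("ND.pk") == F.col("EAD.pk"), "inner")
    .where(F.col("EAD.amount") != F.col("ND.amount"))
    .select(F.col("ND.pk"), F.col("ND.amount"),
            F.current_date().alias("startDate"),
            F.lit(None).cast("string").alias("endDate"),
            F.lit(1).alias("active")))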








#Extract existing active records whose values have not changed between target and source.

df_ExistingActive=spark.sql("select EAD.* from ExistingActiveData EAD left anti join UpdatedActive UA on EAD.pk=UA.pk")

df_ExistingActive.show()
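The DataFrame API version of this anti join is a one-liner (df_ExistingActive_api is an illustrative name):

#Active target rows whose key has no changed version are carried over unchanged.
df_ExistingActive_api = df_active_data.join(df_UpdatedActive.select("pk"), on="pk", how="left_anti")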








#Extract the old versions of the changed records and set their end date to yesterday.

df_UpdatedInactive=spark.sql("select EAD.pk,EAD.amount,EAD.startDate,date_sub(current_date(),1) as endDate,0 as active from ExistingActiveData EAD inner join NewData ND on EAD.pk=ND.pk where EAD.amount<>ND.amount")

df_UpdatedInactive.show()
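And the matching DataFrame API sketch for expiring the old versions (df_UpdatedInactive_api is an illustrative name):

from pyspark.sql import functions as F

#Close the previous version of every changed key with yesterday's date and active=0.
df_UpdatedInactive_api = (df_active_data.alias("EAD")
    .join(df_newdata.alias("ND"), F.col("EAD.pk") == F.col("ND.pk"), "inner")
    .where(F.col("EAD.amount") != F.col("ND.amount"))
    .select(F.col("EAD.pk"), F.col("EAD.amount"), F.col("EAD.StartDate"),
            F.date_sub(F.current_date(), 1).alias("endDate"),
            F.lit(0).alias("active")))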








#Configure Hive dynamic partitioning and dynamic partition overwrite so only the affected partitions are rewritten.


spark.conf.set("hive.exec.dynamic.partition", "true")

spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

#Consolidate all active records: new, changed, and unchanged.

df_TotalActive=df_NewRecords.unionAll(df_UpdatedActive).unionAll(df_ExistingActive)
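Note that unionAll is an alias of union, which matches columns by position, and insertInto below is also position-based, so the column order of every branch has to line up with the table. If you prefer matching by name, unionByName (Spark 2.3+) is an option; a sketch assuming the default case-insensitive column resolution (so StartDate and startDate still line up), with df_TotalActive_byname as an illustrative name:

#Alternative: combine the three branches by column name instead of by position.
df_TotalActive_byname = (df_NewRecords
    .unionByName(df_UpdatedActive)
    .unionByName(df_ExistingActive))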


#Append the expired records (end date = yesterday) to the active=0 partition.

df_UpdatedInactive.write.mode("append").insertInto("default.tab_amount")

#Overwrite the active=1 partition with the consolidated active records.

df_TotalActive.write.mode("overwrite").insertInto("default.tab_amount")


#Below is the overall data in the SCD Type 2 dimension after the load.

spark.sql("select * from default.tab_amount order by pk,active asc").show()



