#Consider the following data as the existing data in the target table.
dat= [[1, 50, "2019-02-01", "2019-02-02", 0],
[1, 75, "2019-02-02", None, 1],
[2, 200, "2019-02-01", "2019-02-01", 0],
[2, 60, "2019-02-01", "2019-02-01", 0],
[2, 500, "2019-02-01", None, 1],
[3, 175, "2019-02-01", None, 1],
[4, 50, "2019-02-02", "2019-02-02", 0],
[4, 300, "2019-02-02", None, 1],
[5, 500, "2019-02-02", None, 1]]
header=["pk", "amount", "StartDate", "endDate", "active"]
df_existing_data=spark.createDataFrame(data=dat,schema=header)
#Create the target table with the SCD Type 2 structure, partitioned by the active flag.
df_existing_data.write.format("parquet").mode("overwrite").partitionBy("active").saveAsTable("default.tab_amount")
df_existing_data.show()
#Extracting the currently active records from the target SCD Type 2 table.
df_active_data=spark.sql("select * from default.tab_amount where active=1")
df_active_data.createOrReplaceTempView("ExistingActiveData")
df_active_data.show()
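#With the sample data above, the active slice holds one current row per key:
#pk 1 (75), pk 2 (500), pk 3 (175), pk 4 (300) and pk 5 (500).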
#Consider the following as the latest data arriving from the source system.
dat= [
[1, 75],
[2, 500],
[3, 200],
[4, 350],
[5, 500],
[6, 800],
[7, 1500]]
header=["pk", "amount"]
df_newdata=spark.createDataFrame(data=dat,schema=header)
df_newdata.createOrReplaceTempView("NewData")
df_newdata.show()
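#Compared with the active slice: pk 1, 2 and 5 arrive unchanged, pk 3 (175 -> 200)
#and pk 4 (300 -> 350) have new amounts, and pk 6 and 7 are brand-new keys.
#The steps below split the load along exactly these three cases.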
#Extracting brand-new records (keys not present in the active data) to insert with today's start date and a NULL end date.
df_NewRecords=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND left anti join ExistingActiveData EAD on EAD.pk=ND.pk")
df_NewRecords.show()
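#The same split can also be written with the DataFrame API. Below is a minimal,
#illustrative sketch (df_NewRecords_alt is a hypothetical name and is not used
#further); it imports pyspark.sql.functions as F.
from pyspark.sql import functions as F
df_NewRecords_alt=(df_newdata.alias("ND")
    .join(df_active_data.alias("EAD"), on="pk", how="left_anti")
    .withColumn("startDate", F.current_date())
    .withColumn("endDate", F.lit(None).cast("string"))
    .withColumn("active", F.lit(1)))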
#Extracting the changed records from the source; these become the new active versions with today's start date and a NULL end date.
df_UpdatedActive=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND inner join ExistingActiveData EAD on EAD.pk=ND.pk where EAD.amount<>ND.amount")
df_UpdatedActive.createOrReplaceTempView("UpdatedActive")
df_UpdatedActive.show()
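#With the sample data, only pk 3 (new amount 200) and pk 4 (new amount 350)
#differ from their active rows, so only they get a fresh version here.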
#Extracting the existing active records whose values have not changed between target and source.
df_ExistingActive=spark.sql("select EAD.* from ExistingActiveData EAD left anti join UpdatedActive UA on EAD.pk=UA.pk")
df_ExistingActive.show()
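#pk 1, 2 and 5 are unchanged, so their current active rows are carried over as-is.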
#Extracting the old versions of the updated records so their end date can be closed out to yesterday.
df_UpdatedInactive=spark.sql("select EAD.pk,EAD.amount,EAD.StartDate as startDate,date_sub(current_date(),1) as endDate,0 as active from ExistingActiveData EAD inner join NewData ND on EAD.pk=ND.pk where EAD.amount<>ND.amount")
df_UpdatedInactive.show()
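#These are the outgoing versions of pk 3 (175) and pk 4 (300): end date set to
#yesterday, active flag flipped to 0.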
#Configuring dynamic partitioning so the writes below only rewrite the partitions they touch.
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
#Consolidated active records: new keys, updated versions and unchanged rows.
df_TotalActive=df_NewRecords.union(df_UpdatedActive).union(df_ExistingActive)
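#Note: union resolves columns by position, so all three DataFrames must expose
#the columns in the same order: pk, amount, startDate, endDate, active.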
#Appending the expired versions (end date = yesterday) to the active=0 partition.
df_UpdatedInactive.write.mode("append").insertInto("default.tab_amount")
#Overwriting the active=1 partition with the consolidated active records.
df_TotalActive.write.mode("overwrite").insertInto("default.tab_amount")
#Below is the overall data in the SCD Type 2 dimension after the load.
spark.sql("select * from default.tab_amount order by pk,active asc").show()