Friday, April 23, 2021

Hive Table operations - dropping partition, renaming partition, changing table properties, insert sample data

 

--Changing the table type from external table to managed table.

alter table c_sample_tbl set tblproperties('EXTERNAL'='FALSE');

--Dropping the Hive partition; PURGE skips the trash, so the data files are deleted immediately.

alter table c_sample_tbl drop if exists partition (bus_dt='2019-09-30') purge;

--Renaming the hive partition.

ALTER TABLE c_sample_tbl PARTITION (bus_dt='2019-09-30_1') RENAME TO PARTITION (bus_dt='2019-09-30');

--Changing the table type from managed table to external table.

alter table c_sample_tbl set tblproperties('EXTERNAL'='TRUE');

--Inserting sample/static data into a Hive table for a static partition.

insert into c_curr_rates PARTITION (bus_dt = '2019-06-30') values ('INR','SGD',0.01886,'2019-12-02');


Monday, April 19, 2021

Common commands to manage Linux background process

Linux command to run in background.

command: nohup sh run.sh &

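Linux command to run multiple commands in sequence in the background (each one runs only if the previous one succeeded).

command: nohup sh -c 'sh run.sh 2021-01-05 && sh run.sh 2021-02-08 && sh run.sh 2021-03-09' &

Wrapping the chain in sh -c puts all three runs under nohup; without the wrapper, nohup applies only to the first command.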

Finding the background process in Linux and killing it.

command:  ps -ef | grep "sh run.sh"

Get the PID from the above command's output and use it to kill the process.

kill -9 92646
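
The same start, find, and kill cycle can be sketched in Python with the standard subprocess module. This is a minimal sketch and not part of the original notes: the child command is a stand-in for run.sh (a Python one-liner that sleeps), the helper name stop_process is hypothetical, and the sketch does not reproduce nohup's immunity to hang-ups. It sends SIGTERM first and falls back to SIGKILL (the equivalent of kill -9) only if the process does not exit in time.

```python
import subprocess
import sys


def stop_process(proc, timeout=5.0):
    """Ask a child process to exit; force-kill it only if it ignores the request."""
    proc.terminate()              # SIGTERM on POSIX, like plain `kill <pid>`
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()               # SIGKILL on POSIX, like `kill -9 <pid>`
        proc.wait()
    return proc.returncode


# Stand-in for a background job: a child that would otherwise run for a minute.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
child_pid = child.pid             # the value `ps -ef` would show in the PID column
exit_code = stop_process(child)
print(child_pid, exit_code)
```

Trying a plain kill before kill -9 gives the script a chance to clean up temporary files or close connections before it exits.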










Pyspark Multiple sessions, Global Temp View, run time configuration setting

 **************PySpark Multiple Sessions********************************

#Initialize first pyspark session as below

from pyspark.sql import SparkSession
spark1=SparkSession.builder.master("local").appName("Single_app").getOrCreate()


#A temporary view created in one session cannot be accessed from another session; below is an example


spark2=spark1.newSession()
spark1.sql("select 1,2").createOrReplaceTempView("spark1_view")
spark1.catalog.listTables()





spark2.catalog.listTables()





#To access or have a common temp space, create a global temp view


spark1.sql("select 1,2").createGlobalTempView("spark1_global")
spark2.sql("select * from global_temp.spark1_global").show()









"""
Sessions that share the same SparkContext can also carry different runtime configurations,
as shown below.
"""
spark1.conf.set("spark.sql.shuffle.partitions","100")
spark2.conf.set("spark.sql.shuffle.partitions","200")


#display the respective conf values


spark1.conf.get("spark.sql.shuffle.partitions")





spark2.conf.get("spark.sql.shuffle.partitions")







#stopping one session stops SparkContext and all associated sessions
spark1.stop()


************************Below is the Scala version***********************

The two most common use cases are:

  • Keeping sessions with minor differences in configuration.

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
          /_/
    Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_141)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions
    res0: Int = 200
    scala> 
    scala> val newSpark = spark.newSession
    newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@618a9cb7
    scala> newSpark.conf.set("spark.sql.shuffle.partitions", 99)
    scala> newSpark.range(100).groupBy("id").count.rdd.getNumPartitions
    res2: Int = 99
    scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions  // No effect on initial session
    res3: Int = 200
    
  • Separating temporary namespaces:

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
          /_/
    Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_141)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> spark.range(1).createTempView("foo")
    scala> 
    scala> spark.catalog.tableExists("foo")
    res1: Boolean = true
    scala> 
    scala> val newSpark = spark.newSession
    newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@73418044
    scala> newSpark.catalog.tableExists("foo")
    res2: Boolean = false
    scala> newSpark.range(100).createTempView("foo")  // No exception
    scala> spark.table("foo").count // No effect on initial session
    res4: Long = 1     

Sunday, April 18, 2021

SCD TYPE 2 Implementation in Pyspark

#Consider the following data as the existing data in the target table.

dat= [[1, 50, "2019-02-01", "2019-02-02", 0],

  [1, 75, "2019-02-02", None,1],

  [2, 200, "2019-02-01", "2019-02-01", 0],

  [2, 60, "2019-02-01", "2019-02-01", 0],

  [2, 500, "2019-02-01", None, 1],

  [3, 175, "2019-02-01", None, 1],

  [4, 50, "2019-02-02", "2019-02-02", 0],

  [4, 300, "2019-02-02", None, 1],

  [5, 500, "2019-02-02", None, 1]]

  

header=["pk", "amount", "StartDate", "endDate", "active"]

df_existing_data=spark.createDataFrame(data=dat,schema=header)

#Create a table with SCD Type 2 and associated structure.

df_existing_data.write.format("parquet").mode("overwrite").partitionBy("active").saveAsTable("default.tab_amount")

df_existing_data.show()















#Extracting current Data from target table/scd type 2 table.

df_active_data=spark.sql("select * from default.tab_amount where active=1")

df_active_data.createOrReplaceTempView("ExisingActiveData")

df_active_data.show()









#Consider following as latest or new data arrived from source system.

dat= [

  [1, 75],

  [2, 500],

  [3, 200],

  [4, 350],

  [5, 500],

  [6, 800],

  [7, 1500]]

  

header=["pk", "amount"]

df_newdata=spark.createDataFrame(data=dat,schema=header)

df_newdata.createOrReplaceTempView("NewData")

df_newdata.show()












#Extracting new records to insert, with start date as today and end date as NULL (cast to string to match the other active sets).

df_NewRecords=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND left anti join ExisingActiveData EAD on EAD.pk=ND.pk")

df_NewRecords.show()









#Extracting updated active records from source table with start date as today and end date as NULL.

df_UpdatedActive=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND inner join ExisingActiveData EAD on EAD.pk=ND.pk where EAD.amount<>ND.amount")

df_UpdatedActive.createOrReplaceTempView("UpdatedActive")

df_UpdatedActive.show()








#Extracting existing active records where the values are not changed in target vs source comparison.

df_ExistingActive=spark.sql("select EAD.* from ExisingActiveData EAD left anti join UpdatedActive UA on EAD.pk=UA.pk")

df_ExistingActive.show()








#Extracting the previous versions of the updated records and setting their end date to yesterday.

df_UpdatedInactive=spark.sql("select EAD.pk,EAD.amount,startDate,date_sub(current_date(),1) as endDate,0 as active from ExisingActiveData EAD inner join NewData ND on EAD.pk=ND.pk where EAD.amount<>ND.amount")

df_UpdatedInactive.show()








#Enabling dynamic partitioning so that inserts are routed by the active column and an overwrite replaces only the partitions being written.


spark.conf.set("hive.exec.dynamic.partition", "true")

spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

#Consolidated active records.

df_TotalActive=df_NewRecords.unionAll(df_UpdatedActive).unionAll(df_ExistingActive)


#Appending the updated records with end date as yesterday to active=0 partition

df_UpdatedInactive.write.mode("append").insertInto("default.tab_amount")

#Overwriting the active=1 partition with the consolidated active records

df_TotalActive.write.mode("overwrite").insertInto("default.tab_amount")


#Below is the overall data in the SCD Type 2 dimension after the data load.

spark.sql("select * from default.tab_amount order by pk,active asc").show()
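
The classification that the joins above perform (new keys, changed keys, unchanged keys, untouched history) can also be sketched in plain Python, which makes the row counts easy to check by hand. This is an illustration under stated assumptions, not the PySpark implementation: the function name scd2_merge is hypothetical, the table is a list of dicts, and the load date is fixed so the result is reproducible.

```python
from datetime import date, timedelta


def scd2_merge(existing, incoming, today):
    """Return the full table after an SCD Type 2 load.

    existing: list of dicts with keys pk, amount, startDate, endDate, active
    incoming: dict mapping pk -> latest amount from the source
    """
    yesterday = (today - timedelta(days=1)).isoformat()
    active = {row["pk"]: row for row in existing if row["active"] == 1}
    result = [dict(row) for row in existing if row["active"] == 0]  # history is untouched

    for pk, row in active.items():
        if pk in incoming and incoming[pk] != row["amount"]:
            # Changed key: close the old version with yesterday as end date.
            result.append({**row, "endDate": yesterday, "active": 0})
        else:
            # Unchanged key: keep the active row as it is.
            result.append(dict(row))

    for pk, amount in incoming.items():
        if pk not in active or active[pk]["amount"] != amount:
            # New key, or the new version of a changed key.
            result.append({"pk": pk, "amount": amount, "startDate": today.isoformat(),
                           "endDate": None, "active": 1})
    return result


cols = ["pk", "amount", "startDate", "endDate", "active"]
existing = [dict(zip(cols, r)) for r in [
    [1, 50, "2019-02-01", "2019-02-02", 0], [1, 75, "2019-02-02", None, 1],
    [2, 200, "2019-02-01", "2019-02-01", 0], [2, 60, "2019-02-01", "2019-02-01", 0],
    [2, 500, "2019-02-01", None, 1], [3, 175, "2019-02-01", None, 1],
    [4, 50, "2019-02-02", "2019-02-02", 0], [4, 300, "2019-02-02", None, 1],
    [5, 500, "2019-02-02", None, 1]]]
incoming = {1: 75, 2: 500, 3: 200, 4: 350, 5: 500, 6: 800, 7: 1500}

table = scd2_merge(existing, incoming, today=date(2021, 4, 18))
for row in sorted(table, key=lambda r: (r["pk"], r["active"])):
    print(row)
```

With the sample data, keys 3 and 4 are changed and keys 6 and 7 are new, so the nine existing rows grow to thirteen: six inactive and seven active.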




Friday, April 16, 2021

Hive optimization techniques

The main components of Hive are as follows:

  • Metastore
  • Driver
  • Compiler
  • Optimizer
  • Executor
  • Client


Hadoop/Hive can process nearly any amount of data, but optimizations can save processing time and cost in proportion to the amount of data. Many optimizations can be applied in Hive. The techniques we are going to cover are:

  1. Partitioning
  2. Bucketing
  3. Using Tez as Execution Engine
  4. Using Compression
  5. Using ORC Format
  6. Join Optimizations
  7. Cost-based Optimizer

Thursday, April 15, 2021

Avoid small file issue in Hive

One way to control the size of files when inserting into a table using Hive is to set the parameters below:

set hive.merge.tezfiles=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=128000000;
set hive.merge.smallfiles.avgsize=128000000;

This works for both the MapReduce and Tez engines. When the average size of a job's output files falls below hive.merge.smallfiles.avgsize, Hive runs an extra merge step that combines them into files of roughly hive.merge.size.per.task, about 128 MB here (adjust the value to suit your use case). Additional reading: https://community.cloudera.com/t5/Community-Articles/ORC-Creation-Best-Practices/ta-p/248963
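
As a rough model of how the two size settings interact, the Python sketch below does the arithmetic on a hypothetical job output. It assumes the behaviour described in the Hive configuration documentation as I understand it: a merge step is triggered when the average output file size is below hive.merge.smallfiles.avgsize, and merged files aim for hive.merge.size.per.task. The function names and file sizes are made up for illustration, and Hive's actual merge logic is more involved.

```python
import math

AVG_SIZE_THRESHOLD = 128_000_000   # hive.merge.smallfiles.avgsize
SIZE_PER_TASK = 128_000_000        # hive.merge.size.per.task


def needs_merge(file_sizes, threshold=AVG_SIZE_THRESHOLD):
    """True when the average output file size is below the threshold."""
    return sum(file_sizes) / len(file_sizes) < threshold


def estimated_merged_files(file_sizes, target=SIZE_PER_TASK):
    """Rough count of files left after merging toward the target size."""
    return max(1, math.ceil(sum(file_sizes) / target))


sizes = [4_000_000] * 500          # hypothetical job output: 500 files of 4 MB each
print(needs_merge(sizes), estimated_merged_files(sizes))
```

In this model, 500 files of 4 MB (2 GB in total) qualify for a merge and would end up as about 16 files.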

The easiest way to merge the files of an existing table is to rebuild it after running the above settings in the same session:

CREATE TABLE new_table LIKE old_table;
INSERT INTO new_table select * from old_table;

Friday, April 9, 2021

Hive Architecture

 


Hive complex data types, explode, lateral view

Create a table with array, struct complex data types as below.

CREATE TABLE student_details(

id_key string,
name string,
subjects array<string>,
address struct<city:string,State:string>
);

Insert sample data with the command below.

INSERT INTO student_details
select
'AA87U',
'BRYAN',
array('ENG','CAL_1','CAL_2','HST','MUS'),
named_struct('city','Tampa','State','FL');


Display the sample data.

select * from student_details;





Explode: displaying array elements as rows using explode() and lateral view.
select id_key,name,each_subject
from
student_details
lateral view explode(subjects) temp_table as each_subject;









Inline: Displaying struct components using inline.
select id_key,name,add.*
from
student_details
lateral view inline (array(address)) add;







Displaying both array and struct data in 1NF.
select id_key,name,each_subject,add.*
from
student_details
lateral view explode(subjects) temp_table as each_subject
lateral view inline (array(address)) add;






Commonly used array functions.
select size(subjects),sort_array(subjects),concat_ws('/',subjects),array_contains(subjects,'ENG') from student_details;
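
For readers more familiar with Python than HiveQL, the effect of combining lateral view explode with lateral view inline can be sketched on a plain dict. This is only an analogy for the row shape the query produces, not Hive code; the function name explode_and_inline is hypothetical, and the record is the sample row inserted above.

```python
student = {
    "id_key": "AA87U",
    "name": "BRYAN",
    "subjects": ["ENG", "CAL_1", "CAL_2", "HST", "MUS"],
    "address": {"city": "Tampa", "State": "FL"},
}


def explode_and_inline(row):
    """Yield one flat row per array element, with the struct fields spread into columns."""
    for subject in row["subjects"]:          # lateral view explode(subjects)
        yield {
            "id_key": row["id_key"],
            "name": row["name"],
            "each_subject": subject,
            **row["address"],                # lateral view inline(array(address))
        }


flat_rows = list(explode_and_inline(student))
for flat in flat_rows:
    print(flat)
```

As with a plain lateral view in Hive, a record whose array is empty produces no rows in this sketch; in Hive, LATERAL VIEW OUTER keeps such records.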



Thursday, April 8, 2021

Spark Cache() Vs Persist()

The difference between cache() and persist() is that cache() always uses the default storage level (MEMORY_ONLY for RDDs; DataFrames and Datasets default to MEMORY_AND_DISK), while persist() lets us choose the storage level.

With persist() we can choose among several storage levels for persisted RDDs in Apache Spark. Let's go through each RDD storage level one by one.

a. MEMORY_ONLY

In this storage level, the RDD is stored as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions are not cached and are recomputed each time they are needed. Space usage is very high and CPU cost is low; the data is held in memory only, and the disk is not used.

b. MEMORY_AND_DISK

In this level, the RDD is stored as deserialized Java objects in the JVM. When the RDD does not fit in memory, the partitions that do not fit are stored on disk and read from there when required. Space usage is high and CPU cost is medium; both memory and disk are used.

c. MEMORY_ONLY_SER

This level stores the RDD as serialized Java objects (one byte array per partition). It is more space-efficient than deserialized objects, especially with a fast serializer, but reading the data costs more CPU. Space usage is low and CPU cost is high; the data is held in memory only, and the disk is not used.

d. MEMORY_AND_DISK_SER

It is similar to MEMORY_ONLY_SER, but partitions that do not fit in memory are spilled to disk rather than recomputed each time they are needed. Space usage is low and CPU cost is high; both memory and disk are used.

e. DISK_ONLY

In this storage level, the RDD is stored only on disk. Space usage is low and CPU cost is high; only disk storage is used.
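
The five levels differ along three axes: whether memory is used, whether disk is used, and whether the data is kept deserialized. The Python sketch below tabulates those flags as described in this post and derives what happens to partitions that do not fit in memory. It is a summary aid, not the Spark API: StorageFlags, LEVELS, and on_memory_overflow are hypothetical names.

```python
from collections import namedtuple

# Hypothetical helper type: which media a level uses and whether data stays deserialized.
StorageFlags = namedtuple("StorageFlags", ["use_memory", "use_disk", "deserialized"])

LEVELS = {
    "MEMORY_ONLY":         StorageFlags(True,  False, True),
    "MEMORY_AND_DISK":     StorageFlags(True,  True,  True),
    "MEMORY_ONLY_SER":     StorageFlags(True,  False, False),
    "MEMORY_AND_DISK_SER": StorageFlags(True,  True,  False),
    "DISK_ONLY":           StorageFlags(False, True,  False),
}


def on_memory_overflow(level_name):
    """What happens to partitions that do not fit in memory, per the descriptions above."""
    flags = LEVELS[level_name]
    if not flags.use_memory:
        return "not applicable (nothing is kept in memory)"
    return "spill to disk" if flags.use_disk else "recompute when needed"


for name in LEVELS:
    print(f"{name:20} -> {on_memory_overflow(name)}")
```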