Friday, April 23, 2021

Hive Table operations - dropping a partition, renaming a partition, changing table properties, inserting sample data

 

--Changing the table type from external table to managed table.

alter table c_sample_tbl set tblproperties('EXTERNAL'='FALSE');

--Dropping a Hive partition; PURGE deletes the data immediately instead of moving it to the trash.

alter table c_sample_tbl drop if exists partition (bus_dt='2019-09-30') purge;

--Renaming a Hive partition.

ALTER TABLE c_sample_tbl PARTITION (bus_dt='2019-09-30_1') RENAME TO PARTITION (bus_dt='2019-09-30');

--Changing the table type from managed table to external table.

alter table c_sample_tbl set tblproperties('EXTERNAL'='TRUE');

--Inserting sample/static data into a Hive table for a static partition.

insert into c_curr_rates PARTITION (bus_dt = '2019-06-30') values ('INR','SGD',0.01886,'2019-12-02');
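
To verify the effect of the above commands, a couple of standard Hive statements come in handy (using the same table name as in the examples):

--Listing the partitions of the table.

show partitions c_sample_tbl;

--Checking the table type (MANAGED_TABLE vs EXTERNAL_TABLE) and other properties.

describe formatted c_sample_tbl;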


Monday, April 19, 2021

Common commands to manage Linux background process

Linux command to run a script in the background.

command: nohup sh run.sh &

Linux command to run multiple commands in sequence in the background (wrapping the whole chain in sh -c keeps nohup applied to every command in the chain):

command: nohup sh -c 'sh run.sh 2021-01-05 && sh run.sh 2021-02-08 && sh run.sh 2021-03-09' &

Finding the background process in Linux and killing it.

command: ps -ef | grep run.sh

Get the PID from the output of the above command and use it to kill the process.

kill -9 92646
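
A couple of related commands that are handy here (assuming nohup's default of writing output to nohup.out in the working directory):

Following the script output while it runs in the background.

command: tail -f nohup.out

Listing the background jobs started from the current shell.

command: jobs -l

A plain kill <pid> (SIGTERM) is usually worth trying before kill -9 (SIGKILL), since it lets the process shut down cleanly.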










PySpark Multiple Sessions, Global Temp View, Runtime Configuration Settings

**************PySpark Multiple Sessions********************************

#Initialize the first PySpark session as below

from pyspark.sql import SparkSession
spark1 = SparkSession.builder.master("local").appName("Single_app").getOrCreate()


#A temporary view created in one session cannot be accessed in another session; below is an example


spark2=spark1.newSession()
spark1.sql("select 1,2").createOrReplaceTempView("spark1_view")
spark1.catalog.listTables()





spark2.catalog.listTables()





#To share a common temporary namespace across sessions, create a global temp view


spark1.sql("select 1,2").createGlobalTempView("spark1_global")
spark2.sql("select * from global_temp.spark1_global").show()









"""
Another thing , if we have different sessions sharing same context we can have different set of configurations among
those sessions as below
"""
spark1.conf.set("spark.sql.shuffle.partitions","100")
spark2.conf.set("spark.sql.shuffle.partitions","200")


#display the respective conf values


spark1.conf.get("spark.sql.shuffle.partitions")





spark2.conf.get("spark.sql.shuffle.partitions")







#Stopping one session stops the shared SparkContext, and with it all associated sessions
spark1.stop()


************************Below is the Scala version***********************

The two most common use cases are:

  • Keeping sessions with minor differences in configuration.

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
          /_/
    Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_141)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions
    res0: Int = 200
    scala> 
    scala> val newSpark = spark.newSession
    newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@618a9cb7
    scala> newSpark.conf.set("spark.sql.shuffle.partitions", 99)
    scala> newSpark.range(100).groupBy("id").count.rdd.getNumPartitions
    res2: Int = 99
    scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions  // No effect on initial session
    res3: Int = 200
    
  • Separating temporary namespaces:

    scala> spark.range(1).createTempView("foo")
    scala> 
    scala> spark.catalog.tableExists("foo")
    res1: Boolean = true
    scala> 
    scala> val newSpark = spark.newSession
    newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@73418044
    scala> newSpark.catalog.tableExists("foo")
    res2: Boolean = false
    scala> newSpark.range(100).createTempView("foo")  // No exception
    scala> spark.table("foo").count // No effect on initial session
    res4: Long = 1     

Sunday, April 18, 2021

SCD TYPE 2 Implementation in Pyspark

#Consider the following data as the existing data in the target table.

dat = [[1, 50, "2019-02-01", "2019-02-02", 0],
       [1, 75, "2019-02-02", None, 1],
       [2, 200, "2019-02-01", "2019-02-01", 0],
       [2, 60, "2019-02-01", "2019-02-01", 0],
       [2, 500, "2019-02-01", None, 1],
       [3, 175, "2019-02-01", None, 1],
       [4, 50, "2019-02-02", "2019-02-02", 0],
       [4, 300, "2019-02-02", None, 1],
       [5, 500, "2019-02-02", None, 1]]

header = ["pk", "amount", "StartDate", "endDate", "active"]

df_existing_data = spark.createDataFrame(data=dat, schema=header)

#Create a target table with the SCD Type 2 structure, partitioned by the active flag.

df_existing_data.write.format("parquet").mode("overwrite").partitionBy("active").saveAsTable("default.tab_amount")

df_existing_data.show()















#Extracting the currently active data from the target (SCD Type 2) table.

df_active_data=spark.sql("select * from default.tab_amount where active=1")

df_active_data.createOrReplaceTempView("ExisingActiveData")

df_active_data.show()









#Consider the following as the latest data arriving from the source system.

dat = [[1, 75],
       [2, 500],
       [3, 200],
       [4, 350],
       [5, 500],
       [6, 800],
       [7, 1500]]

header = ["pk", "amount"]

df_newdata = spark.createDataFrame(data=dat, schema=header)

df_newdata.createOrReplaceTempView("NewData")

df_newdata.show()












#Extracting new records to insert, with start date as today and end date as NULL.

df_NewRecords=spark.sql("select ND.*,current_Date() as startDate,NULL as endDate,1 as active from NewData ND left anti join ExisingActiveData EAD  on EAD.pk=ND.pk ")

df_NewRecords.show()









#Extracting updated active records from source table with start date as today and end date as NULL.

df_UpdatedActive=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND inner join ExisingActiveData EAD on EAD.pk=ND.pk where EAD.amount<>ND.amount")

df_UpdatedActive.createOrReplaceTempView("UpdatedActive")

df_UpdatedActive.show()








#Extracting existing active records whose values have not changed between target and source.

df_ExistingActive=spark.sql("select EAD.* from ExisingActiveData EAD left anti join UpdatedActive UA on EAD.pk=UA.pk")

df_ExistingActive.show()








#Extracting the previous versions of the updated records and setting their end date to yesterday.

df_UpdatedInactive=spark.sql("select EAD.pk,EAD.amount,startDate,date_sub(current_date(),1) as endDate,0 as active from ExisingActiveData EAD inner join NewData ND on EAD.pk=ND.pk where EAD.amount<>ND.amount")

df_UpdatedInactive.show()








#Configuration for dynamic partition inserts and dynamic partition overwrite.


spark.conf.set("hive.exec.dynamic.partition", "true")

spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

#Consolidated active records.

df_TotalActive=df_NewRecords.unionAll(df_UpdatedActive).unionAll(df_ExistingActive)


#Appending the updated records with end date as yesterday to active=0 partition

df_UpdatedInactive.write.mode("append").insertInto("default.tab_amount")

#Overwriting the consolidated active records into the active=1 partition

df_TotalActive.write.mode("overwrite").insertInto("default.tab_amount")


#Below is the overall data in the SCD Type 2 dimension after the data load.

spark.sql("select * from default.tab_amount order by pk,active asc").show()




Friday, April 16, 2021

Hive optimization techniques

The main components of Hive are as follows:

  • Metastore
  • Driver
  • Compiler
  • Optimizer
  • Executor
  • Client


While Hadoop/Hive can process nearly any amount of data, optimizations can lead to big savings, proportional to the amount of data, in terms of processing time and cost. There are a whole lot of optimizations that can be applied in Hive. Let us look at the optimization techniques we are going to cover (a brief illustration of the related settings follows the list):

  1. Partitioning
  2. Bucketing
  3. Using Tez as Execution Engine
  4. Using Compression
  5. Using ORC Format
  6. Join Optimizations
  7. Cost-based Optimizer
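
As a quick illustration, the settings and DDL below sketch how some of these techniques are typically enabled; the table and column names here are generic examples, not tied to a specific table in this blog:

set hive.execution.engine=tez;
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;

--A partitioned, bucketed table stored as ORC.
CREATE TABLE sales_orc (
  order_id string,
  amount double
)
PARTITIONED BY (bus_dt string)
CLUSTERED BY (order_id) INTO 16 BUCKETS
STORED AS ORC;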

Thursday, April 15, 2021

Avoid small file issue in Hive

One way to control the size of files when inserting into a table using Hive is to set the parameters below:

set hive.merge.tezfiles=true;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=128000000;
set hive.merge.smallfiles.avgsize=128000000;

This works for both the MapReduce and Tez engines and ensures that all files created are at or below 128 MB in size (you can alter that size according to your use case; additional reading here: https://community.cloudera.com/t5/Community-Articles/ORC-Creation-Best-Practices/ta-p/248963).

The easiest way to merge the files of an existing table is to recreate it after setting the above Hive parameters in the session:

CREATE TABLE new_table LIKE old_table;
INSERT INTO new_table SELECT * FROM old_table;
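
For tables stored as ORC, another option worth knowing is Hive's concatenate command, which merges small files of a table or partition in place (table and partition names below are illustrative, following the earlier examples on this blog):

ALTER TABLE old_table CONCATENATE;
--or, for a single partition:
ALTER TABLE old_table PARTITION (bus_dt='2019-09-30') CONCATENATE;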

Friday, April 9, 2021

Hive Architecture

 


Hive complex data types , explode, Lateral view

Create a table with array and struct complex data types as below.

CREATE TABLE student_details(

id_key string,
name string,
subjects array<string>,
address struct<city:string,State:string>
);

Insert sample data as below.

INSERT INTO student_details
select
'AA87U',
'BRYAN',
array('ENG','CAL_1','CAL_2','HST','MUS'),
named_struct('city','Tampa','State','FL');


Display the sample data.

select * from student_details;





Explode: displaying the array elements using explode() and a lateral view.
select id_key,name,each_subject
from
student_details
lateral view explode(subjects) temp_table as each_subject;









Inline: displaying the struct fields using inline().
select id_key,name,add.*
from
student_details
lateral view inline (array(address)) add;







Displaying both array and struct data in 1NF.
select id_key,name,each_subject,add.*
from
student_details
lateral view explode(subjects) temp_table as each_subject
lateral view inline (array(address)) add;






Commonly used array functions.
select size(subjects),sort_array(subjects),concat_ws('/',subjects),array_contains(subjects,'END') from student_details;
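
Struct fields can also be read directly with dot notation, without a lateral view:

select id_key,name,address.city,address.state from student_details;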



Thursday, April 8, 2021

Spark Cache() Vs Persist()

The difference between cache() and persist() is that cache() always uses the default storage level, which for RDDs is MEMORY_ONLY.

Using persist() we can choose among several storage levels for persisted RDDs in Apache Spark. Let's discuss each RDD storage level one by one:

a. MEMORY_ONLY

In this storage level, the RDD is stored as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions are not cached and are recomputed each time they are needed. The storage space used is high, the CPU computation time is low, the data is stored in memory, and the disk is not used.

b. MEMORY_AND_DISK

In this level, the RDD is stored as deserialized Java objects in the JVM. When the RDD does not fit in memory, the excess partitions are stored on disk and retrieved from there whenever required. The storage space used is high, the CPU computation time is medium, and both memory and disk storage are used.

c. MEMORY_ONLY_SER

This level stores the RDD as serialized Java objects (one byte array per partition). It is more space efficient than deserialized objects, especially when a fast serializer is used, but it increases the CPU overhead. The storage space used is low, the CPU computation time is high, the data is stored in memory, and the disk is not used.

d. MEMORY_AND_DISK_SER

It is similar to MEMORY_ONLY_SER, but it spills partitions that do not fit in memory to disk rather than recomputing them each time they are needed. The storage space used is low, the CPU computation time is high, and both memory and disk storage are used.

e. DISK_ONLY

In this storage level, the RDD is stored only on disk. The storage space used is low, the CPU computation time is high, and only disk storage is used.
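
Below is a minimal PySpark sketch of picking a storage level with persist(), assuming an active SparkSession named spark:

from pyspark import StorageLevel

rdd = spark.sparkContext.parallelize(range(1000))

#persist() is lazy; the first action materializes the chosen storage level.
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.count()

#Release the storage when it is no longer needed.
rdd.unpersist()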

Wednesday, March 17, 2021

Sqoop performance improvement

In general, performance tuning in Sqoop can be achieved by:

  • Controlling Parallelism
  • Controlling Data Transfer Process

Controlling Parallelism:

Sqoop works on the MapReduce programming model implemented in Hadoop. Sqoop imports and exports data from most relational databases in parallel. The number of map tasks per job determines its parallelism. By controlling the parallelism, we can manage the load on our databases and hence their performance. Here are a couple of ways to exploit parallelism in Sqoop jobs:

Changing the number of mappers

Sqoop jobs launch four mappers by default. To optimise performance, increasing the number of map tasks (parallel processes) to 8 or 16 can improve throughput for some databases.

By using the -m or --num-mappers parameter, we can set the degree of parallelism in Sqoop. For example, changing the number of mappers to 10:

sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--num-mappers 10

A few things to keep in mind: the number of map tasks should be less than the maximum number of parallel database connections the source database can support, and the degree of parallelism should not exceed what is available in your MapReduce cluster.

Split By Query

When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range.

The --split-by parameter splits the column's value range uniformly based on the number of mappers specified. The syntax for --split-by is:

sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--split-by city_id

Custom Boundary Queries

As seen before, --split-by distributes the data uniformly for import. If the splitting column has non-uniform (skewed) values and --split-by alone does not give the desired results, --boundary-query can be used.

Ideally, we configure the --boundary-query parameter with a query that returns min(id) and max(id) for the source table.

sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--query 'SELECT normcities.id,
countries.country,
normcities.city
FROM normcities
JOIN countries USING(country_id)
WHERE $CONDITIONS' \
--split-by id \
--target-dir cities \
--boundary-query "select min(id), max(id) from normcities"

Here $CONDITIONS is used internally by Sqoop and is replaced with the per-mapper range conditions derived from the min(id) and max(id) returned by the boundary query. By supplying custom boundary values, better-balanced splits are derived, which generally improves performance.
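
The second lever mentioned at the top of this post, controlling the data transfer process, typically involves options such as --fetch-size (the number of rows read from the database per round trip) and --direct (use the database's native dump utility where supported). A sketch of tuning the fetch size, reusing the connection details from the examples above:

sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--fetch-size 10000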

Thursday, January 28, 2021

PySpark Broadcast Join

First of all, spark.sql.autoBroadcastJoinThreshold and the broadcast hint are separate mechanisms. Even if autoBroadcastJoinThreshold is disabled, setting the broadcast hint will take precedence. With default settings:

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)

Spark will use autoBroadcastJoinThreshold and automatically broadcast data:

df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

When we disable auto broadcast, Spark will use a standard SortMergeJoin:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 100, step=1, splits=Some(8))
   +- *Sort [id#3L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)

but it can be forced to use BroadcastHashJoin with the broadcast hint:

df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

SQL has its own hints format (similar to the one used in Hive):

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
 "SELECT  /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)

To sum up - autoBroadcastJoinThreshold is applicable when working with the Dataset API, but it is not relevant when using explicit broadcast hints.

Furthermore, broadcasting large objects is unlikely to provide any performance boost, and in practice will often degrade performance and cause stability issues. Remember that a broadcasted object has to be fetched to the driver first, then sent to each worker, and finally loaded into memory.
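
Since the post title says PySpark and the plans above come from the Scala shell, here is the equivalent broadcast hint in PySpark, using the same toy DataFrames:

from pyspark.sql.functions import broadcast

df1 = spark.range(100)
df2 = spark.range(100)

#Forces a BroadcastHashJoin regardless of spark.sql.autoBroadcastJoinThreshold.
df1.join(broadcast(df2), "id").explain()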