Wednesday, October 18, 2023
PySpark code to get the estimated size of a DataFrame in bytes
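A minimal sketch of one common approach: ask Catalyst for the optimized plan's size estimate through the DataFrame's private _jdf handle. This is an internal API, so the exact call chain can vary across Spark versions.
# Estimate a DataFrame's size in bytes from Catalyst plan statistics.
# Relies on the private _jdf handle; may break across Spark versions.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("df_size_demo").getOrCreate()
df = spark.range(1000000)
size_in_bytes = df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
print("Estimated size in bytes:", size_in_bytes)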
Friday, December 2, 2022
Spark cluster mode parameters
We can divide spark-submit's file-distribution options into two categories.
The first category is data files: Spark only adds the specified files to the containers; no further commands are executed. There are two options in this category:
--archives: with this option you can submit archives, and Spark will extract the files in them for you; Spark supports .zip, .tar, and similar formats.
--files: with this option you can submit plain files; Spark puts them in the container and does nothing else. sc.addFile is the programmatic API for this one.
The second category is code dependencies. In a Spark application, a code dependency can be a JVM dependency or, for a PySpark application, a Python dependency.
--jars: this option is used to submit JVM dependencies as JAR files; Spark adds these JARs to the CLASSPATH automatically so your JVM can load them.
--py-files: this option is used to submit Python dependencies; they can be .py, .egg, or .zip files. Spark adds these files to the PYTHONPATH so your Python interpreter can find them. sc.addPyFile is the programmatic API for this one. PS: a single .py file is added to a __pyfiles__ folder; everything else is added to the CWD.
All four options accept multiple files, separated by ",", and for each file you can specify an alias using the {URL}#{ALIAS} format. Don't specify an alias with --py-files, because Spark won't add the alias to the PYTHONPATH.
Example:
--archives abc.zip#new_abc,cde.zip#new_cde
Spark will extract abc.zip and cde.zip and create new_abc and new_cde folders in the container.
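The --files and --py-files options also have programmatic equivalents. A minimal PySpark sketch (the app name and file paths are hypothetical):
# Distribute files programmatically instead of via spark-submit flags.
from pyspark.sql import SparkSession
from pyspark import SparkFiles
spark = SparkSession.builder.appName("deps_demo").getOrCreate()
sc = spark.sparkContext
sc.addFile("/path/to/config.json")    # same effect as --files
sc.addPyFile("/path/to/deps.zip")     # same effect as --py-files
config_path = SparkFiles.get("config.json")   # resolve the file's local path on any node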
Monday, April 19, 2021
Common commands to manage Linux background processes
Linux command to run a script in the background.
command: nohup sh run.sh &
Linux command to run multiple commands in sequence.
command: nohup sh run.sh 2021-01-05 && sh run.sh 2021-02-08 && sh run.sh 2021-03-09 &
Finding the background process in Linux and killing it.
command: ps -ef | grep "sh run.sh"
Get the PID from the above command's output and use it to kill the process.
kill -9 92646
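By default nohup writes output to nohup.out; a small sketch of redirecting it and capturing the PID directly (standard shell behavior):
command: nohup sh run.sh > run.log 2>&1 &
command: echo $!
echo $! prints the PID of the job just started, which can be used in place of the ps -ef | grep lookup.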
PySpark Multiple Sessions, Global Temp Views, Runtime Configuration Settings
**************PySpark Multiple Sessions********************************
#Initialize the first PySpark session as below
import pyspark
spark1=pyspark.sql.SparkSession.builder.master("local").appName("Single_app").getOrCreate()
#A temporary view created in one session cannot be accessed in another session; below is an example
spark2=spark1.newSession()
spark1.sql("select 1,2").createOrReplaceTempView("spark1_view")
spark1.catalog.listTables()   #spark1_view appears in the list
spark2.catalog.listTables()   #spark1_view is not visible here: temp views are session-scoped
#To share a temp view across sessions, create a global temp view
spark1.sql("select 1,2").createGlobalTempView("spark1_global")
spark2.sql("select * from global_temp.spark1_global").show()
"""
Another thing: sessions that share the same context can still have different runtime configurations, as shown below.
"""
spark1.conf.set("spark.sql.shuffle.partitions","100")
spark2.conf.set("spark.sql.shuffle.partitions","200")
#display the respective conf values
spark1.conf.get("spark.sql.shuffle.partitions")   #returns '100'
spark2.conf.get("spark.sql.shuffle.partitions")   #returns '200'
#stopping one session stops the shared SparkContext and all associated sessions
spark1.stop()
************************Below is the Scala version***********************
The two most common use cases are:
Keeping sessions with minor differences in configuration.
(spark-shell, Spark 2.2.0, Scala 2.11.8)
scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions
res0: Int = 200

scala> val newSpark = spark.newSession
newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@618a9cb7

scala> newSpark.conf.set("spark.sql.shuffle.partitions", 99)

scala> newSpark.range(100).groupBy("id").count.rdd.getNumPartitions
res2: Int = 99

scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions // No effect on initial session
res3: Int = 200

Separating temporary namespaces:
scala> spark.range(1).createTempView("foo")

scala> spark.catalog.tableExists("foo")
res1: Boolean = true

scala> val newSpark = spark.newSession
newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@73418044

scala> newSpark.catalog.tableExists("foo")
res2: Boolean = false

scala> newSpark.range(100).createTempView("foo") // No exception

scala> spark.table("foo").count // No effect on initial session
res4: Long = 1
Sunday, April 18, 2021
SCD Type 2 Implementation in PySpark
#Consider the following data as existing data in the target table.
dat= [[1, 50, "2019-02-01", "2019-02-02", 0],
[1, 75, "2019-02-02", None,1],
[2, 200, "2019-02-01", "2019-02-01", 0],
[2, 60, "2019-02-01", "2019-02-01", 0],
[2, 500, "2019-02-01", None, 1],
[3, 175, "2019-02-01", None, 1],
[4, 50, "2019-02-02", "2019-02-02", 0],
[4, 300, "2019-02-02", None, 1],
[5, 500, "2019-02-02", None, 1]]
header=["pk", "amount", "startDate", "endDate", "active"]
df_existing_data=spark.createDataFrame(data=dat,schema=header)
#Create a target table with the SCD Type 2 structure, partitioned by the active flag.
df_existing_data.write.format("parquet").mode("overwrite").partitionBy("active").saveAsTable("default.tab_amount")
df_existing_data.show()
#Extract the currently active data from the SCD Type 2 target table.
df_active_data=spark.sql("select * from default.tab_amount where active=1")
df_active_data.createOrReplaceTempView("ExistingActiveData")
df_active_data.show()
#Consider the following as the latest data arriving from the source system.
dat= [
[1, 75],
[2, 500],
[3, 200],
[4, 350],
[5, 500],
[6, 800],
[7, 1500]]
header=["pk", "amount"]
df_newdata=spark.createDataFrame(data=dat,schema=header)
df_newdata.createOrReplaceTempView("NewData")
df_newdata.show()
#Extract brand-new records (left anti join) to insert, with start date today and end date NULL.
df_NewRecords=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND left anti join ExistingActiveData EAD on EAD.pk=ND.pk")
df_NewRecords.show()
#Extract changed records from the source as the new active versions, with start date today and end date NULL.
df_UpdatedActive=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND inner join ExistingActiveData EAD on EAD.pk=ND.pk where EAD.amount<>ND.amount")
df_UpdatedActive.createOrReplaceTempView("UpdatedActive")
df_UpdatedActive.show()
#Extract existing active records whose values are unchanged between target and source.
df_ExistingActive=spark.sql("select EAD.* from ExistingActiveData EAD left anti join UpdatedActive UA on EAD.pk=UA.pk")
df_ExistingActive.show()
#Extract the old versions of the changed records, closing them with end date set to yesterday.
df_UpdatedInactive=spark.sql("select EAD.pk,EAD.amount,startDate,date_sub(current_date(),1) as endDate,0 as active from ExistingActiveData EAD inner join NewData ND on EAD.pk=ND.pk where EAD.amount<>ND.amount")
df_UpdatedInactive.show()
#Configure dynamic partitioning so only the touched partitions are rewritten.
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
#Consolidated active records.
df_TotalActive=df_NewRecords.unionAll(df_UpdatedActive).unionAll(df_ExistingActive)
#Append the closed-out records (end date yesterday) to the active=0 partition
df_UpdatedInactive.write.mode("append").insertInto("default.tab_amount")
#Overwrite the active=1 partition with the consolidated active records
df_TotalActive.write.mode("overwrite").insertInto("default.tab_amount")
#Below is the overall data in the SCD Type 2 dimension after the load.
spark.sql("select * from default.tab_amount order by pk,active asc").show()
Thursday, April 8, 2021
Spark cache() vs persist()
The difference between cache() and persist() is that cache() is fixed to the default storage level (MEMORY_ONLY for RDDs), while persist() lets you choose the storage level.
With persist() we can use various storage levels to store persisted RDDs in Apache Spark. Let's discuss each RDD storage level one by one:
a. MEMORY_ONLY
In this storage level, the RDD is stored as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions are not cached and are recomputed each time they are needed. In this level the space used for storage is very high, the CPU computation time is low, and the data is stored in memory only; the disk is not used.
b. MEMORY_AND_DISK
In this level, the RDD is stored as deserialized Java objects in the JVM. When the RDD does not fit in memory, the excess partitions are stored on disk and retrieved from disk whenever required. In this level the space used for storage is high, the CPU computation time is medium, and it makes use of both in-memory and on-disk storage.
c. MEMORY_ONLY_SER
This level stores the RDD as serialized Java objects (one byte array per partition). It is more space-efficient than deserialized objects, especially with a fast serializer, but it increases the CPU overhead. In this level the storage space is low, the CPU computation time is high, and the data is stored in memory only; the disk is not used.
d. MEMORY_AND_DISK_SER
It is similar to MEMORY_ONLY_SER, but it spills partitions that do not fit in memory to disk rather than recomputing them each time they are needed. In this storage level, the space used for storage is low, the CPU computation time is high, and it makes use of both in-memory and on-disk storage.
e. DISK_ONLY
In this storage level, the RDD is stored only on disk. The space used for storage is low, the CPU computation time is high, and it makes use of on-disk storage only.
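A minimal PySpark sketch of the difference (note that PySpark RDDs always store data serialized on the JVM side, so the _SER levels mainly matter for Scala/Java):
from pyspark import StorageLevel
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("persist_demo").getOrCreate()
rdd = spark.sparkContext.parallelize(range(1000000))
rdd.cache()                                  # fixed default level
rdd.count()                                  # an action materializes the cache
rdd2 = rdd.map(lambda x: x * 2)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)   # explicit level: spill to disk if needed
rdd2.count()
print(rdd.getStorageLevel())
print(rdd2.getStorageLevel())
rdd.unpersist()
rdd2.unpersist()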
Thursday, January 28, 2021
PySpark Broadcast Join
First of all, spark.sql.autoBroadcastJoinThreshold and the broadcast hint are separate mechanisms. Even if autoBroadcastJoinThreshold is disabled, setting a broadcast hint will take precedence. With the default settings:
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)
Spark will use autoBroadcastJoinThreshold and automatically broadcast data:
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))
When we disable auto-broadcast, Spark will use the standard SortMergeJoin:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
:- *Sort [id#0L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0L, 200)
: +- *Range (0, 100, step=1, splits=Some(8))
+- *Sort [id#3L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)
but it can be forced to use BroadcastHashJoin with a broadcast hint:
df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))
SQL has its own hints format (similar to the one used in Hive):
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
spark.sql(
"SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=8)
So to answer the question: autoBroadcastJoinThreshold is applicable when working with the Dataset API, but it is not relevant when using explicit broadcast hints.
Furthermore, broadcasting large objects is unlikely to provide any performance boost, and in practice will often degrade performance and cause stability issues. Remember that a broadcast object has to first be fetched to the driver, then sent to each worker, and finally loaded into memory.
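Since the examples above are in Scala, here is a minimal PySpark sketch of the same behavior (the app name is hypothetical):
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("broadcast_demo").getOrCreate()
df1 = spark.range(100)
df2 = spark.range(100)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, "id").explain()              # SortMergeJoin: auto-broadcast disabled
df1.join(broadcast(df2), "id").explain()   # BroadcastHashJoin: hint takes precedence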
Sunday, January 10, 2021
coalesce vs repartition
coalesce uses existing partitions to minimize the amount of data that is shuffled. repartition creates new partitions and does a full shuffle. coalesce results in partitions holding different amounts of data (sometimes partitions of very different sizes), while repartition results in roughly equal-sized partitions.
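A minimal sketch contrasting the two, assuming a running spark session as in the snippets above (the partition counts are illustrative):
df = spark.range(1000000).repartition(8)
print(df.rdd.getNumPartitions())    # 8
df_c = df.coalesce(2)               # narrow: merges existing partitions, no full shuffle
print(df_c.rdd.getNumPartitions())  # 2
df_r = df.repartition(2)            # wide: full shuffle, roughly equal-sized partitions
print(df_r.rdd.getNumPartitions())  # 2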
Catalyst optimizer, Tungsten optimizer
Spark uses two engines to optimize and run queries, Catalyst and Tungsten, in that order. Catalyst generates an optimized physical query plan from the logical query plan by applying a series of transformations such as predicate pushdown, column pruning, and constant folding to the logical plan. This optimized query plan is then used by Tungsten to generate optimized code that resembles hand-written code, by making use of the Whole-Stage Codegen functionality introduced in Spark 2.0. This functionality has improved Spark's efficiency by a huge margin over Spark 1.6, which used the traditional Volcano iterator model.
Catalyst is based on functional programming constructs in Scala and designed with these two key purposes:
- Easily add new optimization techniques and features to Spark SQL
- Enable external developers to extend the optimizer (e.g. adding data source specific rules, support for new data types, etc.)
When you execute code, Spark SQL uses Catalyst's general tree transformation framework in four phases: analysis, logical optimization, physical planning, and code generation.
Tungsten
The goal of Project Tungsten is to improve Spark execution by optimizing Spark jobs for CPU and memory efficiency (as opposed to network and disk I/O, which are considered fast enough). Its main features are:
- Off-Heap Memory Management: binary in-memory data representation (the Tungsten row format) and explicit memory management.
- Cache Locality: cache-aware computations with cache-aware layouts for high cache hit rates.
- Whole-Stage Code Generation (aka CodeGen).
Property: set spark.sql.tungsten.enabled to true.
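A quick way to see both engines at work, assuming a running spark session (a sketch; the exact plan text varies by Spark version): explain(True) prints Catalyst's logical plans and the final physical plan, where operators prefixed with an asterisk are compiled by Tungsten's whole-stage code generation.
df = spark.range(1000).selectExpr("id % 10 as key").groupBy("key").count()
df.explain(True)   # parsed, analyzed, and optimized logical plans, then the physical plan
# Physical-plan entries like "*(1) HashAggregate(...)" mark codegen'd stages.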
All thanks to the article below.
https://www.linkedin.com/pulse/catalyst-tungsten-apache-sparks-speeding-engine-deepak-rajak/?articleId=6674601890514378752
Word Count Using flatMap and Map
rdd = sc.textFile("dbfs:/FileStore/test.txt")
# below is the text file content
"""
hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome
"""
fm = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).groupByKey().mapValues(sum)
fm.take(20)
Out[8]: [('hadoop', 1), ('is', 4), ('hive', 1), ('hdfs', 1), ('awesome', 1), ('fast', 1), ('sql', 1), ('on', 1), ('spark', 2), ('superfast', 1)]
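A common refinement (a sketch of the same count): reduceByKey combines counts map-side before the shuffle, avoiding groupByKey's materialization of every value per key.
fm = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
fm.take(20)   # same word counts as above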
RDD vs DataFrame vs Dataset
What are RDDs?
RDDs, or Resilient Distributed Datasets, are the fundamental data structure of Spark. An RDD is a collection of objects that stores data partitioned across multiple nodes of the cluster and allows it to be processed in parallel.
What are Dataframes?
DataFrames were first introduced in Spark 1.3 to overcome the limitations of the Spark RDD. A Spark DataFrame is a distributed collection of data points where the data is organized into named columns. They allow developers to debug code at runtime, which was not possible with RDDs.
What are Datasets?
A Spark Dataset is an extension of the DataFrame API that provides the benefits of both RDDs and DataFrames. It is fast and provides a type-safe interface. Type safety means that the compiler validates the data types of all the columns in the Dataset at compile time and throws an error if there is any mismatch.
We cannot create Spark Datasets in Python yet; the Dataset API is available only in Scala and Java.
Below are the details.
In a DataFrame, every time you call an action, collect() for instance, the result comes back as an Array of Rows, not as values of Long or String type. Columns have their own types, such as Integer or String, but those types are not exposed to you; to you, each value is of type Any. To convert a Row's data into its proper type, you have to use the .asInstanceOf method.
e.g., in Scala:
scala> :type df.collect()
Array[org.apache.spark.sql.Row]
df.collect().map{ row =>
val str = row(0).asInstanceOf[String]
val num = row(1).asInstanceOf[Long]
}
Reference: https://www.analyticsvidhya.com/blog/2020/11/what-is-the-difference-between-rdds-dataframes-and-datasets/ 