Thursday, January 28, 2021

Pyspark BroadCast Join

First of all spark.sql.autoBroadcastJoinThreshold and broadcast hint are separate mechanisms. Even if autoBroadcastJoinThreshold is disabled setting broadcast hint will take precedence. With default settings:

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)

Spark will use autoBroadcastJoinThreshold and automatically broadcast data:

df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

When we disable auto broadcast Spark will use standard SortMergeJoin:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 100, step=1, splits=Some(8))
   +- *Sort [id#3L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)

but can forced to use BroadcastHashJoin with broadcast hint:

df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

SQL has its own hints format (similar to the one used in Hive):

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
 "SELECT  /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)

So to answer your question - autoBroadcastJoinThreshold is applicable when working with Dataset API, but it is not relevant when using explicit broadcast hints.

Furthermore broadcasting large objects is unlikely provide any performance boost, and in practice will often degrade performance and result in stability issue. Remember that broadcasted object has to be first fetch to driver, then send to each worker, and finally loaded into memory.

Sunday, January 10, 2021

coalesce vs repartition

 coalesce uses existing partitions to minimize the amount of data that's shuffled. repartition creates new partitions and does a full shuffle. coalesce results in partitions with different amounts of data (sometimes partitions that have much different sizes) and repartition results in roughly equal sized partitions.

Catalyst optimizer, Tungsten optimizer

Spark uses two engines to optimize and run the queries - Catalyst and Tungsten, in that order. Catalyst basically generates an optimized physical query plan from the logical query plan by applying a series of transformations like predicate pushdown, column pruning, and constant folding on the logical plan. This optimized query plan is then used by Tungsten to generate optimized code, that resembles hand written code, by making use of Whole-stage Codegen functionality introduced in Spark 2.0. This functionality has improved Spark's efficiency by a huge margin from Spark 1.6, which used the traditional Volcano Iterator Model.

Catalyst is based on functional programming constructs in Scala and designed with these key two purposes:

  • Easily add new optimization techniques and features to Spark SQL
  • Enable external developers to extend the optimizer (e.g. adding data source specific rules, support for new data types, etc.)

When you execute code, Spark SQL uses Catalyst's general tree transformation framework in four phases, as shown below:

No alt text provided for this image



Tungsten

The goal of Project Tungsten is to improve Spark execution by optimising Spark jobs for CPU and memory efficiency (as opposed to network and disk I/O which are considered fast enough). 

  1. Off-Heap Memory Management using binary in-memory data representation aka Tungsten row format and managing memory explicitly,
  2. Cache Locality which is about cache-aware computations with cache-aware layout for high cache hit rates
  3. Whole-Stage Code Generation (aka CodeGen).

property: spark.sql.tungsten.enabled to true

All thanks to below article.

https://www.linkedin.com/pulse/catalyst-tungsten-apache-sparks-speeding-engine-deepak-rajak/?articleId=6674601890514378752 

Word Count Using flatMap and Map

df = sc.textFile("dbfs:/FileStore/test.txt")

# below is the text file content

"""

hadoop is fast

hive is sql on hdfs

spark is superfast

spark is awesome

"""

 fm=df.flatMap(lambda x: x.split(" ")).map(lambda x: (x,1)).groupByKey().mapValues(sum)

fm.take(20)


Out[8]: [('hadoop', 1), ('is', 4), ('hive', 1), ('hdfs', 1), ('awesome', 1), ('fast', 1), ('sql', 1), ('on', 1), ('spark', 2), ('superfast', 1)]



RDD Vs Dataframe Vs Dataset

 

What are RDDs?

RDDs or Resilient Distributed Datasets is the fundamental data structure of the Spark. It is the collection of objects which is capable of storing the data partitioned across the multiple nodes of the cluster and also allows them to do processing in parallel.


What are Dataframes?

It was introduced first in Spark version 1.3 to overcome the limitations of the Spark RDD. Spark Dataframes are the distributed collection of the data points, but here, the data is organized into the named columns. They allow developers to debug the code during the runtime which was not allowed with the RDDs.


What are Datasets?

Spark Datasets is an extension of Dataframes API with the benefits of both RDDs and the Datasets. It is fast as well as provides a type-safe interface. Type safety means that the compiler will validate the data types of all the columns in the dataset while compilation only and will throw an error if there is any mismatch in the data types.

We cannot create Spark Datasets in Python yet. The dataset API is available only in Scala and Java only


Below are details.

RDDs and Datasets are type safe means that compiler know the Columns and it's data type of the Column whether it is Long, String, etc....

But, In Dataframe, every time when you call an action, collect() for instance,then it will return the result as an Array of Rows not as Long, String data type. In dataframe, Columns have their own type such as integer, String but they are not exposed to you. To you, its any type. To convert the Row of data into it's suitable type you have to use .asInstanceOf method.

eg: In Scala:

scala > :type df.collect()
Array[org.apache.spark.sql.Row]


df.collect().map{ row => 
    val str = row(0).asInstanceOf[String]
    val num = row(1).asInstanceOf[Long]
}   

Reference: https://www.analyticsvidhya.com/blog/2020/11/what-is-the-difference-between-rdds-dataframes-and-datasets/                   

SparkContext vs SparkSesssion

 In older version(before 1+) of Spark there was different contexts that was entrypoints to the different api (sparkcontext for the core api, sql context for the spark-sql api, streaming context for the Dstream api etc...) this was source of confusion for the developer and was a point of optimization for the spark team, so in the most recent version of spark there is only one entrypoint (the spark session) and from this you can get the various other entrypoint (the spark context , the streaming context , etc ....)


Another difference with sparksession is, now different users can submit same applications with different configurations. even though its not advisable to run more than 1 session at a time, its one of the differences to be metioned.

Cluster Mode Vs Client Mode

 In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.


  • Client mode, driver will be running in the machine where application got submitted and the machine has to be available in the network till the application completes.
  • Cluster mode, driver will be running in application master(one per spark application) node and machine submitting the application need not to be in network after submission

Client mode

Client mode

Cluster mode

Cluster mode

If Spark application is submitted with cluster mode on its own resource manager(standalone) then the driver process will be in one of the worker nodes.

Spark Architecure

 Spark In depth Architecture: