Thursday, January 28, 2021

PySpark Broadcast Join

First of all, spark.sql.autoBroadcastJoinThreshold and the broadcast hint are separate mechanisms. Even if autoBroadcastJoinThreshold is disabled, an explicit broadcast hint still takes precedence. With the default settings (a threshold of 10485760 bytes, i.e. 10 MB):

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)

Both datasets are far below the threshold, so Spark automatically broadcasts one side of the join:

df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

When we disable automatic broadcasting, Spark falls back to a standard SortMergeJoin:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 100, step=1, splits=Some(8))
   +- *Sort [id#3L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)

but it can be forced to use BroadcastHashJoin with the broadcast hint:

import org.apache.spark.sql.functions.broadcast
df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))
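
The same hint can also be attached through the Dataset's own hint method (available since Spark 2.2) instead of the broadcast function. The following is only a minimal sketch: the session setup and the name hinted are assumptions added for illustration, and the exact plan text varies between Spark versions.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() simply returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-hint").getOrCreate()

val df1 = spark.range(100)
val df2 = spark.range(100)

// Disable size-based broadcasting so that only the hint can trigger a broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Dataset.hint("broadcast") marks df2 as the side to broadcast, like broadcast(df2).
val hinted = df1.join(df2.hint("broadcast"), Seq("id"))
hinted.explain()
```

The printed plan should again contain a BroadcastHashJoin node rather than a SortMergeJoin.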

SQL has its own hints format (similar to the one used in Hive):

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
  "SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)
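
In Spark 2.2 and later, BROADCAST and BROADCASTJOIN are accepted as aliases of MAPJOIN in SQL hints. A minimal sketch of the same query with the BROADCAST spelling follows. The session setup and the name hintedSql are assumptions for illustration, and the threshold is disabled here only to show that the hint alone drives the plan.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() simply returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").appName("sql-broadcast-hint").getOrCreate()

spark.range(100).createOrReplaceTempView("df1")
spark.range(100).createOrReplaceTempView("df2")

// Disable size-based broadcasting so that only the hint can trigger a broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// BROADCAST(df2) asks the planner to broadcast the df2 view.
val hintedSql = spark.sql(
  "SELECT /*+ BROADCAST(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
)
hintedSql.explain()
```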

In short, autoBroadcastJoinThreshold applies when working with the Dataset API, but it is not relevant when using explicit broadcast hints.

Furthermore, broadcasting large objects is unlikely to provide any performance boost, and in practice it will often degrade performance and cause stability issues. Remember that a broadcast object first has to be fetched to the driver, then sent to each worker, and finally loaded into memory.
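
To get a feel for how this cost scales, here is a rough back-of-the-envelope sketch in plain Scala (no Spark required). The names BroadcastCost and estimateBroadcastCost are hypothetical, and the model is a deliberate simplification: it assumes one full copy on the driver and one per executor, and ignores serialization, compression, and the overhead of the in-memory hash table.

```scala
// Illustrative model only; these names are not part of any Spark API.
final case class BroadcastCost(driverBytes: Long, clusterBytes: Long)

def estimateBroadcastCost(tableBytes: Long, numExecutors: Int): BroadcastCost =
  BroadcastCost(
    driverBytes = tableBytes,                // the data is first collected on the driver
    clusterBytes = tableBytes * numExecutors // each executor then holds a full copy
  )

val tenMb = 10L * 1024 * 1024   // default autoBroadcastJoinThreshold: 10485760 bytes
val oneGb = 1024L * 1024 * 1024 // a table forced through with a broadcast hint

println(estimateBroadcastCost(tenMb, numExecutors = 50))
println(estimateBroadcastCost(oneGb, numExecutors = 50))
```

Under these assumptions, a 10 MB table on 50 executors occupies roughly 500 MB across the cluster, while a 1 GB table occupies roughly 50 GB, on top of the full copy the driver has to hold.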
