Monday, April 19, 2021

Pyspark Multiple sessions, Global Temp View, run time configuration setting

 **************PySpark Multiple Sessions********************************

#Initialize the first PySpark session as below

import pyspark
spark1=pyspark.sql.SparkSession.builder.master("local").appName("Single_app").getOrCreate()


#A temporary view created in one session cannot be accessed from another session; below is an example


spark2=spark1.newSession()
spark1.sql("select 1,2").createOrReplaceTempView("spark1_view")
spark1.catalog.listTables()   # lists spark1_view as a temporary table
spark2.catalog.listTables()   # returns an empty list -- spark2 cannot see the view





#To share a temp view across sessions, create a global temp view (it lives in the reserved global_temp database)


spark1.sql("select 1,2").createGlobalTempView("spark1_global")
spark2.sql("select * from global_temp.spark1_global").show()









"""
Another thing: sessions that share the same SparkContext can still hold
different runtime (SQL) configurations, as shown below.
"""
spark1.conf.set("spark.sql.shuffle.partitions","100")
spark2.conf.set("spark.sql.shuffle.partitions","200")


#Display the conf value held by each session


spark1.conf.get("spark.sql.shuffle.partitions")   # '100'
spark2.conf.get("spark.sql.shuffle.partitions")   # '200'







#Stopping one session stops the shared SparkContext, which makes all sessions built on it unusable
spark1.stop()


************************Below is the Scala version***********************

The two most common use cases are:

  • Keeping sessions with minor differences in configuration.

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
          /_/
    Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_141)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions
    res0: Int = 200
    scala> 
    scala> val newSpark = spark.newSession
    newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@618a9cb7
    scala> newSpark.conf.set("spark.sql.shuffle.partitions", 99)
    scala> newSpark.range(100).groupBy("id").count.rdd.getNumPartitions
    res2: Int = 99
    scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions  // No effect on initial session
    res3: Int = 200
    
  • Separating temporary namespaces:

    scala> spark.range(1).createTempView("foo")
    scala> 
    scala> spark.catalog.tableExists("foo")
    res1: Boolean = true
    scala> 
    scala> val newSpark = spark.newSession
    newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@73418044
    scala> newSpark.catalog.tableExists("foo")
    res2: Boolean = false
    scala> newSpark.range(100).createTempView("foo")  // No exception
    scala> spark.table("foo").count // No effect on initial session
    res4: Long = 1     
