Showing posts with label Pyspark. Show all posts

Wednesday, October 18, 2023

pyspark code to get estimated size of dataframe in bytes

import sys

from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("DataFrameSize").getOrCreate()

# Create a PySpark DataFrame
data = [(1, "John"), (2, "Alice"), (3, "Bob")]
columns = ["id", "name"]
df = spark.createDataFrame(data, columns)

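# Estimate the size of the DataFrame in bytes by summing sys.getsizeof() over every cell.
# This measures the Python object size of each value, not Spark's internal or on-disk size.
size_in_bytes = df.rdd.flatMap(lambda x: x).map(lambda x: sys.getsizeof(x) if x is not None else 0).sum()
print(f"Estimated size of the DataFrame: {size_in_bytes} bytes")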

# Stop the Spark session
spark.stop()
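
To see what the per-cell sum above actually measures, here is a minimal sketch of the same calculation on plain Python tuples, without Spark. The helper name estimate_size is hypothetical and only for illustration; the result depends on the Python version and platform, so no fixed number is shown.

```python
import sys


def estimate_size(rows):
    """Sum sys.getsizeof() over every non-None cell of every row."""
    return sum(
        sys.getsizeof(cell)
        for row in rows
        for cell in row
        if cell is not None
    )


# Same sample data as in the post, as plain tuples instead of a DataFrame
data = [(1, "John"), (2, "Alice"), (3, "Bob")]
print(f"Estimated size: {estimate_size(data)} bytes")
```

Because sys.getsizeof() includes the per-object overhead of Python ints and strings, the figure is usually larger than the raw data and should be treated as a rough estimate only.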

Friday, December 2, 2022

Spark cluster mode parameters

The spark-submit options for shipping files to the cluster can be divided into two categories.

The first category is data files: Spark only adds the specified files to the containers and executes no further commands on them. There are two options in this category:

  • --archives: with this option you can submit archives, and Spark will extract the files in them for you. Spark supports zip, tar ... formats.
  • --files: with this option you can submit files; Spark will put them in the container and won't do anything else with them. sc.addFile is the programming API for this one.

The second category is code dependencies. In a Spark application, a code dependency can be a JVM dependency or, for a PySpark application, a Python dependency.

  • --jars: this option is used to submit JVM dependencies as JAR files. Spark adds these JARs to the CLASSPATH automatically, so your JVM can load them.

  • --py-files: this option is used to submit Python dependencies, which can be .py, .egg or .zip files. Spark adds these files to PYTHONPATH, so your Python interpreter can find them.

    sc.addPyFile is the programming API for this one.

    PS: a single .py file is added to a __pyfiles__ folder; the others are added to the CWD.


All four options can specify multiple files, separated by ",", and for each file you can specify an alias using the {URL}#{ALIAS} format. Don't specify an alias in the --py-files option, because Spark won't add the alias to PYTHONPATH.


Example:

--archives abc.zip#new_abc,cde.zip#new_cde

Spark will extract abc.zip and cde.zip and create the new_abc and new_cde folders in the container.
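
As a rough illustration of the {URL}#{ALIAS} format, the sketch below splits such a comma-separated value into (url, alias) pairs. It is not Spark's own parsing code; the function name parse_file_list is hypothetical and exists only to make the format concrete.

```python
def parse_file_list(value):
    """Split 'a.zip#x,b.zip' into [(url, alias_or_None), ...]."""
    entries = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue  # ignore empty entries such as a trailing comma
        url, sep, alias = item.partition("#")
        # alias is None when no '#' was given for this entry
        entries.append((url, alias if sep else None))
    return entries


print(parse_file_list("abc.zip#new_abc,cde.zip#new_cde"))
```

An entry without "#" simply has no alias, which is the form recommended above for --py-files.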


Monday, April 19, 2021

Common commands to manage Linux background process

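Linux command to run a script in the background.

command: nohup sh run.sh &

Linux command to run multiple commands in sequence in the background. The chain is wrapped in sh -c so that nohup applies to every command, not only the first one.

command: nohup sh -c 'sh run.sh 2021-01-05 && sh run.sh 2021-02-08 && sh run.sh 2021-03-09' &

Finding the process running in the background and killing it. The pattern is quoted so that grep does not treat run.sh as a file to search.

command:  ps -ef | grep "sh run.sh"

Get the PID from the output of the above command and use it to kill the process, for example:

kill -9 92646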










Pyspark Multiple sessions, Global Temp View, run time configuration setting

**************PySpark Multiple Sessions********************************

#Initialize first pyspark session as below

import pyspark
spark1=pyspark.sql.SparkSession.builder.master("local").appName("Single_app").getOrCreate()


#A temporary view created in one session cannot be accessed in another session, below is an example


spark2=spark1.newSession()
spark1.sql("select 1,2").createOrReplaceTempView("spark1_view")
spark1.catalog.listTables()





spark2.catalog.listTables()





#To access or have a common temp space, create a global temp view


spark1.sql("select 1,2").createGlobalTempView("spark1_global")
spark2.sql("select * from global_temp.spark1_global").show()









"""
Another thing: if we have different sessions sharing the same context, each session can have
its own set of configurations, as below
"""
spark1.conf.set("spark.sql.shuffle.partitions","100")
spark2.conf.set("spark.sql.shuffle.partitions","200")


#display the respective conf values


spark1.conf.get("spark.sql.shuffle.partitions")





spark2.conf.get("spark.sql.shuffle.partitions")







#stopping one session stops SparkContext and all associated sessions
spark1.stop()


************************Below is scala version***********************

The two most common use cases are:

  • Keeping sessions with minor differences in configuration.

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
          /_/
    Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_141)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions
    res0: Int = 200
    scala> 
    scala> val newSpark = spark.newSession
    newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@618a9cb7
    scala> newSpark.conf.set("spark.sql.shuffle.partitions", 99)
    scala> newSpark.range(100).groupBy("id").count.rdd.getNumPartitions
    res2: Int = 99
    scala> spark.range(100).groupBy("id").count.rdd.getNumPartitions  // No effect on initial session
    res3: Int = 200
    
  • Separating temporary namespaces:

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
          /_/
    Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_141)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> spark.range(1).createTempView("foo")
    scala> 
    scala> spark.catalog.tableExists("foo")
    res1: Boolean = true
    scala> 
    scala> val newSpark = spark.newSession
    newSpark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@73418044
    scala> newSpark.catalog.tableExists("foo")
    res2: Boolean = false
    scala> newSpark.range(100).createTempView("foo")  // No exception
    scala> spark.table("foo").count // No effect on initial session
    res4: Long = 1     

Sunday, April 18, 2021

SCD TYPE 2 Implementation in Pyspark

#Consider the following data as the existing data in the target table.

dat= [[1, 50, "2019-02-01", "2019-02-02", 0],

  [1, 75, "2019-02-02", None,1],

  [2, 200, "2019-02-01", "2019-02-01", 0],

  [2, 60, "2019-02-01", "2019-02-01", 0],

  [2, 500, "2019-02-01", None, 1],

  [3, 175, "2019-02-01", None, 1],

  [4, 50, "2019-02-02", "2019-02-02", 0],

  [4, 300, "2019-02-02", None, 1],

  [5, 500, "2019-02-02", None, 1]]

  

header=["pk", "amount", "StartDate", "endDate", "active"]

df_existing_data=spark.createDataFrame(data=dat,schema=header)

#Create a table with SCD Type 2 and associated structure.

df_existing_data.write.format("parquet").mode("overwrite").partitionBy("active").saveAsTable("default.tab_amount")

df_existing_data.show()















#Extracting current Data from target table/scd type 2 table.

df_active_data=spark.sql("select * from default.tab_amount where active=1")

df_active_data.createOrReplaceTempView("ExisingActiveData")

df_active_data.show()









#Consider following as latest or new data arrived from source system.

dat= [

  [1, 75],

  [2, 500],

  [3, 200],

  [4, 350],

  [5, 500],

  [6, 800],

  [7, 1500]]

  

header=["pk", "amount"]

df_newdata=spark.createDataFrame(data=dat,schema=header)

df_newdata.createOrReplaceTempView("NewData")

df_newdata.show()












#Extracting new records to insert with start date as today and end date as NULL.

df_NewRecords=spark.sql("select ND.*,current_Date() as startDate,NULL as endDate,1 as active from NewData ND left anti join ExisingActiveData EAD  on EAD.pk=ND.pk ")

df_NewRecords.show()









#Extracting updated active records from source table with start date as today and end date as NULL.

df_UpdatedActive=spark.sql("select ND.*,current_date() as startDate,cast(NULL as string) as endDate,1 as active from NewData ND inner join ExisingActiveData EAD on EAD.pk=ND.pk where EAD.amount<>ND.amount")

df_UpdatedActive.createOrReplaceTempView("UpdatedActive")

df_UpdatedActive.show()








#Extracting existing active records where the values are not changed in target vs source comparison.

df_ExistingActive=spark.sql("select EAD.* from ExisingActiveData EAD left anti join UpdatedActive UA on EAD.pk=UA.pk")

df_ExistingActive.show()








#Extracting the old versions of the updated records and setting their end date to yesterday.

df_UpdatedInactive=spark.sql("select EAD.pk,EAD.amount,startDate,date_sub(current_date(),1) as endDate,0 as active from ExisingActiveData EAD inner join NewData ND on EAD.pk=ND.pk where EAD.amount<>ND.amount")

df_UpdatedInactive.show()








#Configuring for auto insert and dynamic partitioning.


spark.conf.set("hive.exec.dynamic.partition", "true")

spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

#Consolidated active records.

df_TotalActive=df_NewRecords.unionAll(df_UpdatedActive).unionAll(df_ExistingActive)


#Appending the updated records with end date as yesterday to active=0 partition

df_UpdatedInactive.write.mode("append").insertInto("default.tab_amount")

#Overwriting the active=1 partition with the consolidated active records

df_TotalActive.write.mode("overwrite").insertInto("default.tab_amount")


#Below is the overall data in the SCD Type 2 dimension after the data load.

spark.sql("select * from default.tab_amount order by pk,active asc").show()
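
To make the four joins above easier to follow, here is a minimal sketch of the same classification in plain Python, using dictionaries of pk to amount instead of DataFrames. The function name classify and the dictionary representation are assumptions made for illustration; dates and the active flag are left out.

```python
def classify(active, incoming):
    """Split incoming {pk: amount} rows against the active {pk: amount} rows."""
    # Keys only in the incoming data: brand new records (left anti join)
    new = {pk: amt for pk, amt in incoming.items() if pk not in active}
    # Keys in both with a different amount: new active versions (inner join + <>)
    updated = {
        pk: amt
        for pk, amt in incoming.items()
        if pk in active and active[pk] != amt
    }
    # Active rows that were not updated stay active as they are
    unchanged = {pk: amt for pk, amt in active.items() if pk not in updated}
    # Old versions of updated rows: these get an end date and active=0
    expired = {pk: active[pk] for pk in updated}
    return new, updated, unchanged, expired


# Active rows of the target table and the new source data from the post
active = {1: 75, 2: 500, 3: 175, 4: 300, 5: 500}
incoming = {1: 75, 2: 500, 3: 200, 4: 350, 5: 500, 6: 800, 7: 1500}

new, updated, unchanged, expired = classify(active, incoming)
print("new:", new)
print("updated:", updated)
print("unchanged:", unchanged)
print("expired:", expired)
```

With the sample data, pk 6 and 7 are new, pk 3 and 4 are updated (their old amounts 175 and 300 are expired), and pk 1, 2 and 5 are unchanged.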




Thursday, April 8, 2021

Spark Cache() Vs Persist()

The difference between cache() and persist() is that cache() always uses the default storage level, which is MEMORY_ONLY for RDDs (MEMORY_AND_DISK for DataFrames and Datasets).

With persist() we can choose among various storage levels for persisted RDDs in Apache Spark. Let’s discuss each RDD storage level one by one.

a. MEMORY_ONLY

In this storage level, the RDD is stored as deserialized Java objects in the JVM. If the RDD is larger than the available memory, some partitions are not cached and are recomputed each time they are needed. In this level the space used for storage is very high and the CPU computation time is low; the data is stored in memory and the disk is not used.

b. MEMORY_AND_DISK

In this level, the RDD is stored as deserialized Java objects in the JVM. When the RDD is larger than the available memory, the excess partitions are stored on disk and read from disk whenever required. In this level the space used for storage is high and the CPU computation time is medium; it makes use of both in-memory and on-disk storage.

c. MEMORY_ONLY_SER

This level stores the RDD as serialized Java objects (one byte array per partition). It is more space efficient than deserialized objects, especially with a fast serializer, but it increases the overhead on the CPU. In this level the storage space is low and the CPU computation time is high; the data is stored in memory and the disk is not used.

d. MEMORY_AND_DISK_SER

It is similar to MEMORY_ONLY_SER, but it spills the partitions that do not fit into memory to disk, rather than recomputing them each time they are needed. In this storage level the space used for storage is low and the CPU computation time is high; it makes use of both in-memory and on-disk storage.

e. DISK_ONLY

In this storage level, the RDD is stored only on disk. The space used for storage is low, the CPU computation time is high, and it makes use of on-disk storage only.

Thursday, January 28, 2021

Pyspark BroadCast Join

First of all, spark.sql.autoBroadcastJoinThreshold and the broadcast hint are separate mechanisms. Even if autoBroadcastJoinThreshold is disabled, setting a broadcast hint will take precedence. With default settings:

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)

Spark will use autoBroadcastJoinThreshold and automatically broadcast data:

df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

When we disable auto broadcast Spark will use standard SortMergeJoin:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 100, step=1, splits=Some(8))
   +- *Sort [id#3L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)

but it can be forced to use BroadcastHashJoin with a broadcast hint:

import org.apache.spark.sql.functions.broadcast
df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

SQL has its own hints format (similar to the one used in Hive):

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
 "SELECT  /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)

In short, autoBroadcastJoinThreshold is applicable when working with the Dataset API, but it is not relevant when using explicit broadcast hints.

Furthermore, broadcasting large objects is unlikely to provide any performance boost, and in practice it will often degrade performance and result in stability issues. Remember that a broadcasted object first has to be fetched to the driver, then sent to each worker, and finally loaded into memory.
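
The idea behind a broadcast hash join can be sketched in a few lines of plain Python: the small side is turned into an in-memory lookup table (this is the part that would be copied to every worker), and the large side is streamed past it. This is a toy model, not Spark's implementation; the function name broadcast_hash_join and the sample rows are made up for illustration.

```python
def broadcast_hash_join(large_rows, small_rows, key):
    """Inner-join two lists of dicts on `key` by hashing the small side."""
    # Build phase: the whole small side must fit in memory
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    # Probe phase: each large-side row is matched without any shuffle
    joined = []
    for row in large_rows:
        for match in lookup.get(row[key], []):
            joined.append({**match, **row})
    return joined


orders = [{"id": 1, "amount": 50}, {"id": 2, "amount": 75}, {"id": 4, "amount": 10}]
names = [{"id": 1, "name": "John"}, {"id": 2, "name": "Alice"}]

for row in broadcast_hash_join(orders, names, "id"):
    print(row)
```

The build phase is why the size of the broadcast side matters: the lookup table has to be held in memory in full wherever the join runs.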

Sunday, January 10, 2021

coalesce vs repartition

coalesce uses existing partitions to minimize the amount of data that's shuffled. repartition creates new partitions and does a full shuffle. coalesce results in partitions with different amounts of data (sometimes with very different sizes), while repartition results in roughly equal-sized partitions.
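
The difference in partition sizes can be shown with a toy model in plain Python, where a dataset is a list of partitions and each partition is a list of rows. This is only a sketch under simplifying assumptions: the functions below are hypothetical stand-ins, and Spark's real grouping and shuffling logic is more involved.

```python
def coalesce(partitions, n):
    """Merge whole existing partitions into n groups; rows are never split up."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)  # an entire partition is appended as-is
    return merged


def repartition(partitions, n):
    """Redistribute every single row round-robin across n new partitions."""
    out = [[] for _ in range(n)]
    rows = [row for part in partitions for row in part]
    for i, row in enumerate(rows):
        out[i % n].append(row)  # every row may move: a full shuffle
    return out


# Four skewed partitions holding 8, 1, 1 and 2 rows
partitions = [list(range(1, 9)), [9], [10], [11, 12]]

print([len(p) for p in coalesce(partitions, 2)])
print([len(p) for p in repartition(partitions, 2)])
```

In this model coalesce keeps the skew of the input partitions, while repartition evens it out at the cost of moving every row.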

Catalyst optimizer, Tungsten optimizer

Spark uses two engines to optimize and run queries, Catalyst and Tungsten, in that order. Catalyst generates an optimized physical query plan from the logical query plan by applying a series of transformations such as predicate pushdown, column pruning, and constant folding to the logical plan. This optimized query plan is then used by Tungsten to generate optimized code that resembles hand-written code, by making use of the Whole-stage Codegen functionality introduced in Spark 2.0. This functionality has improved Spark's efficiency by a huge margin over Spark 1.6, which used the traditional Volcano Iterator Model.

Catalyst is based on functional programming constructs in Scala and was designed with these two key purposes:

  • Easily add new optimization techniques and features to Spark SQL
  • Enable external developers to extend the optimizer (e.g. adding data source specific rules, support for new data types, etc.)

When you execute code, Spark SQL uses Catalyst's general tree transformation framework in four phases: analysis, logical optimization, physical planning, and code generation.
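
Constant folding, one of the transformations mentioned above, is a good example of a rule applied to a tree. The sketch below is a toy version in Python and not Catalyst code (Catalyst itself is written in Scala); the classes Lit, Col and Add and the function fold_constants are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Lit:
    value: int  # a literal constant


@dataclass(frozen=True)
class Col:
    name: str  # a column reference, unknown until runtime


@dataclass(frozen=True)
class Add:
    left: object
    right: object


def fold_constants(expr):
    """Bottom-up rule: replace Add(Lit, Lit) with a single Lit."""
    if isinstance(expr, Add):
        left = fold_constants(expr.left)
        right = fold_constants(expr.right)
        if isinstance(left, Lit) and isinstance(right, Lit):
            return Lit(left.value + right.value)
        return Add(left, right)
    return expr  # literals and columns are left unchanged


# x + (1 + 2) becomes x + 3; the column itself cannot be folded
expr = Add(Col("x"), Add(Lit(1), Lit(2)))
folded = fold_constants(expr)
print(folded)
```

The rule returns a new tree instead of modifying the old one, which mirrors the functional style the optimizer is described as being built on.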



Tungsten

The goal of Project Tungsten is to improve Spark execution by optimising Spark jobs for CPU and memory efficiency (as opposed to network and disk I/O, which are considered fast enough). It does so through three features:

  1. Off-Heap Memory Management using binary in-memory data representation aka Tungsten row format and managing memory explicitly,
  2. Cache Locality which is about cache-aware computations with cache-aware layout for high cache hit rates
  3. Whole-Stage Code Generation (aka CodeGen).

Property: set spark.sql.tungsten.enabled to true.

All thanks to the article below.

https://www.linkedin.com/pulse/catalyst-tungsten-apache-sparks-speeding-engine-deepak-rajak/?articleId=6674601890514378752 

Word Count Using flatMap and Map

df = sc.textFile("dbfs:/FileStore/test.txt")

# below is the text file content

"""

hadoop is fast

hive is sql on hdfs

spark is superfast

spark is awesome

"""

fm=df.flatMap(lambda x: x.split(" ")).map(lambda x: (x,1)).groupByKey().mapValues(sum)

fm.take(20)


Out[8]: [('hadoop', 1), ('is', 4), ('hive', 1), ('hdfs', 1), ('awesome', 1), ('fast', 1), ('sql', 1), ('on', 1), ('spark', 2), ('superfast', 1)]
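
For readers without a cluster at hand, the same four stages can be traced in plain Python. This is only a sketch of what each RDD step does to the data, using the four lines of the text file as a list; the variable names are made up for illustration.

```python
from collections import defaultdict
from itertools import chain

lines = [
    "hadoop is fast",
    "hive is sql on hdfs",
    "spark is superfast",
    "spark is awesome",
]

# flatMap: split every line and flatten the results into one list of words
words = list(chain.from_iterable(line.split(" ") for line in lines))

# map: pair each word with the number 1
pairs = [(word, 1) for word in words]

# groupByKey: collect all the 1s that belong to the same word
groups = defaultdict(list)
for word, one in pairs:
    groups[word].append(one)

# mapValues(sum): add up the 1s for each word
counts = {word: sum(ones) for word, ones in groups.items()}
print(counts)
```

The counts match the output above: "is" appears 4 times, "spark" twice, and every other word once.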



RDD Vs Dataframe Vs Dataset

 

What are RDDs?

RDDs, or Resilient Distributed Datasets, are the fundamental data structure of Spark. An RDD is a collection of objects whose data is partitioned across multiple nodes of the cluster, which allows it to be processed in parallel.


What are Dataframes?

DataFrames were first introduced in Spark version 1.3 to overcome the limitations of Spark RDDs. A Spark DataFrame is a distributed collection of data points, but here the data is organized into named columns. They allow developers to debug the code during runtime, which was not allowed with RDDs.


What are Datasets?

Spark Datasets are an extension of the DataFrame API with the benefits of both RDDs and DataFrames. They are fast and also provide a type-safe interface. Type safety means that the compiler validates the data types of all the columns in the dataset at compile time and throws an error if there is any mismatch in the data types.

We cannot create Spark Datasets in Python yet. The Dataset API is available only in Scala and Java.


Below are details.

RDDs and Datasets are type safe, which means the compiler knows the columns and the data type of each column, whether it is Long, String, etc.

But with a DataFrame, every time you call an action, collect() for instance, it returns the result as an Array of Rows, not as Long or String values. In a DataFrame, columns have their own types such as Integer or String, but they are not exposed to you. To you, each value is of type Any. To convert the values of a Row to their suitable types you have to use the .asInstanceOf method.

eg: In Scala:

scala> :type df.collect()
Array[org.apache.spark.sql.Row]


df.collect().map{ row => 
    val str = row(0).asInstanceOf[String]
    val num = row(1).asInstanceOf[Long]
}   

Reference: https://www.analyticsvidhya.com/blog/2020/11/what-is-the-difference-between-rdds-dataframes-and-datasets/