1. 程式人生 > >Spark 廣播變數 TorrentBroadcast

Spark 廣播變數 TorrentBroadcast

1.需求的出現

  當我們在driver端排程spark作用的過程中,需要向各個節點發送任務“資料”--Rdd,一個般一個Rdd會對應多個任務,沒一個任務可以交給一個excutor執行,而一個excutor可以開啟多個執行緒去計算,那麼此時每個執行緒都要從Driver端獲取Rdd,那樣就會產生大量的副本,當需要向excutor傳遞大型變數的時候,就會產生大量的網路佔用,而且多次序列化,與反序列化都會佔用資源。

2.解決方案

  Spark採用了廣播變數的方案,解決了產生副本過多的問題,driver會將在任務執行過程中需要傳送的序列化變數物件進行切割,形成多個chunk,儲存在BlockkManager中,每個excutor一樣都會有一個blockManager,當excutor需要變數的時候首先會從自身的BlockManager中去尋找,如果沒有才去Driver或者其他執行器進行抓取,這樣就可以確保在一個excutor中只需要一份變數副本。也就減少了大量變數副本而產生的網路佔用了。

驗證例項:

一、不採用廣播變數

1)定義方法可以監控Spark任務執行端的資訊

def sendInfo(obj: Object, m: String, param: String) = {
val ip = java.net.InetAddress.getLocalHost.getHostAddress
val pid = java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
val tname = Thread.currentThread().getName
val classname 
= obj.getClass.getSimpleName val objHash = obj.hashCode() val info = ip + "/" + pid + "/" + tname + "/" + classname + "@" + objHash + "/" + m + "(" + param + ")" + "\r\n" //傳送資料給nc 伺服器 val sock = new java.net.Socket("s101", 8888) val out = sock.getOutputStream out.write(info.getBytes()) out.flush() out.close() }

2)首先建立一個可序列化的dog類,(entends Serializable)

scala> class dog extends Serializable

3)建立一個dog物件,並將其作為變數傳入spark作業任務中

//建立物件
scala
> val d = new dog d: dog = [email protected]
//建立rdd scala
> val rdd1 = sc.makeRDD(1 to 10 ,10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24 val rdd2 = rdd1.map(e=>{sendInfo(d,"x","x");e}) rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:31
//觸發Spark任務

 scala> rdd2.collect
 res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

4)監控結果:

192.168.48.101/4140/Executor task launch worker-0/[email protected]/x(x)

192.168.48.102/3355/Executor task launch worker-0/[email protected]/x(x)
192.168.48.102/3360/Executor task launch worker-0/[email protected]/x(x)
192.168.48.102/3355/Executor task launch worker-1/[email protected]/x(x)

192.168.48.104/3364/Executor task launch worker-0/[email protected]/x(x)
192.168.48.104/3357/Executor task launch worker-0/[email protected]/x(x)

192.168.48.103/3450/Executor task launch worker-0/[email protected]/x(x)
192.168.48.103/3445/Executor task launch worker-0/[email protected]/x(x)

192.168.48.103/3445/Executor task launch worker-1/[email protected]/x(x)
192.168.48.103/3450/Executor task launch worker-1/[email protected]/x(x)

  通過dog物件的地址可以看出,每個executor的每個執行緒都會建立(反序列化,從Driver端抓取(是否會從其他executor抓取還有待驗證))一個新的物件。

二、廣播變數的實現

1)定義方法可以監控Spark任務執行端的資訊

def sendInfo(obj: Object, m: String, param: String) = {
val ip = java.net.InetAddress.getLocalHost.getHostAddress
val pid = java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
val tname = Thread.currentThread().getName
val classname = obj.getClass.getSimpleName
val objHash = obj.hashCode() val info = ip + "/" + pid + "/" + tname + "/" + classname + "@" + objHash + "/" + m + "(" + param + ")" + "\r\n" //傳送資料給nc 伺服器 val sock = new java.net.Socket("s101", 8888) val out = sock.getOutputStream out.write(info.getBytes()) out.flush() out.close() }

2)首先建立一個可序列化的dog類,(entends Serializable)

scala> class dog extends Serializable

3)建立一個dog物件,並將其作為變數傳入spark作業任務中

//建立物件
scala> val d = new dog d: dog = [email protected]
//建立廣播變數
scala> val d1 = sc.broadcast(d)
//建立rdd scala> val rdd1 = sc.makeRDD(1 to 10 ,10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24 val rdd2 = rdd1.map(e=>{sendInfo(d1.value,"x","x");e}) rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:31
//觸發Spark任務

 scala> rdd2.collect
 res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

4)監控結果:(由於叢集設計的問題導致很難看出結論)

192.168.48.104/3559/Executor task launch worker-0/[email protected]/x(x)
192.168.48.104/3559/Executor task launch worker-1/[email protected]/x(x)
192.168.48.104/3562/Executor task launch worker-0/[email protected]/x(x)
192.168.48.103/3646/Executor task launch worker-0/[email protected]/x(x) 192.168.48.103/3655/Executor task launch worker-0/[email protected]/x(x)
192.168.48.102/3561/Executor task launch worker-0/[email protected]/x(x) 192.168.48.102/3561/Executor task launch worker-1/[email protected]/x(x) 192.168.48.102/3546/Executor task launch worker-1/[email protected]/x(x) 192.168.48.102/3546/Executor task launch worker-0/[email protected]/x(x)
192.168.48.101/5281/Executor task launch worker-0/[email protected]/x(x)

看到這個結果開始我是有點懵的,為什麼同一個節點的 worker-0和worker-1是同一個dog,但是同樣的worker-0卻是不同的dog。後來才想起來之前修改過配置檔案spark-env.sh。

在每個節點啟動了兩個executor。也就是說 兩個worker-0是屬於不同的executor的所以,是不同的dog。

5)通過修改啟動spark-shell時的引數配置,改變資源配置,實現一個節點只啟動一個executor,一個executor啟動多個執行緒。重複上述同樣步驟

spark-shell --master spark://s101:7077 --executor-cores 4 --total-executor-cores 40

 

6)重複上述同樣步驟,得到如下結果

192.168.48.104/4293/Executor task launch worker-2/[email protected]/x(x)
192.168.48.104/4293/Executor task launch worker-0/[email protected]/x(x)
192.168.48.104/4293/Executor task launch worker-1/[email protected]/x(x)

192.168.48.102/4305/Executor task launch worker-1/[email protected]/x(x)
192.168.48.102/4305/Executor task launch worker-0/[email protected]/x(x)

192.168.48.103/4374/Executor task launch worker-0/[email protected]/x(x)
192.168.48.103/4374/Executor task launch worker-1/[email protected]/x(x)
192.168.48.103/4374/Executor task launch worker-2/[email protected]/x(x)

192.168.48.101/7610/Executor task launch worker-1/[email protected]/x(x)
192.168.48.101/7610/Executor task launch worker-2/[email protected]/x(x)

可以看出一個executor多個執行緒共享一個dog物件。