1. 程式人生 > >第42課: Spark Broadcast內幕解密:Broadcast執行機制徹底解密、Broadcast原始碼解析、Broadcast最佳實踐

第42課: Spark Broadcast內幕解密:Broadcast執行機制徹底解密、Broadcast原始碼解析、Broadcast最佳實踐

第42課:  Spark Broadcast內幕解密:Broadcast執行機制徹底解密、Broadcast原始碼解析、Broadcast最佳實踐

Broadcast在機器學習、圖計算、構建日常的各種演算法中到處可見。 Broadcast就是將資料從一個節點發送到其它的節點上;例如Driver上有一張表,而Executor中的每個並行執行的Task(100萬個Task)都要查詢這張表,那我們通過Broadcast的方式就只需要往每個Executor把這張表傳送一次就行了,Executor中的每個執行的Task查詢這張唯一的表,而不是每次執行的時候都從Driver獲得這張表!

JAVA中的Servlet裡面有個ServletContext,是JSP或Java程式碼執行時的上下文,通過上下文可以獲取各種資源。Broadcast類似於ServletContext中的資源、變數或資料,Broadcast廣播出去是基於Executor的,裡面的每個任務可以用上下文,Task的上下文就是Executor,可以抓取資料。這就好像ServletContext的具體作用,只是Broadcast是分散式的共享資料,預設情況下只要程式在執行Broadcast變數就會存在,因為Broadcast在底層是通過BlockManager管理的!但是你可以手動指定或者配置具體週期來銷燬Broadcast變數!可以指定Broadcast的unpersist銷燬Broadcast變數,因為Spark應用程式中可能執行很多job,可能一個job需要很多Broadcast變數,但下一個job不需要這些變數,但是應用程式還存在,因此需手工銷燬Broadcast變數。

Broadcast一般用於處理共享配置檔案、通用的Dataset、常用的資料結構等等;但是不適合存放太大的資料在Broadcast,Broadcast不會記憶體溢位,因為其資料的儲存的StorageLevel是MEMORY_AND_DISK的方式;雖然如此,我們也不可以放入太大的資料在Broadcast中,因為網路IO和可能的單點壓力會非常大!(Spark 1.6版本Broadcast有兩種方式: HttpBroadcast, TorrentBroadcast。  HttpBroadcast可能有單點壓力;TorrentBroadcast下載沒有單點壓力但可能有網路壓力。)但在Spark 2.0版本中已經去掉HTTPBroadcast(SPARK-12588),Spark 2.0版本中TorrentBroadcast是Broadcast唯一的廣播實現方式。

廣播Broadcast變數是隻讀變數,如果Broadcast不是隻讀變數而可以更新,那帶來的問題:1,一個節點上Broadcast可以更新,其它的節點Broadcast也要更新2,如果多個的節點Broadcast同時更新,如何確定更新的順序,以及容錯等內容。因此廣播Broadcast變數是隻讀變數,最為輕鬆保持了資料的一致性!

Broadcast 廣播變數是隻讀變數,快取在每一個節點上,而不是每個Task去獲取它的一份複製副本。例如,以高效的方式給每個節點發送一個dataset的副本。Spark嘗試在分散式傳送廣播變數時使用高效的廣播演算法減少通訊的成本。

廣播變數是由一個變數“V”通過呼叫[[org.apache.spark.SparkContext#broadcast]]建立的。廣播變數是一個圍繞“V”的包裝器,它的值可以通過呼叫 `value`方法來獲取。例如:

1.           scala> val broadcastVar =sc.broadcast(Array(1, 2, 3))

2.           broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

3.          

4.           scala> broadcastVar.value

5.           res0: Array[Int] = Array(1, 2, 3)

如果要更新廣播變數,只有再廣播一次,那就是一個新的廣播變數,使用一個新的廣播變數ID。

廣播變數建立後,在群集上執行的時候“V”變數不是在任何函式都使用,以便“V”傳送到節點時不止一次。此外,物件“V”不應該被修改,是為了確保廣播所有節點得到相同的廣播變數值(例如,如果變數被髮送到後來的一個新節點)。

Broadcast的原始碼:

1.          @param id 廣播變數的唯一識別符號。

2.          @tparam T   廣播變數的資料型別。

3.          abstract class Broadcast[T: ClassTag](val id:Long) extends Serializable with Logging {

4.          

5.           @volatile private var _isValid = true

6.          

7.           private var _destroySite = ""

8.          

9.           /** Get the broadcasted value. */

10.        def value: T = {

11.          assertValid()

12.          getValue()

13.        }

14.      ......

Spark 1.6版本中的HttpBroadcast方式的Broadcast,最開始的時候資料放在Driver的本地檔案系統中,Driver在本地會建立一個資料夾來存放Broadcast中的data,然後啟動HttpServer來訪問資料夾中的資料,同時寫入到BlockManager(StorageLevel是MEMORY_AND_DISK)中獲得BlockId(BroadcastBlockId),當第一次Eexcutor中的Task要訪問Broadcast變數的時候,會向Driver通過HttpServer來訪問資料,然後會在Executor中的BlockManager中註冊該Broadcast中的資料BlockManager,這樣後需要的Task需要訪問Broadcast的變數的時候會首先查詢BlockManager中有沒有該資料,如果有就直接使用;(說明SPARK-12588,HTTPBroadcast方式在Spark 2.0版本中已經去掉。)。

BroadcastManager是用來管理Broadcast,該例項物件是在SparkContext建立SparkEnv的時候建立的:

SparkEnv.scala原始碼:

1.            val broadcastManager = newBroadcastManager(isDriver, conf, securityManager)

2.          

3.             val mapOutputTracker = if (isDriver) {

4.               new MapOutputTrackerMaster(conf,broadcastManager, isLocal)

5.             } else {

6.               new MapOutputTrackerWorker(conf)

7.             }

BroadcastManager.scala中BroadcastManager例項化的時候會呼叫initialize()方法,initialize()方法就建立TorrentBroadcastFactory的方式。

BroadcastManager原始碼如下:

1.          

2.         private[spark] classBroadcastManager(

3.             val isDriver: Boolean,

4.             conf: SparkConf,

5.             securityManager: SecurityManager)

6.           extends Logging {

7.          

8.           private var initialized = false

9.           private var broadcastFactory:BroadcastFactory = null

10.       

11.        initialize()

12.       

13.        // Called by SparkContext or Executor beforeusing Broadcast

14.        private def initialize() {

15.          synchronized {

16.            if (!initialized) {

17.              broadcastFactory = newTorrentBroadcastFactory

18.              broadcastFactory.initialize(isDriver,conf, securityManager)

19.              initialized = true

20.            }

21.          }

22.        }

Spark 2.0 版本中TorrentBroadcast方式:資料開始在Driver中,A節點如果使用了資料,A就成為了供應源,這個時候Driver節點、A節點二個節點成為了供應源,如第三個節點B訪問的時候,第三個節點B也成為了供應源,同樣的,第四個節點、第五個節點。。。。等都成為了供應源,這些都被BlockManager管理,這樣不會導致一個節點壓力太大,從理論上將,資料使用的節點越多,網路速度就越快。

TorrentBroadcast按照BLOCK_SIZE(預設是4MB)將Broadcast中的資料劃分成為不同的Block,然後將分塊資訊也就是Meta資訊存放到Driver的 BlockManager中,同時會告訴BlockManagerMaster說明Meta資訊存放完畢。

         看一下SparkContext.scala的broadcast方法:

1.                def broadcast[T: ClassTag](value: T):Broadcast[T] = {

2.             assertNotStopped()

3.             require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),

4.               "Can not directly broadcast RDDs;instead, call collect() and broadcast the result.")

5.             val bc =env.broadcastManager.newBroadcast[T](value, isLocal)

6.             val callSite = getCallSite

7.             logInfo("Created broadcast " +bc.id + " from " + callSite.shortForm)

8.             cleaner.foreach(_.registerBroadcastForCleanup(bc))

9.             bc

10.        }     

SparkContext.scala的broadcast方法中呼叫env.broadcastManager.newBroadcast,BroadcastManager.scala的newBroadcast方法如下:

1.           def newBroadcast[T: ClassTag](value_ : T,isLocal: Boolean): Broadcast[T] = {

2.             broadcastFactory.newBroadcast[T](value_,isLocal, nextBroadcastId.getAndIncrement())

3.           }   

newBroadcast方法new出來一個Broadcast,第一個引數是Value,第三個引數是BroadcastId,這裡BroadcastFactory是一個trait,沒有具體的實現。

1.           private[spark] trait BroadcastFactory {

2.         ......

3.         def newBroadcast[T:ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]

4.         ...

TorrentBroadcastFactory是 BroadcastFactory的具體實現:

1.         private[spark] classTorrentBroadcastFactory extends BroadcastFactory {

2.         ……   

3.         override def newBroadcast[T:ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {

4.             new TorrentBroadcast[T](value_, id)

5.           }

BroadcastFactory的newBroadcast方法建立TorrentBroadcast例項:

1.          private[spark] class TorrentBroadcast[T:ClassTag](obj: T, id: Long)

2.           extends Broadcast[T](id) with Logging withSerializable {

3.         ……

4.         private def readBlocks():Array[ChunkedByteBuffer] = {

5.             // Fetch chunks of data. Note that allthese chunks are stored in the BlockManager and reported

6.             // to the driver, so other executors canpull these chunks from this executor as well.

7.             val blocks = newArray[ChunkedByteBuffer](numBlocks)

8.             val bm = SparkEnv.get.blockManager

9.          

10.          for (pid <- Random.shuffle(Seq.range(0,numBlocks))) {

11.            val pieceId = BroadcastBlockId(id,"piece" + pid)

12.            logDebug(s"Reading piece $pieceId of$broadcastId")

13.            // First try getLocalBytes because thereis a chance that previous attempts to fetch the

14.            // broadcast blocks have already fetchedsome of the blocks. In that case, some blocks

15.            // would be available locally (on thisexecutor).

16.            bm.getLocalBytes(pieceId) match {

17.              case Some(block) =>

18.                blocks(pid) = block

19.                releaseLock(pieceId)

20.              case None =>

21.                bm.getRemoteBytes(pieceId) match {

22.                  case Some(b) =>

23.                    if (checksumEnabled) {

24.                      val sum =calcChecksum(b.chunks(0))

25.                      if (sum != checksums(pid)) {

26.                        throw newSparkException(s"corrupt remote block $pieceId of $broadcastId:" +

27.                          s" $sum !=${checksums(pid)}")

28.                      }

29.                    }

30.                    // We found the block from remoteexecutors/driver's BlockManager, so put the block

31.                    // in this executor'sBlockManager.

32.                    if (!bm.putBytes(pieceId, b,StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {

33.                      throw new SparkException(

34.                        s"Failed to store $pieceIdof $broadcastId in local BlockManager")

35.                    }

36.                    blocks(pid) = b

37.                  case None =>

38.                    throw newSparkException(s"Failed to get $pieceId of $broadcastId")

39.                }

40.            }

41.          }

42.          blocks

43.        }

TorrentBroadcast.scala的readBlocks方法中Random.shuffle(Seq.range(0,numBlocks)))進行隨機洗牌,是因為資料有很多來源DataServer,為了保持負載均衡,因此使用shuffle。

TorrentBroadcast按照BLOCK_SIZE(預設是4MB)將Broadcast中的資料劃分成為不同的Block,然後將分塊資訊也就是meta資訊存放到Driver的 BlockManager中,StorageLevel是MEMORY_AND_DISK的方式,同時會告訴Driver中的BlockManagerMaster說明Meta資訊存放完畢。資料存放到BlockManagerMaster中就變成了全域性資料,BlockManagerMaster具有所有的資訊,Driver、Executor就可以訪問這些內容。Executor執行具體的TASK的時候,通過TorrentBroadcast的方式readBlocks,如果本地有資料就從本地讀取,如果本地沒有資料,就從遠端讀取資料。Executor讀取資訊以後,通過TorrentBroadcast的機制通知BlockManagerMaster資料多了一份副本,下一個Task讀取資料的時候,就有2個選擇,分享的節點越多,下載的供應源就越多,最終變成點到點的方式。

Broadcast可以廣播RDD,join操作效能優化之一也是採用Broadcast。


相關推薦

42 Spark Broadcast內幕解密Broadcast執行機制徹底解密Broadcast原始碼解析Broadcast最佳實踐

第42課:  Spark Broadcast內幕解密:Broadcast執行機制徹底解密、Broadcast原始碼解析、Broadcast最佳實踐Broadcast在機器學習、圖計算、構建日常的各種演算法中到處可見。 Broadcast就是將資料從一個節點發送到其它的節點上;

大資料IMF傳奇行動絕密課程42Checkpoint內幕解密

Broadcast內幕解密 1、Broadcast徹底解析 2、Broadcast原始碼徹底詳解 3、Broadcast最佳實踐 一、Broadcast徹底解析 1、Broadcast就是將資料從一個節點發送到其它的節點上。例如Driver上有一張表,

124Spark Streaming效能優化通過Spark Streaming進行裝置日誌監控報警及效能優化

通過Spark Streaming進行裝置日誌監控報警及效能優化 1、Spark Streaming進行裝置監控及報警 2、Spark Streaming進行裝置監控效能優化 ELK Stack:一整套開源的日誌處理平臺解決方案,可以集日誌的採集、檢索、視

spark性能調優(二) 徹底解密spark的Hash Shuffle

弱點 sta 出了 寫到 三方 很大的 完成 map 重新 裝載:http://www.cnblogs.com/jcchoiling/p/6431969.html 引言 Spark HashShuffle 是它以前的版本,現在1.6x 版本默應是 Sort-Based Sh

2018-3-1512周4次 Nginx防盜鏈訪問控制配置PHP解析代理

Nginx12.13 Nginx防盜鏈[root@localhost test.com]# vim /usr/local/nginx/conf/vhost/test.com.conf~* 表示不區分大小寫白名單 *.test.com,如果不是白名單,則返回403[root@localhost test.com

ONE筆記DirectDlivery路由演算法(ONE模擬執行機制解析

DirectDlivery路由要點 DirectDlivery(DD)演算法是基礎路由演算法,是基於flooding的一種極端情況,每個節點只將訊息直接傳遞給目的節點。通過該演算法模擬,旨在梳理ONE模擬執行機制。 特點:開銷低,效率低,節點跳數為1 工作機制:

Apache Spark MLlib學習筆記(六)MLlib決策樹類演算法原始碼解析 2

上篇說道建立分類決策樹模型呼叫了trainClassifier方法,這章分析trainClassifier方法相關內容 按照以下路徑開啟原始碼檔案: /home/yangqiao/codes/spark/mllib/src/main/scala/org/ap

Redis系列(五)資料結構List雙向連結串列中基本操作操作命令和原始碼解析

1.介紹 Redis中List是通過ListNode構造的雙向連結串列。 特點: 1.雙端:獲取某個結點的前驅和後繼結點都是O(1) 2.無環:表頭的prev指標和表尾的next指標都指向NULL,對連結串列的訪問都是以NULL為終點 3.帶表頭指標和表尾指標:獲取表頭和表尾的複雜度都是O(1) 4.帶連結串

大資料IMF傳奇行動絕密課程87Flume推送資料到Spark Streaming案例實戰和內幕原始碼解密

Flume推送資料到Spark Streaming案例實戰和內幕原始碼解密 1、Flume on HDFS案例回顧 2、Flume推送資料到Spark Streaming實戰 3、原理繪圖剖析 一、配置.bashrc vi ~/.bashrc

大資料IMF傳奇行動絕密課程63Spark SQL下Parquet內幕深度解密

Spark SQL下Parquet內幕深度解密 1、Spark SQL下的Parquet意義再思考 2、Spark SQL下的Parquet內幕揭祕 一、Spark SQL下的Parquet意義再思考 1、如果說HDFS是大資料時代分散式檔案系統儲存的事

15RDD建立內幕徹底解密

內容: 1.RDD建立的幾個方式 2.RDD建立實戰 3.RDD內幕   第一個RDD:代表了星火應用程式輸入資料的來源 通過轉型來對RDD進行各種運算元的轉換實現演算法 RDD的3種基本的建立方式 1,使用程式中的集合建立RDD; 2,使用本地檔案系統建立RDD;

7實戰解析spark執行原理和rdd解密

1.spark執行優勢 善於使用記憶體,磁碟,迭代式計算是其核心 2.現在為什麼很多公司都是使用java開發spark a.scala高手較少,java高手較多 b.專案對接比較容易 c.系統運維方便 3.spark只能取代hive的儲存引擎,不能取代hive的數倉部分 4.資料輸

72Spark SQL UDF和UDAF解密與實戰

內容:     1.SparkSQL UDF     2.SparkSQL UDAF 一、SparkSQL UDF和SparkSQL UDAF     1.解決SparkSQL內建函式不足問題,自定義內建函式,     2.UDF:User Define Functio

71Spark SQL視窗函式解密與實戰

內容:     1.SparkSQL視窗函式解析     2.SparkSQL視窗函式實戰 一、SparkSQL視窗函式解析     1.spark支援兩種方式使用視窗函式:  &nb

70Spark SQL內建函式解密與實戰

內容:     1.SparkSQL內建函式解析     2.SparkSQL內建函式實戰 一、SparkSQL內建函式解析     使用Spark SQL中的內建函式對資料進行分析,Spark

91SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密 java.lang.ClassNotFoundException 踩坑解決問題詳細內幕版本

第91課:SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密    /* * *王家林老師授課http://weibo.com/ilovepains */  每天晚上20:00YY頻道現場授課頻道68917580 1、作業內容:SparkS

14spark RDD解密學習筆記

第14課:spark RDD解密學習筆記 本期內容: 1.RDD:基於工作集的應用抽象 2.RDD內幕解密 3.RDD思考 精通了RDD,學習Spark的時間大大縮短。解決問題能力大大提高, 徹底把精力聚集在RDD的理解上,SparkStreaming、SparkSQL、

Spark定製班1通過案例對Spark Streaming透徹理解三板斧之一解密Spark Streaming另類實驗及Spark Streaming本質解析

package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seco

87Flume推送資料到SparkStreaming案例實戰和內幕原始碼解密--flume安裝篇

1、  下載flume 老師提供的包 2、  安裝 vi/etc/profile exportFLUME_HOME=/usr/local/apache-flume-1.6.0-bin exportPATH=.:$PATH:$JAVA_HOME/bin:$HADOOP

Spark定製班29深入理解Spark 2.x中的Structured Streaming內幕

本期內容: 1. 新型的Spark Streaming思維 2. Structured Streaming內幕 Spark 2.0 仍有bug,不適合於生成環境。只用於測試。 Spark 2.X提出了continuous application(連續的應用程式)的概念,非