Spark Broadcast
Broadcast Variables(廣播變數)
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
(廣播變數容許程式在每臺機器上儲存一份只讀變數快取,而不是為每一個任務傳送一個副本。他們可以被使用,例如以一種高效的方式給每一個節點拷貝一份大的資料集。spark 試圖公國分散式廣播變數來有效低減少通訊開銷。)
(以上直意,意思是這樣。就是spark執行任務的時候會用到共享變數。一般方式是為每個task拷貝一份共享變數。那麼問題來了,如果一個機器上有上百個task那麼就要拷貝上百次,延遲不說,對記憶體溢位造成隱患。那麼使用廣播變數後,所有任務公用一個只讀變數。有點類似於readonly,那麼只需要傳輸一次,且記憶體保留一個副本,大大提高效率。 )
那麼廣播變數這麼有用,大家鐵定多用,不知道有木有遇到空指正bug的。
spark Broadcast 空指正異常:(看下面程式碼 查查他有沒有毛病)
package com.migu.dpi
import java.util
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
object Test2 {
//程式碼純屬為了引出問題 不要較真
var data: Broadcast[java.util.ArrayList[String]] =null
def formatFlag(sc: SparkContext, datas: util.ArrayList[String]):Unit = {
data = sc.broadcast(datas)
}
def main(args: Array[String]):Unit = {
var datas: util.ArrayList[String] =new util.ArrayList[String]()
datas.add("mmp")
var conf =new SparkConf().setAppName(Test2.getClass.getName).setMaster("local[2]")
var sc =new SparkContext(conf)
formatFlag(sc, datas)
//...... args(0) is a path
sc.textFile(args(0)).map(x => {
x +data.value.get(0)
}).foreach(println)
}
}
如果沒有那麼這樣呢,提交到叢集呢?
package com.migu.dpi
import java.util
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
object Test2 {
//程式碼純屬為了引出問題 不要較真
var data: Broadcast[java.util.ArrayList[String]] =null
def formatFlag(sc: SparkContext, datas: util.ArrayList[String]):Unit = {
data = sc.broadcast(datas)
}
def main(args: Array[String]):Unit = {
var datas: util.ArrayList[String] =new util.ArrayList[String]()
datas.add("mmp")
var conf =new SparkConf().setAppName(Test2.getClass.getName)
var sc =new SparkContext(conf)
formatFlag(sc, datas)
//...... args(0) is a path
sc.textFile(args(0)).map(x => {
x +data.value.get(0)
}).foreach(println)
}
}
這時候你會發現,第二種情況,包data廣播變數空指正異常。但是你會想,不是格式化過了嗎?是的,我但是也是這麼想的。
但是 我們 來看看spark提交任務是怎麼幹的。
Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.
(意譯:spark行為是通過一個stage集執行來完成的。stage是被分散式shuffle動作分割的。spark自動將廣播變數廣播到每一個需要的節點上。廣播變數廣播是在每一個執行任務之前完成的。下面沒用不譯了 )
spark 對任務分發是通過 預讀碼 然後 更加編譯中程式碼中行為動作和轉換動作來做shuffle切分的。切分好後把各個任務傳送到各個節點,在任務執行前,把廣播變數發過去。具體誰前誰後,我猜是廣播變數在前,因為task很多,有些廣播變數貫穿整個stage,所以我也就不讀原始碼直接猜了。
那麼問題來了,程式碼沒執行你的廣播變數格式化個毛啊。所以除非本地執行,否則其他機器全空指正異常。
造成這個現象的根本原因是——全域性變數的濫用 。所以自我檢討一下,能用區域性變數別用全域性,除非區域性變數造成記憶體溢位,為了防止OOM。其他別為了省幾個變數的空間而使用全域性變數。