1. 程式人生 > >Spark程式設計指引(四)------------------共享變數(廣播變數和累加器)

Spark程式設計指引(四)------------------共享變數(廣播變數和累加器)

共享變數

通常情況下,當向Spark操作(如map,reduce)傳遞一個函式時,它會在一個遠端叢集節點上執行,它會使用函式中所有變數的副本。這些變數被複制到所有的機器上,遠端機器上並沒有被更新的變數會向驅動程式回傳。在任務之間使用通用的,支援讀寫的共享變數是低效的。儘管如此,Spark提供了兩種有限型別的共享變數,廣播變數和累加器。

廣播變數

廣播變數允許程式設計師將一個只讀的變數快取在每臺機器上,而不用在任務之間傳遞變數。廣播變數可被用於有效地給每個節點一個大輸入資料集的副本。Spark還嘗試使用高效地廣播演算法來分發變數,進而減少通訊的開銷。

Spark的動作通過一系列的步驟執行,這些步驟由分散式的洗牌操作分開。Spark自動地廣播每個步驟每個任務需要的通用資料。這些廣播資料被序列化地快取,在執行任務之前被反序列化出來。這意味著當我們需要在多個階段的任務之間使用相同的資料,或者以反序列化形式快取資料是十分重要的時候,顯式地建立廣播變數才有用。


通過在一個變數v上呼叫SparkContext.broadcast(v)可以建立廣播變數。廣播變數是圍繞著v的封裝,可以通過value方法訪問這個變數。舉例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

在建立了廣播變數之後,在叢集上的所有函式中應該使用它來替代使用v.這樣v就不會不止一次地在節點之間傳輸了。另外,為了確保所有的節點獲得相同的變數,物件v在被廣播之後就不應該再修改。

累加器

累加器是僅僅被相關操作累加的變數,因此可以在並行中被有效地支援。它可以被用來實現計數器和總和。Spark原生地只支援數字型別的累加器,程式設計者可以新增新型別的支援。如果建立累加器時指定了名字,可以在Spark的UI介面看到。這有利於理解每個執行階段的程序。(對於python還不支援)

累加器通過對一個初始化了的變數v呼叫SparkContext.accumulator(v)來建立。在叢集上執行的任務可以通過add或者"+="方法在累加器上進行累加操作。但是,它們不能讀取它的值。只有驅動程式能夠讀取它的值,通過累加器的value方法。

下面的程式碼展示瞭如何把一個數組中的所有元素累加到累加器上:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

儘管上面的例子使用了內建支援的累加器型別Int,但是開發人員也可以通過繼承AccumulatorParam類來建立它們自己的累加器型別。AccumulatorParam介面有兩個方法:

zero方法為你的型別提供一個0值。

addInPlace方法將兩個值相加。

假設我們有一個代表數學vector的Vector類。我們可以向下面這樣實現:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
在Scala裡,Spark提供更通用的累加介面來累加資料,儘管結果的型別和累加的資料型別可能不一致(例如,通過收集在一起的元素來建立一個列表)。同時,SparkContext..accumulableCollection方法來累加通用的Scala的集合型別。

累加器僅僅在動作操作內部被更新,Spark保證每個任務在累加器上的更新操作只被執行一次,也就是說,重啟任務也不會更新。在轉換操作中,使用者必須意識到每個任務對累加器的更新操作可能被不只一次執行,如果重新執行了任務和作業的階段。

累加器並沒有改變Spark的惰性求值模型。如果它們被RDD上的操作更新,它們的值只有當RDD因為動作操作被計算時才被更新。因此,當執行一個惰性的轉換操作,比如map時,不能保證對累加器值的更新被實際執行了。下面的程式碼片段演示了此特性:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
//在這裡,accum的值仍然是0,因為沒有動作操作引起map被實際的計算.