Spark2.10中使用累加器、注意點以及實現自定義累加器
阿新 • • 發佈:2018-11-03
累加器(accumulator)是Spark中提供的一種分散式的變數機制,其原理類似於mapreduce,即分散式的改變,然後聚合這些改變。累加器的一個常見用途是在除錯時對作業執行過程中的事件進行計數。
累加器簡單使用
Spark內建的提供了Long和Double型別的累加器。下面是一個簡單的使用示例,在這個例子中我們在過濾掉RDD中奇數的同時進行計數,最後計算剩下整數的和。
-
val sparkConf =
new SparkConf().setAppName(
"Test").setMaster(
"local[2]"
- val sc = new SparkContext(sparkConf)
- val accum = sc.longAccumulator( "longAccum") //統計奇數的個數
-
val sum = sc.parallelize(Array(
1,
2,
3,
4,
5,
6
- if(n% 2!= 0) accum.add( 1L)
- n% 2== 0
- }).reduce(_+_)
-
println(
"sum: "
- println( "accum: "+accum.value)
- sc.stop()
結果為:
sum: 20
accum: 5
這是結果正常的情況,但是在使用累加器的過程中如果對於spark的執行過程理解的不夠深入就會遇到兩類典型的錯誤:少加(或者沒加)、多加。
少加的情況:
對於如下程式碼:
- val accum = sc.longAccumulator( "longAccum")
- val numberRDD = sc.parallelize(Array( 1, 2, 3, 4, 5, 6, 7, 8, 9), 2).map(n=>{
- accum.add( 1L)
- n+ 1
- })
- println( "accum: "+accum.value)
執行完畢,列印的值是多少呢?答案是0,因為累加器不會改變spark的lazy的計算模型,即在列印的時候像map這樣的transformation還沒有真正的執行,從而累加器的值也就不會更新。
多加的情況:
對於如下程式碼:
- val accum = sc.longAccumulator( "longAccum")
- val numberRDD = sc.parallelize(Array( 1, 2, 3, 4, 5, 6, 7, 8, 9), 2).map(n=>{
- accum.add( 1L)
- n+ 1
- })
- numberRDD. count
- println ("accum1:"+accum.value)
- numberRDD. reduce (_+_)
- println ("accum2: "+accum.value)
結果我們得到了:
accum1:9
accum2: 18
我們雖然只在map裡進行了累加器加1的操作,但是兩次得到的累加器的值卻不一樣,這是由於count和reduce都是action型別的操作,觸發了兩次作業的提交,所以map運算元實際上被執行了了兩次,在reduce操作提交作業後累加器又完成了一輪計數,所以最終累加器的值為18。究其原因是因為count雖然促使numberRDD被計出來,但是由於沒有對其進行快取,所以下次再次需要使用numberRDD這個資料集是,還需要從並行化資料集的部分開始執行計算。解釋到這裡,這個問題的解決方法也就很清楚了,就是在count之前呼叫numberRDD的cache方法(或persist),這樣在count後資料集就會被快取下來,reduce操作就會讀取快取的資料集而無需從頭開始計算了。改成如下程式碼即可:
- val accum = sc.longAccumulator( "longAccum")
- val numberRDD = sc.parallelize(Array( 1, 2, 3, 4, 5, 6, 7, 8, 9), 2).map(n=>{
- accum.add( 1L)
- n+ 1
- })
- numberRDD.cache(). count
- println ("accum1:"+accum.value)
- numberRDD. reduce (_+_)
- println ("accum2: "+accum.value)
這次兩次列印的值就會保持一致了。
自定義累加器
自定義累加器型別的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義型別累加器的實現方式。官方同時給出了一個實現的示例:CollectionAccumulator類,這個類允許以集合的形式收集spark應用執行過程中的一些資訊。例如,我們可以用這個類收集Spark處理資料時的一些細節,當然,由於累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的資訊的規模要加以控制,不宜過大。 實現自定義型別累加器需要繼承AccumulatorV2並至少覆寫下例中出現的方法,下面這個累加器可以用於在程式執行過程中收集一些文字類資訊,最終以Set[String]的形式返回。
- import java.util
- import org.apache.spark.util.AccumulatorV2
- class LogAccumulator extends AccumulatorV2[String, java.util.Set[String]] {
- private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
- override def isZero: Boolean = {
- _logArray.isEmpty
- }
- override def reset(): Unit = {
- _logArray.clear()
- }
- override def add(v: String): Unit = {
- _logArray.add(v)
- }
- override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = {
- other match {
- case o: LogAccumulator => _logArray.addAll(o.value)
- }
- }
- override def value: java.util.Set[String] = {
- java.util.Collections.unmodifiableSet(_logArray)
- }
- override def copy(): AccumulatorV2[String, util.Set[String]] = {
- val newAcc = new LogAccumulator()
- _logArray. synchronized{
- newAcc._logArray.addAll(_logArray)
- }
- newAcc
- }
- }
測試類:
- import scala.collection.JavaConversions._
- import org.apache.spark.{SparkConf, SparkContext}
- object Main {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setAppName( "Test").setMaster( "local[2]")
- val sc = new SparkContext(sparkConf)
- val accum = new LogAccumulator
- sc.register(accum, "logAccum")
- val sum = sc.parallelize(Array( "1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2).filter(line => {
- val pattern = "" "^-?(\d+)" ""
- val flag = line.matches(pattern)
- if (!flag) {
- accum.add(line)
- }
- flag
- }).map(_.toInt).reduce(_ + _)
- println( "sum: " + sum)
- for (v <- accum.value) print(v + " ")
- println()
- sc.stop()
- }
- }
本例中利用自定義的收集器收集過濾操作中被過濾掉的元素,當然這部分的元素的資料量不能太大。執行結果如下: sum; 32
7cd 4b 2a
來源:https://blog.csdn.net/u013468917/article/details/70617085