1. 程式人生 > >Spark的RDD連續轉換操作有時需要注意強行觸發action執行操作,否則(Tansformation)的惰性(lazy)機制會導致結果錯誤

Spark的RDD連續轉換操作有時需要注意強行觸發action執行操作,否則(Tansformation)的惰性(lazy)機制會導致結果錯誤

最近通過spark做一些資料處理,遇到一些詭異的現象

我開發了一個隨機生成海量資料點的程式,因為要保證這些點具有自增序號,不適合直接map分散式做(幾十億的資料,map計算需要分割槽(不主動分割槽估計也會自動分割槽,spark自帶的資料累加邏輯只能對單個partition分割槽內有效),需要在driver裡進行序號計算,所以就想通過陣列分批生成資料,轉換成RDD,在依次拼接(union)起來,就是下面的程式碼。

 val array = ArrayBuffer[(String,String)]()
 var i=0l
 var rdd:RDD[(String,String)] = sc.makeRDD(array)
 
 for(i<- 1l to size)
 {
        val name = "王".toString.concat((i % 1000).toString)
        array +=((i.toString, name))
        if(i%part_size == 0)
        {
            val rdd1 = sc.makeRDD(array)
            rdd1.cache
            val pre_rdd = rdd
            rdd= rdd.union(rdd1)
            rdd.cache()
            array.clear()
            rdd1.unpersist()
            pre_rdd.unpersist()
          }

    }
    if(array.length>0)
    {
      val rdd1 = sc.parallelize(array)
      rdd1.cache
      val pre_rdd = rdd
      rdd=rdd.union(rdd1)
      rdd.cache()
      pre_rdd.unpersist()
      rdd1.unpersist()
    }

好了,經驗豐富或者瞭解相關基礎知識的同學,知道上面程式碼有問題後,應該很快能看出問題在哪兒了,其他人是不是看著計算邏輯挺正常?

但如果我輸入size=8,part_size=5,就是輸出8個點,分批計算,每批算5個點。不管分幾批,對結果應該沒影響,最終結果就是

(1,xxxx)(2,xxxx)(3,xxxx)....(8,xxxx)共8個點

實際結果是:(6,xxxx)(7,xxxx)(8,xxxx)(6,xxxx)(7,xxxx)(8,xxxx)(6,xxxx)(7,xxxx)(8,xxxx)九個點,震驚了有沒有?

仔細分析,看得出, 是把第二批重複了三次,根據這個線索,按圖索驥查詢資料,發現這一切是spark的RDD惰性計算(lazy機制)的鍋。

因為spark的RDD操作分為兩種操作模式,轉換操作(transformation)和行動操作(action),具體可以參考以下兩篇文章,能得到一個清晰的初步瞭解:

總而言之,就是transformation操作主要是對每個RDD中的元素進行處理並生成新的RDD;而action則主要是對RDD進行最後的操作,比如遍歷、reduce、儲存到檔案等(雖然最終可能還是儲存到一個新的RDD上,但至少從設計上是具備輸出能力的),並可以返回結果給Driver程式。這樣因為transformation肯定不會輸出,spark就設定了惰性機制(lazy特性),當沒有出現action操作的時候,所有RDD轉換操作不會執行,程式會為其生成DAG,直到遇到action才觸發,這樣做的好處時有利於加強平行計算,減少中間結果。比如程式裡進行了大量的轉換操作,最後才reduce並輸出,前面轉換操作就可以生成DAG後,不同stage平行計算,甚至可能複用中間結果提高計算效率。這是為了spark的優化機制服務的。

所以,如果我們有如下程式碼:

val rdd1=makeRDD(...)
val rdd2=rdd1.map(...).filter(...)
val rdd3=makeRDD(...)
val rdd4= rdd1.join(rdd3)
val rdd5=rdd2.join(rdd4).groupByKey(...)
val rdd6=rdd5.reduceByKey(...)
val rdd7= rdd4.reduce(...)

只有執行到RDD6的時候,rdd1到rdd5才會被優化執行計算出來,而且是並行的,是依次得:

array->rdd1

 rdd1-->rdd2

array->rdd3

===rdd4===

array->rdd1--> 

                                  -->join -->rdd4

  array->rdd13-->

===rdd5===

array-->rdd1-->rdd2                          -->

                                                                       -->join-->groupByKey-->rdd5

 array->rdd1--> 

                                  -->join -->rdd4 -->

 array->rdd13-->

可以看得出,漴副計算特別多,中間RDD並未重複利用,這就是為什麼就算結果正確,在lazy計算中也要引入cache或者persisit來快取中間結果以便重複利用。

那麼回到我們剛開始提到的例子,當整個程式碼真正開始執行的時候,已經是函式返回rdd後在別的地方被呼叫的時候了,此時array裡只有{(6,xxxx)(7,xxxx)(8,xxxx)},這樣三次從array裡makeRDD出來的rdd自然都一樣,然後拼出了錯誤的結果。

要想結果正確,可以這麼改:

val array = ArrayBuffer[(String,String)]()
 var i=0l
 var rdd:RDD[(String,String)] = sc.makeRDD(array)
 
 for(i<- 1l to size)
 {
        val name = "王".toString.concat((i % 1000).toString)
        array +=((i.toString, name))
        if(i%part_size == 0)
        {
            val rdd1 = sc.makeRDD(array)
            rdd1.cache
            val pre_rdd = rdd
            rdd= rdd.union(rdd1)
            rdd.count //action操作觸發之前的RDD操作執行
            rdd.cache()
            array.clear()
            rdd1.unpersist()
            pre_rdd.unpersist()
          }

    }
    if(array.length>0)
    {
      val rdd1 = sc.parallelize(array)
      rdd1.cache
      val pre_rdd = rdd
      rdd=rdd.union(rdd1)
      rdd.count
      rdd.cache()
      pre_rdd.unpersist()
      rdd1.unpersist()
    }

雖然rdd.count從資料邏輯上看是毫無意義的,但對於spark轉換計算卻是一個不得已的辦法。

人為刻意去觸發操作執行的情況,在我們spark開發中需要額外注意,像這種RDD連續操作中因為同一資料來源變數(array)發生變化導致需要中間專門提前觸發操作執行的是一種典型場景,還有另一種場景就是RDD轉換操作中有隨機數生成的邏輯,參考下面的程式碼;

val rdd1 = makeRDD(array)
val rdd2 = rdd1.map{ t=>
      val  ran= ((new Random).nextLong().abs % len)
      (ran,t)}
val rdd3 = rdd2.groupByKey()

rdd2.collect().foreach(println)
rdd3.collect().foreach(println)

列印結果發現,rdd3和rdd2的結果匹配不上,為什麼呢?就因為當我們最後collect的時候,rdd2和rdd3分別按照自己的DAG規劃,平行計算,rdd3依賴rdd2,算rdd3的時候會把rdd2又計算一次,為什麼呢?因為rdd3的計算前全都是轉換操作,不會和driver發生資料互動,雖然rdd2也單獨算過一次了,但對於其他轉換操作來說也不會來複用,實際上RDD3計算的時候,程式碼相當於執行了:

val rdd3 = rdd1.map{ t=>
      val  ran= ((new Random).nextLong().abs % len)
      (ran,t)}.groupByKey().collect().foreach(println)

所以就相當於計算rdd3時 ,隨機數key重新生成了一次,自然和rdd2的輸出對不上了。

這裡面就得在rdd3的轉化前,或者說在具有隨機因子的轉化操作後(生成rdd2),馬上觸發一個執行操作,生成rdd2.並且cache起來,讓後面呼叫rdd2的語句直接從cache裡取 中間結果,避免重複計算:

val rdd1 = makeRDD(array)
val rdd2 = rdd1.map{ t=>
      val  ran= ((new Random).nextLong().abs % len)
      (ran,t)}
rdd2.count //action行動操作,觸發上面的語句執行,這一句在本段程式碼場景中不是必須的,
           //因為後面cache和collect共同發揮做作用,也起到了在執行rdd3操作時會取rdd2的cache
rdd2.cache //這個是至關重要的,包含隨機數的轉換計算,不僅要提前觸發,還要被快取起來才行
val rdd3 = rdd2.groupByKey()

rdd2.collect().foreach(println)
rdd3.collect().foreach(println)

注意裡面的註釋,解決這個問題,實際上用到了rdd轉換操作中的cache機制的作用,實際上cahce操作,也是在action操作觸發後才執行,所以說,是否及時觸發action操作還是很重要。

好了,如果看了本文,有助於你更直觀的理解RDD的tansfromation,action,cache,persisit的設計理念以及實踐中的價值或者用不好產生的問題,那我的文章就沒白寫,畢竟講這幾個概念的文章還是很多的。