1. 程式人生 > >Spark-core-問題記錄:join shuffle

Spark-core-問題記錄:join shuffle

1、partitionBy:當hashCode為負時,拋異常:java.lang.ArrayIndexOutOfBoundsException

        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)

2、rdd.partitionBy(new TdidPartitioner(10)).mapPartitionsWithIndex {

  case

(idx, value) =>

    Iterator.single(idx, value)

}

        A。返回值為Iterator中的value為Iterator,拋異常:NotSerializableException

                - object not serializable (class: org.apache.spark.InterruptibleIterator, value: non-empty iterator)

        B。把value改為Iterator.single(idx, value.toSet)

3、join於zipPartitions區別

        tmpRdd.join(bloomFilterRdd)、tmpRdd.zipPartitions(bloomFilterRdd)

的區別:

        A。join會先計算左邊的rdd,然後計算右邊的rdd,根據key join

        B。zipPartitions,根據key,一個一個進行匹配

4、persist、以及導致shuffle增長的原因(streaming)

        val tmpRDD = Analysis.analyserJoin(tdidRDD, filterRDD).persist()

        //filterRDD.unpersist()

        filterRDD = tmpRDD

        //filterRDD.persist()

        filterRDD.count()    *** tmpRDD.count() ***

        tmpRDD.unpersist()

        問題描述:

                迴圈使用filterRDD,tdidRDD每次都是不同的資料

        A。在呼叫action操作之前,先呼叫unpersist又呼叫persist,運算元會先執行上次操作的計算,因為unpersist把之前的計算釋放,因為所有的計算在呼叫action操作時,才會真正的計算,unpersist則會把RDD標記為不需要persist,並且釋放block塊

        B。如果是tmpRDD.count(),也會執行上次filterRDD的計算,因為count操作只是執行了tmpRDD之前的操作,下次用到filterRDD時,需要計算filterRDD的結果

        C。每執行一次action操作,都會重新計算一遍,除非使用persisit方法

 

        val tmpRDD = Analysis.analyserZip(tdidRDD, filterRDD)

        filterRDD.count()

        filterRDD.unpersist()

        filterRDD = tmpRDD

        *** 使用這種方式,不會導致shuffle增長 ***

 

5、join操作、co-partitioin

        A。按相同的paritioner分割槽後,如果有mapPartitionsWithIndex、mapPartitions 操作,

                需要設定preservesPartitioning這個引數,

                預設值:preservesPartitioning: Boolean = false

                override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

        B。按相同的paritioner分割槽後,如果map、flatMap、filter 操作,partitioner被置為None

        C。streaming例項驗證:

                1、當為no-partitioner時,shuffle每批資料都會增長,並且,序列化和反序列化後,shuffle並不下降,

                      即使重啟系統後,重新讀區序列化後的資料,會在上次shuffleWrite的基礎上,

                  增加shufflefilter RDD shuffleWrite + tdidRDD shuffleWrite全shuffle

                  結論:BloomFilter的shuffleWrite和shuffleRead的增長,其大小跟put的資料有關,put的資料越大(重複率越小),增量越大

                2、當為no-partitioner時,其中一個RDD有partitioner,另一個partitioner為None,shuffle的量為partitioner為None的RDD的資料量

                3、當為co-partitioner時,partition相同,shuffle每批增長的量為上批tdidRDD的資料量,當序列化反序列後,shuffle下降為20M左右

                4、當為co-partitioner時,partition不同,shuffle的量為partition少的RDDshuffle

        D。單個join,a.join(b),shuffle跟a、b在join左右兩側的位置無關

        E。zipPartition,即使沒有相同的partitioner,也沒有shuffle,計算的時候是多個RDD的這個partition算完,再算下一個partition

6、localcheckpoint

        A。會打斷RDD的依賴關係,並且把當前的RDD儲存為一個新的RDD

        B。當有塊丟失時,會導致系統異常,退出