1. 程式人生 > >Spark——效能調優——Shuffle

Spark——效能調優——Shuffle

一、序引
    當以分散式方式處理資料時,常常需要執行map與reduce轉換。由於巨量資料必須從一個節點傳輸到另外的節點,給叢集中的cpu、磁碟、記憶體造成沉重的負載壓力,同時也會給網路頻寬帶來壓力。所以,reduce階段進行的shuffle過程,往往是效能的瓶頸所在。
    shuffle過程涉及資料排序、重分割槽、網路傳輸時的序列化與反序列化,為了減少I/O頻寬及磁碟I/O操作,還要對資料進行壓縮。故而,shuffle的效能將直接spark的整體運算效能。
    因為shuffle檔案數(M(Map數) * R(Reduce數))將會非常大。故而,這是效能損失的關鍵所在。折衷方案:壓縮輸出檔案。spark預設的是Snappy,但也支援選擇lz4、lzf。
    reduce階段,記憶體問題異常突出。對於一個reducer而言,被shuffle的所有資料都必須放到記憶體中。若記憶體不夠,則會因記憶體溢處而導致job失敗。同時,這也是為何reducer數量如此重要的原因。reducer增加時,理應及時減少每個reducer對應的資料量。
    Hadoop與Spark的對比:hadoop中,map與reduce階段存在交疊,mapper把輸出資料推送到reducer;spark中,只有map階段結束,reduce階段才啟動工作,reducer將拉取shuffle後的資料。
    在Spark中,引入了shuffle file consolidation機制,儘量輸出少一點但大一些的檔案。map階段為每個分割槽輸出一個shuffle檔案。shuffle檔案數是每個核的reducer數,而不是每個mapper的reducer數。原有執行在相同CPU核上的map任務都將輸出相同的shuffle檔案,每個reducer都有一個檔案。要啟用shuffle檔案合併,必須把spark.shuffle.consolidateFile=true。
二、Shuffle與資料分割槽
    join、reduceByKey、groupByKey、cogroup等轉換(transformation)操作需要在叢集中shuffle資料。這些操作可能需要對整個資料集進行shuffle、排序及重新分割槽,十分消耗效能。“預分割槽”(pre-partition)的方案,可以有效提高效能。
    如果RDD已分割槽,就能避免資料shuffle。
    對於涉及兩個或更多RDD的轉換操作,資料分割槽更為重要。需要join操作的RDD越多,要處理的資料就越多,這些資料是shuffle的重要內容。
    為進一步改善join轉換的效能,你可以用相同的分割槽器對兩個RDD進行分割槽。如此,不需要通過網路進行shuffle。
    總之,shuffle操作異常的消耗效能,當編寫Spark應用時,應當減少Shuffle的次數及在叢集中傳輸資料。由於避免了跨節點傳輸資料引起的CPU、磁碟與網路I/O壓力,分割槽得當的RDD能有效提升應用的效能。
三、運算元與shuffle
    正確的時機選擇正確的運算元(operator),可有效避免讓大量資料分佈到叢集各個worker節點。
    初學者傾向於用轉換來完成job,而不會思考它們背後觸發的操作。這是一個極為重要的認識誤區。
    1、groupByKey vs reduceByKey
        groupByKey,特定鍵(Key)的所有值必須在一個任務中進行處理。為達到這個目的,整個資料集被shuffle,特定鍵的所有單詞對被髮送到一個節點。因此,會消耗大量的時間。與此同時,可能還有“一個大資料集,一個鍵有很多值,一個任務可能會用完所有記憶體”的爆炸性問題。
        reduceByKey運算元的函式會被應用到單臺機器上一個鍵的全部值,處理後得到的中間結果隨後會在叢集中傳送。
        groupByKey: {
{(A,1),(A,1),(A,1),(B,1)},
{(A,1),(A,1),(B,1),(B,1)},
{(A,1),(B,1),(B,1)}
} →→→ {
{{(A,1),(A,1),(A,1),(A,1),(A,1),(A,1)}→(A,6)}
{{(B,1),(B,1),(B,1),(B,1),(B,1)}→(B,5)}
}
reduceByKey: {
{{(A,1),(A,1),(A,1),(B,1)}→{(A,3),(B,1)}}
{{(A,1),(A,1),(B,1),(B,1)}→{(A,2),(B,2)}},
{{(A,1),(B,1),(B,1)}→{(A,1),(B,2)}}
} →→→ {
{{(A,1),(A,1),(A,1)}→(A,6)}
{{(B,1),(B,1),(B,1)}→(B,5)}
}


        從流程圖中不難看出,reduceByKey代替groupByKey來解決聚合問題,顯著減少了需要壓縮及shuffle的資料,效能有了巨大的提升空間。
    2、repartition vs coalesce
        兩種方法,均可以實現“需要改變RDD分割槽數來改變並行度”的情形。
        重分割槽運算元會隨機rshuffle資料,並將其分發到許多分割槽中。可能比RDD的原始分割槽數多,亦可能少。
        coalesce運算元會得到同樣的結果,但是減少RDD的分割槽數能避免shuffle。而coalesce並不是總能避免shuffle。如果大幅減少分割槽數,將它設定成比節點數還少,那麼剩餘節點上的資料將被送到包含這些分割槽的其它節點上。在減少分割槽數時,由於僅對一部分資料而不是對整個資料集進行shuffle,因而coalesce比repartition執行的效果更好。
    3、reduceByKey vs aggregateByKey
        為了避免reduceByKey方案中的記憶體分配問題,可用aggregateByKey代替。
        必須為aggregateByKey提供三個引數:1)、零值(zero),即聚合的初始值。2)、函式f:(U,V),把值V合併到資料結構U,該函式在分割槽內合併值時被呼叫。3)、函式g:(U,U),合併兩個資料結構U。在分割槽間合併值時被呼叫。
四、Shuffle並不總是壞事
    總的來說,避免shuffle是正確選擇,但shuffle並不總是壞事。
    第一,shuffle是spark在叢集裡重新組織資料的一種方式,因而是必不可少。
    第二,有時,shuffle能節省應用執行的時間。