Spark Shuffle簡介

Shuffle就是對資料進行重組,由於分散式計算的特性和要求,在實現細節上更加繁瑣和複雜

在MapReduce框架,Shuffle是連線Map和Reduce之間的橋樑,Map階段通過shuffle讀取資料並輸出到對應的Reduce;而Reduce階段負責從Map端拉取資料並進行計算。在整個shuffle過程中,往往伴隨著大量的磁碟和網路I/O。所以shuffle效能的高低也直接決定了整個程式的效能高低。Spark也會有自己的shuffle實現過程

HashShuffle

什麼是HashShuffle?
每一個task的計算結果根據key的hashcode與reduce task的個數取模決定寫入到哪一個分割槽檔案,這樣就能保證相同的資料一定是落入到某一個分割槽檔案中。
在這裡插入圖片描述

shuffle可能面臨的問題?

  • 小檔案過多,耗時低效的IO操作
  • 記憶體溢位,讀寫檔案以及快取過多

磁碟小檔案的個數 = map task num × reduce task num

磁碟小檔案過多帶來什麼問題?

  • write階段建立大量的寫檔案的物件
  • read階段就要進行多次網路通訊
  • read階段建立大量的讀檔案的物件

優化後的HashShuffleManager
在這裡插入圖片描述
使用HashShuffle並且開啟合併機制,shuffle過程中磁碟小檔案個數為 cores × reduce task num

SortShuffle

該機制每一個MapTask不會為後續的任務建立單獨的檔案,而是會將所有的Task結果寫入同一個檔案,並且對應生成一個索引檔案。以前的資料是放在記憶體快取中,等到資料完了再刷到磁碟,現在為了減少記憶體的使用,在記憶體不夠用的時候,可以將輸出溢寫到磁碟,結束的時候,再將這些不同的檔案聯合記憶體的資料一起進行歸併,從而減少記憶體的使用量。

SortShuffle的執行機制主要分成兩種:

  • 普通執行機制
  • bypass執行機制
SortShuffleManager普通執行機制

比較適合資料量很大的場景或者叢集規模很大
在這裡插入圖片描述

SortShuffleManager bypass執行機制

主要用於處理Reducer任務數量比較少或不需要排序和聚合的Shuffle操作,資料是直接寫入檔案,資料量較大的時候,網路I/O和記憶體負擔較重。
在這裡插入圖片描述
bypass執行機制的觸發條件如下:
shuffle reduce task數量小於spark.shuffle.sort.bypassMergeThreshold引數的值。