一 概述

Shuffle就是對資料進行重組,由於分散式計算的特性和要求,在實現細節上更加繁瑣和複雜

在MapReduce框架,Shuffle是連線Map和Reduce之間的橋樑,Map階段通過shuffle讀取資料並輸出到對應的Reduce;而Reduce階段負責從Map端拉取資料並進行計算。在整個shuffle過程中,往往伴隨著大量的磁碟和網路I/O。所以shuffle效能的高低也直接決定了整個程式的效能高低。Spark也會有自己的shuffle實現過程



在DAG排程的過程中,Stage階段的劃分是根據是否有shuffle過程,也就是存在ShuffleDependency寬依賴的時候,需要進行shuffle,這時候會將作業job劃分成多個Stage;並且在劃分Stage的時候,構建ShuffleDependency的時候進行shuffle註冊,獲取後續資料讀取所需要的ShuffleHandle,最終每一個job提交後都會生成一個ResultStage和若干個ShuffleMapStage,其中ResultStage表示生成作業的最終結果所在的Stage. ResultStage與ShuffleMapStage中的task分別對應著ResultTask與ShuffleMapTask。一個作業,除了最終的ResultStage外,其他若干ShuffleMapStage中各個ShuffleMapTask都需要將最終的資料根據相應的Partitioner對資料進行分組,然後持久化分割槽的資料。

一 HashShuffle機制

1.1 HashShuffle概述

在spark-1.6版本之前,一直使用HashShuffle,在spark-1.6版本之後使用Sort-Base Shuffle,因為HashShuffle存在的不足所以就替換了HashShuffle.

我們知道,Spark的執行主要分為2部分:一部分是驅動程式,其核心是SparkContext;另一部分是Worker節點上Task,它是執行實際任務的。程式執行的時候,Driver和Executor程序相互互動:執行什麼任務,即Driver會分配Task到Executor,Driver 跟 Executor 進行網路傳輸; 任務資料從哪兒獲取,即Task要從 Driver 抓取其他上游的 Task 的資料結果,所以有這個過程中就不斷的產生網路結果。其中,下一個 Stage 向上一個 Stage 要資料這個過程,我們就稱之為 Shuffle。

1.2 沒有優化之前的HashShuffle機制


在HashShuffle沒有優化之前,每一個ShufflleMapTask會為每一個ReduceTask建立一個bucket快取,並且會為每一個bucket建立一個檔案。這個bucket存放的資料就是經過Partitioner操作(預設是HashPartitioner)之後找到對應的bucket然後放進去,最後將資料

重新整理bucket快取的資料到磁碟上,即對應的block file.

然後ShuffleMapTask將輸出作為MapStatus傳送到DAGScheduler的MapOutputTrackerMaster,每一個MapStatus包含了每一個ResultTask要拉取的資料的位置和大小

ResultTask然後去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster獲取MapStatus,看哪一份資料是屬於自己的,然後底層通過BlockManager將資料拉取過來

拉取過來的資料會組成一個內部的ShuffleRDD,優先放入記憶體,記憶體不夠用則放入磁碟,然後ResulTask開始進行聚合,最後生成我們希望獲取的那個MapPartitionRDD

缺點:

如上圖所示:在這裡有1個worker,2個executor,每一個executor執行2個ShuffleMapTask,有三個ReduceTask,所以總共就有4 * 3=12個bucket和12個block file。

# 如果資料量較大,將會生成M*R個小檔案,比如ShuffleMapTask有100個,ResultTask有100個,這就會產生100*100=10000個小檔案

# bucket快取很重要,需要將ShuffleMapTask所有資料都寫入bucket,才會刷到磁碟,那麼如果Map端資料過多,這就很容易造成記憶體溢位,儘管後面有優化,bucket寫入的資料達到重新整理到磁碟的閥值之後,就會將資料一點一點的重新整理到磁碟,但是這樣磁碟I/O就多了

1.3 優化後的HashShuffle


每一個Executor程序根據核數,決定Task的併發數量,比如executor核數是2,就是可以併發執行兩個task,如果是一個則只能執行一個task

假設executor核數是1,ShuffleMapTask數量是M,那麼它依然會根據ResultTask的數量R,建立R個bucket快取,然後對key進行hash,資料進入不同的bucket中,每一個bucket對應著一個block file,用於重新整理bucket快取裡的資料

然後下一個task執行的時候,那麼不會再建立新的bucket和block file,而是複用之前的task已經建立好的bucket和block file。即所謂同一個Executor程序裡所有Task都會把相同的key放入相同的bucket緩衝區中

這樣的話,生成檔案的數量就是(本地worker的executor數量*executor的cores*ResultTask數量)如上圖所示,即2 * 1* 3 = 6個檔案,每一個Executor的shuffleMapTask數量100,ReduceTask數量為100,那麼

未優化的HashShuffle的檔案數是2 *1* 100*100 =20000,優化之後的數量是2*1*100 = 200檔案,相當於少了100倍

缺點:如果 Reducer 端的並行任務或者是資料分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小檔案。

二 Sort-Based Shuffle

2.1 Sort-Based Shuffle概述

HashShuffle回顧

HashShuffle寫資料的時候,記憶體有一個bucket緩衝區,同時在本地磁碟有對應的本地檔案,如果本地有檔案,那麼在記憶體應該也有檔案控制代碼也是需要耗費記憶體的。也就是說,從記憶體的角度考慮,即有一部分儲存資料,一部分管理檔案控制代碼。如果Mapper分片數量為1000,Reduce分片數量為1000,那麼總共就需要1000000個小檔案。所以就會有很多記憶體消耗,頻繁IO以及GC頻繁或者出現記憶體溢位。

而且Reducer端讀取Map端資料時,Mapper有這麼多小檔案,就需要開啟很多網路通道讀取,很容易造成Reducer(下一個stage)通過driver去拉取上一個stage資料的時候,說檔案找不到,其實不是檔案找不到而是程式不響應,因為正在GC.

2.2 Sorted-Based Shuffle介紹

為了緩解Shuffle過程產生檔案數過多和Writer快取開銷過大的問題,spark引入了類似於hadoop Map-Reduce的shuffle機制。該機制每一個ShuffleMapTask不會為後續的任務建立單獨的檔案,而是會將所有的Task結果寫入同一個檔案,並且對應生成一個索引檔案。以前的資料是放在記憶體快取中,等到資料完了再刷到磁碟,現在為了減少記憶體的使用,在記憶體不夠用的時候,可以將輸出溢寫到磁碟,結束的時候,再將這些不同的檔案聯合記憶體的資料一起進行歸併,從而減少記憶體的使用量。一方面檔案數量顯著減少,另一方面減少Writer快取所佔用的記憶體大小,而且同時避免GC的風險和頻率。


Sort-Based Shuffle有幾種不同的策略:BypassMergeSortShuffleWriter、SortShuffleWriter和UnasfeSortShuffleWriter。

對於BypassMergeSortShuffleWriter,使用這個模式特點:

# 主要用於處理不需要排序和聚合的Shuffle操作,所以資料是直接寫入檔案,資料量較大的時候,網路I/O和記憶體負擔較重

# 主要適合處理Reducer任務數量比較少的情況下

# 將每一個分割槽寫入一個單獨的檔案,最後將這些檔案合併,減少檔案數量;但是這種方式需要併發開啟多個檔案,對記憶體消耗比較大

因為BypassMergeSortShuffleWriter這種方式比SortShuffleWriter更快,所以如果在Reducer數量不大,又不需要在map端聚合和排序,而且

Reducer的數目 <  spark.shuffle.sort.bypassMergeThrshold指定的閥值,就是用的是這種方式。

對於SortShuffleWriter,使用這個模式特點:

# 比較適合資料量很大的場景或者叢集規模很大

# 引入了外部外部排序器,可以支援在Map端進行本地聚合或者不聚合

# 如果外部排序器enable了spill功能,如果記憶體不夠,可以先將輸出溢寫到本地磁碟,最後將記憶體結果和本地磁碟的溢寫檔案進行合併

對於UnsafeShuffleWriter由於需要謹慎使用,我們暫不做分析。

另外這個Sort-Based ShuffleExecutor核數沒有關係,即跟併發度沒有關係,它是每一個ShuffleMapTask都會產生一個data檔案和index檔案,所謂合併也只是將該ShuffleMapTask的各個partition對應的分割槽檔案合併到data檔案而已。所以這個就需要個Hash-BasedShuffleconsolidation機制區別開來。