1. 程式人生 > >Spark中的Spark Shuffle詳解(多看幾遍)

Spark中的Spark Shuffle詳解(多看幾遍)

Shuffle簡介

Shuffle描述著資料從map task輸出到reduce task輸入的這段過程。shuffle是連線Map和Reduce之間的橋樑,Map的輸出要用到Reduce中必須經過shuffle這個環節,shuffle的效能高低直接影響了整個程式的效能和吞吐量。因為在分散式情況下,reduce task需要跨節點去拉取其它節點上的map task結果。這一過程將會產生網路資源消耗和記憶體,磁碟IO的消耗。通常shuffle分為兩部分:Map階段的資料準備和Reduce階段的資料拷貝處理。一般將在map端的Shuffle稱之為Shuffle Write,在Reduce端的Shuffle稱之為Shuffle Read.

Hadoop MapReduce Shuffle

Apache Spark 的 Shuffle 過程與 Apache Hadoop 的 Shuffle 過程有著諸多類似,一些概念可直接套用,例如,Shuffle 過程中,提供資料的一端,被稱作 Map 端,Map 端每個生成資料的任務稱為 Mapper,對應的,接收資料的一端,被稱作 Reduce 端,Reduce 端每個拉取資料的任務稱為 Reducer,Shuffle 過程本質上都是將 Map 端獲得的資料使用分割槽器進行劃分,並將資料傳送給對應的 Reducer 的過程。

\

map端的Shuffle簡述:

1)input, 根據split輸入資料,執行map任務;

2)patition, 每個map task都有一個記憶體緩衝區,儲存著map的輸出結果;

3)spill, 當緩衝區快滿的時候需要將緩衝區的資料以臨時檔案的方式存放到磁碟;

4)merge, 當整個map task結束後再對磁碟中這個map task產生的所有臨時檔案做合併,生成最終的正式輸出檔案,然後等待reduce task來拉資料。

reduce 端的Shuffle簡述:

reduce task在執行之前的工作就是不斷地拉取當前job裡每個map task的最終結果,然後對從不同地方拉取過來的資料不斷地做merge,也最終形成一個檔案作為reduce task的輸入檔案。

1) Copy過程,拉取資料。

2)Merge階段,合併拉取來的小檔案

3)Reducer計算

4)Output輸出計算結果

我們可以將Shuffle的過程以資料流的方式呈現:

\

圖形象的描述了MR資料流動的整個過程:

圖解釋:

map端,有4個map;Reduce端,有3個reduce。4個map 也就是4個JVM,每個JVM處理一個數據分片(split1~split4),每個map產生一個map輸出檔案,但是每個map都為後面的reduce產生了3部分資料(分別用紅1、綠2、藍3標識),也就是說每個輸出的map檔案都包含了3部分資料。正如前面第二節所述:

mapper執行後,通過Partitioner介面,根據key或value及reduce的數量來決定當前map的輸出資料最終應該交由哪個reduce task處理.Reduce端一共有3個reduce,去前面的4個map的輸出結果中抓取屬於自己的資料。

關於Hadoop MR的Shuffle的詳細請檢視部落格:“戲”說hadoop--hadoop MapReduce Shuffle過程詳解

Spark Shuffle

\

在“戲”說Spark-Spark核心-Stage劃分及Pipline的計算模式一文中,我們知道stage中是高效快速的pipline的計算模式,寬依賴之間會劃分stage,而Stage之間就是Shuffle,如圖中的stage0,stage1和stage3之間就會產生Shuffle。

在Spark的中,負責shuffle過程的執行、計算和處理的元件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨著Spark的發展有兩種實現的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種

Spark Shuffle發展史

在Spark的版本的發展,ShuffleManager在不斷迭代,變得越來越先進。

在Spark 1.2以前,預設的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有著一個非常嚴重的弊端,就是會產生大量的中間磁碟檔案,進而由大量的磁碟IO操作影響了效能。因此在Spark 1.2以後的版本中,預設的ShuffleManager改成了SortShuffleManager。SortShuffleManager相較於HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁碟檔案,但是最後會將所有的臨時檔案合併(merge)成一個磁碟檔案,因此每個Task就只有一個磁碟檔案。在下一個stage的shuffle read task拉取自己的資料時,只要根據索引讀取每個磁碟檔案中的部分資料即可。

Hash shuffle

HashShuffleManager的執行機制主要分成兩種,一種是普通執行機制,另一種是合併的執行機制。

合併機制主要是通過複用buffer來優化Shuffle過程中產生的小檔案的數量。Hash shuffle是不具有排序的Shuffle。

普通機制的Hash shuffle

\

圖解:

這裡我們先明確一個假設前提:每個Executor只有1個CPU core,也就是說,無論這個Executor上分配多少個task執行緒,同一時間都只能執行一個task執行緒。

圖中有3個 Reducer,從Task 開始那邊各自把自己進行 Hash 計算(分割槽器:hash/numreduce取模),分類出3個不同的類別,每個 Task 都分成3種類別的資料,想把不同的資料匯聚然後計算出最終的結果,所以Reducer 會在每個 Task 中把屬於自己類別的資料收集過來,匯聚成一個同類別的大集合,每1個 Task 輸出3份本地檔案,這裡有4個 Mapper Tasks,所以總共輸出了4個 Tasks x 3個分類檔案 = 12個本地小檔案。

1:shuffle write階段

主要就是在一個stage結束計算之後,為了下一個stage可以執行shuffle類的運算元(比如reduceByKey,groupByKey),而將每個task處理的資料按key進行“分割槽”。所謂“分割槽”,就是對相同的key執行hash演算法,從而將相同key都寫入同一個磁碟檔案中,而每一個磁碟檔案都只屬於reduce端的stage的一個task。在將資料寫入磁碟之前,會先將資料寫入記憶體緩衝中,當記憶體緩衝填滿之後,才會溢寫到磁碟檔案中去。

那麼每個執行shuffle write的task,要為下一個stage建立多少個磁碟檔案呢?很簡單,下一個stage的task有多少個,當前stage的每個task就要建立多少份磁碟檔案。比如下一個stage總共有100個task,那麼當前stage的每個task都要建立100份磁碟檔案。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個Task,那麼每個Executor上總共就要建立500個磁碟檔案,所有Executor上會建立5000個磁碟檔案。由此可見,未經優化的shuffle write操作所產生的磁碟檔案的數量是極其驚人的。

2:shuffle read階段

shuffle read,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網路都拉取到自己所在的節點上,然後進行key的聚合或連線等操作。由於shuffle write的過程中,task給Reduce端的stage的每個task都建立了一個磁碟檔案,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節點上,拉取屬於自己的那一個磁碟檔案即可。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的資料,然後通過記憶體中的一個Map進行聚合等操作。聚合完一批資料後,再拉取下一批資料,並放到buffer緩衝中進行聚合操作。以此類推,直到最後將所有資料到拉取完,並得到最終的結果。

注意:

1).buffer起到的是快取作用,快取能夠加速寫磁碟,提高計算的效率,buffer的預設大小32k。

分割槽器:根據hash/numRedcue取模決定資料由幾個Reduce處理,也決定了寫入幾個buffer中

block file:磁碟小檔案,從圖中我們可以知道磁碟小檔案的個數計算公式:

block file=M*R

2).M為map task的數量,R為Reduce的數量,一般Reduce的數量等於buffer的數量,都是由分割槽器決定的

Hash shuffle普通機制的問題

1).Shuffle前在磁碟上會產生海量的小檔案,建立通訊和拉取資料的次數變多,此時會產生大量耗時低效的 IO 操作 (因為產生過多的小檔案)

2).可能導致OOM,大量耗時低效的 IO 操作 ,導致寫磁碟時的物件過多,讀磁碟時候的物件也過多,這些物件儲存在堆記憶體中,會導致堆記憶體不足,相應會導致頻繁的GC,GC會導致OOM。由於記憶體中需要儲存海量檔案操作控制代碼和臨時資訊,如果資料處理的規模比較龐大的話,記憶體不可承受,會出現 OOM 等問題。

合併機制的Hash shuffle

合併機制就是複用buffer,開啟合併機制的配置是spark.shuffle.consolidateFiles。該引數預設值為false,將其設定為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。

\

這裡還是有4個Tasks,資料類別還是分成3種類型,因為Hash演算法會根據你的 Key 進行分類,在同一個程序中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer裡,然後把Buffer中的資料寫入以Core數量為單位的本地檔案中,(一個Core只有一種型別的Key的資料),每1個Task所在的程序中,分別寫入共同程序中的3份本地檔案,這裡有4個Mapper Tasks,所以總共輸出是 2個Cores x 3個分類檔案 = 6個本地小檔案。

圖解:

開啟consolidate機制之後,在shuffle write過程中,task就不是為下游stage的每個task建立一個磁碟檔案了。此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁碟檔案,磁碟檔案的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就可以並行執行多少個task。而第一批並行執行的每個task都會建立一個shuffleFileGroup,並將資料寫入對應的磁碟檔案內。

Executor的CPU core執行完一批task,接著執行下一批task時,下一批task就會複用之前已有的shuffleFileGroup,包括其中的磁碟檔案。也就是說,此時task會將資料寫入已有的磁碟檔案中,而不會寫入新的磁碟檔案中。因此,consolidate機制允許不同的task複用同一批磁碟檔案,這樣就可以有效將多個task的磁碟檔案進行一定程度上的合併,從而大幅度減少磁碟檔案的數量,進而提升shuffle write的效能。

假設第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor,每個Executor執行5個task。那麼原本使用未經優化的HashShuffleManager時,每個Executor會產生500個磁碟檔案,所有Executor會產生5000個磁碟檔案的。但是此時經過優化之後,每個Executor建立的磁碟檔案的數量的計算公式為:CPU core的數量 * 下一個stage的task數量。也就是說,每個Executor此時只會建立100個磁碟檔案,所有Executor只會建立1000個磁碟檔案。

注意:

1).啟動HashShuffle的合併機制ConsolidatedShuffle的配置:

spark.shuffle.consolidateFiles=true

2).block file=Core*R

Core為CPU的核數,R為Reduce的數量

Hash shuffle合併機制的問題

如果 Reducer 端的並行任務或者是資料分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小檔案。

Sort shuffle

SortShuffleManager的執行機制主要分成兩種,一種是普通執行機制,另一種是bypass執行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold引數的值時(預設為200),就會啟用bypass機制。

Sort shuffle的普通機制

圖解:

\

寫入記憶體資料結構

該圖說明了普通的SortShuffleManager的原理。在該模式下,資料會先寫入一個記憶體資料結構中(預設5M),此時根據不同的shuffle運算元,可能選用不同的資料結構。如果是reduceByKey這種聚合類的shuffle運算元,那麼會選用Map資料結構,一邊通過Map進行聚合,一邊寫入記憶體;如果是join這種普通的shuffle運算元,那麼會選用Array資料結構,直接寫入記憶體。接著,每寫一條資料進入記憶體資料結構之後,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那麼就會嘗試將記憶體資料結構中的資料溢寫到磁碟,然後清空記憶體資料結構。

注意:

shuffle中的定時器:定時器會檢查記憶體資料結構的大小,如果記憶體資料結構空間不夠,那麼會申請額外的記憶體,申請的大小滿足如下公式:

applyMemory=nowMenory*2-oldMemory

申請的記憶體=當前的記憶體情況*2-上一次的內嵌情況

意思就是說記憶體資料結構的大小的動態變化,如果儲存的資料超出記憶體資料結構的大小,將申請記憶體資料結構儲存的資料*2-記憶體資料結構的設定值的記憶體大小空間。申請到了,記憶體資料結構的大小變大,記憶體不夠,申請不到,則發生溢寫

排序

在溢寫到磁碟檔案之前,會先根據key對記憶體資料結構中已有的資料進行排序。

溢寫

排序過後,會分批將資料寫入磁碟檔案。預設的batch數量是10000條,也就是說,排序好的資料,會以每批1萬條資料的形式分批寫入磁碟檔案。寫入磁碟檔案是通過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩衝輸出流,首先會將資料緩衝在記憶體中,當記憶體緩衝滿溢之後再一次寫入磁碟檔案中,這樣可以減少磁碟IO次數,提升效能。

merge

一個task將所有資料寫入記憶體資料結構的過程中,會發生多次磁碟溢寫操作,也就會產生多個臨時檔案。最後會將之前所有的臨時磁碟檔案都進行合併,這就是merge過程,此時會將之前所有臨時磁碟檔案中的資料讀取出來,然後依次寫入最終的磁碟檔案之中。此外,由於一個task就只對應一個磁碟檔案,也就意味著該task為Reduce端的stage的task準備的資料都在這一個檔案中,因此還會單獨寫一份索引檔案,其中標識了下游各個task的資料在檔案中的start offset與end offset。

SortShuffleManager由於有一個磁碟檔案merge的過程,因此大大減少了檔案數量。比如第一個stage有50個task,總共有10個Executor,每個Executor執行5個task,而第二個stage有100個task。由於每個task最終只有一個磁碟檔案,因此此時每個Executor上只有5個磁碟檔案,所有Executor只有50個磁碟檔案。

注意:

1)block file= 2M

一個map task會產生一個索引檔案和一個數據大檔案

2) m*r>2m(r>2):SortShuffle會使得磁碟小檔案的個數再次的減少

Sort shuffle的bypass機制

\

bypass執行機制的觸發條件如下:

1)shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold引數的值。

2)不是聚合類的shuffle運算元(比如reduceByKey)。

此時task會為每個reduce端的task都建立一個臨時磁碟檔案,並將資料按key進行hash然後根據key的hash值,將key寫入對應的磁碟檔案之中。當然,寫入磁碟檔案時也是先寫入記憶體緩衝,緩衝寫滿之後再溢寫到磁碟檔案的。最後,同樣會將所有臨時磁碟檔案都合併成一個磁碟檔案,並建立一個單獨的索引檔案。

該過程的磁碟寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要建立數量驚人的磁碟檔案,只是在最後會做一個磁碟檔案的合併而已。因此少量的最終磁碟檔案,也讓該機制相對未經優化的HashShuffleManager來說,shuffle read的效能會更好。

而該機制與普通SortShuffleManager執行機制的不同在於:

第一,磁碟寫機制不同;

第二,不會進行排序。也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行資料的排序操作,也就節省掉了這部分的效能開銷。

總結:

Shuffle 過程本質上都是將 Map 端獲得的資料使用分割槽器進行劃分,並將資料傳送給對應的 Reducer 的過程。

shuffle作為處理連線map端和reduce端的樞紐,其shuffle的效能高低直接影響了整個程式的效能和吞吐量。map端的shuffle一般為shuffle的Write階段,reduce端的shuffle一般為shuffle的read階段。Hadoop和spark的shuffle在實現上面存在很大的不同,spark的shuffle分為兩種實現,分別為HashShuffle和SortShuffle,

HashShuffle又分為普通機制和合並機制,普通機制因為其會產生M*R個數的巨量磁碟小檔案而產生大量效能低下的Io操作,從而效能較低,因為其巨量的磁碟小檔案還可能導致OOM,HashShuffle的合併機制通過重複利用buffer從而將磁碟小檔案的數量降低到Core*R個,但是當Reducer 端的並行任務或者是資料分片過多的時候,依然會產生大量的磁碟小檔案。

SortShuffle也分為普通機制和bypass機制,普通機制在記憶體資料結構(預設為5M)完成排序,會產生2M個磁碟小檔案。而當shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold引數的值。或者運算元不是聚合類的shuffle運算元(比如reduceByKey)的時候會觸發SortShuffle的bypass機制,SortShuffle的bypass機制不會進行排序,極大的提高了其效能

在Spark 1.2以前,預設的shuffle計算引擎是HashShuffleManager,因為HashShuffleManager會產生大量的磁碟小檔案而效能低下,在Spark 1.2以後的版本中,預設的ShuffleManager改成了SortShuffleManager。SortShuffleManager相較於HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁碟檔案,但是最後會將所有的臨時檔案合併(merge)成一個磁碟檔案,因此每個Task就只有一個磁碟檔案。在下一個stage的shuffle read task拉取自己的資料時,只要根據索引讀取每個磁碟檔案中的部分資料即可。