1. 程式人生 > >map/reduce之間的shuffle,partition,combiner過程的詳解

map/reduce之間的shuffle,partition,combiner過程的詳解

用戶 這也 阻止 每一個 ner bsp job ack 網絡資源

  Shuffle的本意是洗牌、混亂的意思,類似於java中的Collections.shuffle(List)方法,它會隨機地打亂參數list裏的元素順序。MapReduce中的Shuffle過程。所謂Shuffle過程可以大致的理解成:怎樣把map task的輸出結果有效地傳送到reduce輸入端。也可以這樣理解, Shuffle描述著數據從map task輸出到reduce task輸入的這段過程。

技術分享圖片

  上圖表示的是Shuffle的整個過程。在Hadoop這樣的集群環境中,大部分map task與reduce task的執行是在不同的節點上。當然很多情況下Reduce執行時需要跨節點去讀取其它節點上的map task結果,並存儲到本地。如果集群正在運行的job有很多,那麽task的正常執行對集群內部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。另外在節點內,相比於內存,磁盤IO對job完成時間的影響也是比較大的,spark 就是基於這點對hadoop做出了改進,將map和reduce的所有任務都在內存中進行,並且中間接過都保存在內存中,從而比hadoop的速度要快100倍以上。從最基本的要求來說,我們對Shuffle過程希望做到:上圖表示的是Shuffle的整個過程。在Hadoop這樣的集群環境中,大部分map task與reduce task的執行是在不同的節點上。當然很多情況下Reduce執行時需要跨節點去讀取其它節點上的map task結果,並存儲到本地。如果集群正在運行的job有很多,那麽task的正常執行對集群內部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。另外在節點內,相比於內存,磁盤IO對job完成時間的影響也是比較大的,spark 就是基於這點對hadoop做出了改進,將map和reduce的所有任務都在內存中進行,並且中間接過都保存在內存中,從而比hadoop的速度要快100倍以上。從最基本的要求來說,我們對Shuffle過程希望做到:
-完整地從map task端讀取數據到reduce 端。
-在跨節點讀取數據時,盡可能地減少對帶寬的不必要消耗。
-減少磁盤IO對task執行的影響。
Shuffle前半段過程主要包括:

1、split過程

2、partition過程:partition是分割map每個節點的結果,按照key分別映射給不同的reduce,也是可以自定義的。

3、溢寫過程

4、Merge過程

每個map task都有一個內存緩沖區,存儲著map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束後再對磁盤中這個map task產生的所有臨時文件做合並,生成最終的正式輸出文件,然後等待reduce task來讀取數據。 下面可以將Shuffle過程主要分為四個步驟:(結合WordCount的例子來進行說明)

1、split過程:在map task執行時,它的輸入數據來源於HDFS的block,當然在MapReduce概念中,map task只讀取split。Split與block的對應關系可能是多對一,默認是一對一。在WordCount例子裏,假設map的輸入數據都是像“aaa”這樣的字符串。

2、partiton過程:在經過mapper的運行後,我們得知mapper的輸出是這樣一個key/value對: key是“aaa”, value是數值1。因為當前map端只做加1的操作,在reduce task裏才去合並結果集。前面我們知道這個job有3個reduce task,到底當前的“aaa”應該交由哪個reduce去做呢,這個主要有partition來決定。下面就說明如何決定由哪個reduce去做這個事情。
MapReduce提供Partitioner接口,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出數據最終應該交由哪個reduce task處理。默認是對key hash後再以reduce task數量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以自己重新實現partition的接口並設置到job上即可。
在我們的例子中,“aaa”經過Partitioner後返回0,也就是這對值應當交由第一個reducer來處理。接下來,需要將數據寫入內存緩沖區中,緩沖區的作用是批量收集map結果,減少磁盤IO的影響。我們的key/value對以及Partition的結果都會被寫入緩沖區。當然寫入之前,key與value值都會被序列化成字節數組。

3、溢寫過程:這個內存緩沖區是有大小限制的,默認是100MB,也可以通過設置配置文件中的參數mapreduce.task.io.sort.mb來設置。當map task的輸出結果很多時,就可能會撐爆內存,所以需要在一定條件下將緩沖區中的數據臨時寫入磁盤,然後重新利用這塊緩沖區。這個從內存往磁盤寫數據的過程被稱為Spill,中文可譯為溢寫,字面意思很直觀。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,所以整個緩沖區有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩沖區的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的內存,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB內存中寫,互不影響。
當溢寫線程啟動後,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型默認的行為,這裏的排序也是對序列化的字節做的排序。
在這裏我們可以想想,因為map task的輸出是需要發送到不同的reduce端去,而內存緩沖區沒有對將發送到相同reduce端的數據做合並,那麽這種合並應該是體現是磁盤文件中的。從官方圖上也可以看到寫到磁盤中的溢寫文件是對不同的reduce端的數值做過合並。所以溢寫過程一個很重要的細節在於,如果有很多個key/value對需要發送到某個reduce端去,那麽需要將這些key/value值拼接到一塊,減少與partition相關的索引記錄。
在針對每個reduce端而合並數據時,有些數據可能像這樣:“aaa”/1, “aaa”/1。對於WordCount例子,就是簡單地統計單詞出現的次數,如果在同一個map task的結果中有很多個像“aaa”一樣出現多次的key,我們就應該把它們的值合並到一塊,這個過程叫reduce也叫combine。但MapReduce的術語中,reduce只指reduce端執行從多個map task取數據做計算的過程。除reduce外,非正式地合並數據只能算做combine了。其實大家知道的,MapReduce中將Combiner等同於Reducer。
如果client設置過Combiner,那麽現在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢寫到磁盤的數據量。Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。那麽哪些場景才能使用Combiner呢?從這裏分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。

4、merge過程:merge是將多個溢寫文件合並到一個文件。每次溢寫會在磁盤上生成一個溢寫文件,如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁盤上相應的就會有多個溢寫文件存在。當map task真正完成時,內存緩沖區中的數據也全部溢寫到磁盤中形成一個溢寫文件。最終磁盤中會至少有一個這樣的溢寫文件存在(如果map的輸出結果很少,當map執行完成時,只會產生一個溢寫文件),因為最終的文件只有一個,所以需要將這些溢寫文件歸並到一起,這個過程就叫做Merge。Merge是怎樣的?如前面的例子,“aaa”從某個map task讀取過來時值是5,從另外一個map 讀取時值是8,因為它們有相同的key,所以得merge成group。什麽是group。對於“aaa”就是像這樣的:{“aaa”, [5, 8, 2, …]},數組中的值就是從不同溢寫文件中讀取出來的,然後再把這些值加起來。請註意,因為merge是將多個溢寫文件合並到一個文件,所以可能也有相同的key存在,在這個過程中如果client設置過Combiner,也會使用Combiner來合並相同的key。

  至此,map端的所有工作都已結束,最終生成的這個文件也存放在TaskTracker夠得著的某個本地目錄內。每個reduce task不斷地通過RPC從JobTracker那裏獲取map task是否完成的信息,如果reduce task得到通知,獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程開始啟動。

  簡單地說,reduce task在執行之前的工作就是不斷地拉取當前job裏每個map task的最終結果,然後對從不同地方拉取過來的數據不斷地做merge,也最終形成一個文件作為reduce task的輸入文件。

  Shuffle在reduce端的過程也能用三點來概括。當前reduce copy數據的前提是它要從JobTracker獲得有哪些map task已執行結束。Reducer真正運行之前,所有的時間都是在拉取數據,做merge,且不斷重復地在做。如前面的方式一樣,下面我也分段地描述reduce 端的Shuffle細節:

1. Copy過程,簡單地拉取數據。Reduce進程啟動一些數據copy線程(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。因為map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中。

2. Merge階段。這裏的merge如map端的merge動作,只是數組中存放的是不同map端copy來的數值。Copy過來的數據會先放入內存緩沖區中,這裏的緩沖區大小要比map端的更為靈活,它基於JVM的heap size設置,因為Shuffle階段Reducer不運行,所以應該把絕大部分的內存都給Shuffle用。這裏需要強調的是,merge有三種形式:1)內存到內存 2)內存到磁盤 3)磁盤到磁盤。默認情況下第一種形式不啟用,讓人比較困惑,是吧。當內存中的數據量到達一定閾值,就啟動內存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然後在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,然後啟動第三種磁盤到磁盤的merge方式生成最終的那個文件。

3. Reducer的輸入文件。不斷地merge後,最後會生成一個“最終文件”。為什麽加引號?因為這個文件可能存在於磁盤上,也可能存在於內存中。對我們來說,當然希望它存放於內存中,直接作為Reducer的輸入,但默認情況下,這個文件是存放於磁盤中的。當Reducer的輸入文件已定,整個Shuffle才最終結束。然後就是Reducer執行,把結果放到HDFS上。

Shuffle產生的意義是什麽?
Shuffle過程的期望可以有:
完整地從map task端拉取數據到reduce 端。
在跨節點拉取數據時,盡可能地減少對帶寬的不必要消耗。
減少磁盤IO對task執行的影響。

每個map task都有一個內存緩沖區,存儲著map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據該如何處理?
每個map task都有一個內存緩沖區,存儲著map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束後再對磁盤中這個map task產生的所有臨時文件做合並,生成最終的正式輸出文件,然後等待reduce task來拉數據。

MapReduce提供Partitioner接口,它的作用是什麽?
MapReduce提供Partitioner接口,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出數據最終應該交由哪個reduce task處理。默認對key hash後再以reduce task數量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂制並設置到job上。

什麽是溢寫?
在一定條件下將緩沖區中的數據臨時寫入磁盤,然後重新利用這塊緩沖區。這個從內存往磁盤寫數據的過程被稱為Spill,中文可譯為溢寫。

溢寫是為什麽不影響往緩沖區寫map結果的線程?
溢寫線程啟動時不應該阻止map的結果輸出,所以整個緩沖區有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩沖區的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的內存,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB內存中寫,互不影響。


當溢寫線程啟動後,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型默認的行為,這裏的排序也是對誰的排序?
當溢寫線程啟動後,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型默認的行為,這裏的排序也是對序列化的字節做的排序。

溢寫過程中如果有很多個key/value對需要發送到某個reduce端去,那麽如何處理這些key/value值?
如果有很多個key/value對需要發送到某個reduce端去,那麽需要將這些key/value值拼接到一塊,減少與partition相關的索引記錄。

哪些場景才能使用Combiner呢?
Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。

Merge的作用是什麽?
最終磁盤中會至少有一個這樣的溢寫文件存在(如果map的輸出結果很少,當map執行完成時,只會產生一個溢寫文件),因為最終的文件只有一個,所以需要將這些溢寫文件歸並到一起,這個過程就叫做Merge

每個reduce task不斷的通過什麽協議從JobTracker那裏獲取map task是否完成的信息?
每個reduce task不斷地通過RPC從JobTracker那裏獲取map task是否完成的信息

reduce中Copy過程采用是什麽協議?
Copy過程,簡單地拉取數據。Reduce進程啟動一些數據copy線程(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。

reduce中merge過程有幾種方式?
merge有三種形式:1)內存到內存 2)內存到磁盤 3)磁盤到磁盤。默認情況下第一種形式不啟用,讓人比較困惑,是吧。當內存中的數據量到達一定閾值,就啟動內存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然後在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,然後啟動第三種磁盤到磁盤的merge方式生成最終的那個文件。

補充:

combiner:
背景:1、網絡寬帶嚴重被占降低程序效率。
2、單一節點承載過重降低程序性能。
解決方法:每一個map都可能會產生大量的本地輸出,combiner的作用就是對map端的輸出先做一次合並,以減少map和reduce節點之間的數據輸出量,以提高網絡IO性能。
設置map規約combiner
Job.setcombinerclass(myreducer.class);
執行後看到map的輸出和combine的輸入統計是一致的,而combine的輸出與reduce的輸入統計是一樣的。由此可以看出規約操作成功,而且執行在map的最後,reduce之前.
Partitioner:
1、哪個key到哪個Reducer的分配過程,是由Partitioner規定的。
2、MapReduce的使用者通常會指定Reduce任務和Reduce任務輸出文件的數量(R)。
用戶在中間key上使用分區函數來對數據進行分區,之後在輸入到後續任務執行進程。一個默認的分區函數式使用hash方法(比如常見的:hash(key) mod R)進行分區。hash方法能夠產生非常平衡的分區。
3、一般我們都會使用默認的分區函數HashPartitioner。

shuffle:
針對多個map任務的輸出按照不同的分區(Partition)通過網絡復制到不同的reduce任務節點。

map/reduce之間的shuffle,partition,combiner過程的詳解