1. 程式人生 > >大資料系列之Hadoop知識整理(七)MapReduce的核心之Shuffle詳解

大資料系列之Hadoop知識整理(七)MapReduce的核心之Shuffle詳解

1.MapReduce的核心之shuffle詳解

上一篇中我們介紹了MapReduce是什麼,以及MapReduce的執行過程,其中在執行過程中主要分為Map端與Reducer端,MapReduce計算模型主要完成了對映與化簡,在這其中,有一個最重要的過程那就是其核心——shuffle,shuffle翻譯過來也就是混洗。可能大家比較熟知的是JAVA API中的Collections.shuffle(List)方法,這個方法會隨機的打亂引數list裡的元素順序。我們先來看看官方給出的MapReduce裡面的shuffle,圖示如下:

             

在Hadoop這樣的叢集中,大部分的Map任務與Reducer任務是在不同的節點上執行的,這就會導致Reducer從不同的Map節點去拷貝輸出的資料,這樣就可能造成巨大的網路消耗,這種網路消耗是正常的,不可避免,我們能做的只有減少一些不必要的網路消耗。從基本的要求來說,我們希望shuffle為我們解決:

(1)能夠將Map端輸出的資料完整的對映到Reduce端,以保證Reducer能夠完整的獲取Map端的輸出資料

(2)Reduce在跨節點好從Map端獲取資料時,儘可能減少不必要的網路消耗

(3)儘量減少磁碟IO

面對上述,如果是我們自己設計shuffle,那麼目標是什麼呢?我覺得能優化的地方就是減少在網路中傳輸大量資料的消耗盡量使用記憶體,而不是磁碟。

在上一篇的MapReduce執行過程的圖示中,我們已經進行了分析,下面我們結合shuffle,進一步的分析一下Map端與Reduce端的執行過程。其中shuffle橫跨Map端與Reducer端

Map端如下:

上圖描述的是某一個Map的執行情況,對應了官網給出圖片的左半部分,Map端的shuffle對應了partition(分割槽),sort(排序),merge(歸併)。

整體來說,分為4個部分。每一個Map都有一個記憶體緩衝區,儲存著Map的輸出結果,當緩衝區內資料的數量達到百分之80時,溢位到磁碟的一個臨時檔案,當整個Map任務結束之後,再對磁碟中飯的這個Map所產生的所有臨時檔案進行合併,產生最終的正式輸出檔案,以備Reduce的呼叫。

每部分的說明如下:

第一,在Map任務執行時,他的輸入資料來源於分散式檔案系統HDFS的block,在輸入Map之前,會先對塊進行切片,處理原理是最小切片minSize大小是1,最大切片maxSplit大小是Long.Max,資料塊大小blockSize取3者中的中間值,我們假設這輸入的資料是“hello world hello”。

第二,通過Map的map函式處理之後,輸出<K,V>對,也就是<hello,1>,<world,1>,<hello,1>,Key是單詞,Value是數量1。然後輸出的Key通過shuffle的Partitioner進入到了不同的分割槽(預設的分割槽演算法是對Key進行Hash),分割槽是為了告訴資料進入到哪哪一個Reducer區中。 MapReduce提供Partitioner介面,我們可以自定義屬於自己任務的分割槽函式。在我們這個例子中,hello經過partition之後返回0,也就是把這個值交給第一個Reducer來進行資料處理.。

第三,已經分割槽之後的資料進入到了記憶體緩衝區,記憶體緩衝區的大小是有限制的,預設是100M,如果Map端輸出的資料在緩衝區中已經達到了百分之80(100 * 0.8=80),之後,就會開啟溢寫執行緒(Spill),將資料溢寫到磁碟的臨時目錄下,當溢寫執行緒啟動之後,會對著要溢位的80M資料進行排序,預設是按照字典順序進行排序。在這裡,我們先想一下,由於Map任務的輸出是要傳送到不同的Reducer端的,而記憶體緩衝區沒有將相同的資料例如<hello,1>,<hello,1>進行合併,如果傳送到網路中,這就必然造成了巨大的網路消耗,為了減少網路消耗,我們把Map任務輸出的相同的Key提前進行合併,例如將<hello,1>,<hello,1>合併成<hello,2>這個過程就叫做Combiner,也就做Map端的reduce。說白了,就是一個Reducer提前執行,只不過是發生在Map端。那哪些場景才能使用Combiner呢?從這裡分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那種Reduce的輸入key/value與輸出key/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等(注意:求平均等不能用Combiner)。Combiner的使用一定得慎重,如果用好,它對任務執行效率有幫助,反之會影響Reducer的最終結果。 

第四,Merge(歸併),將具有相同Key值的資料進行合併,例如<hello,1>,<hello,1>合併成<hello,[1,1]>,寫到磁碟上。特別注意:

每次溢寫都會在磁碟的臨時目錄下生成一個臨時檔案,如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁碟上相應的就會有多個溢寫檔案存在。當Map任務完成的時候,也會對這些溢寫檔案中的資料進行歸併(Merge),最終生成一個檔案,例如檔案1中<hello,1>,檔案2中<hello,2>,檔案3中<hello,3>合併之後就是<hello,[1,2,3]>。注意,如果這時候設定了Combiner則會執行來合併相同的Key。

至此,Map端的所有工作全部完成。簡單的說,就是在不停的在讀資料,輸出資料,分割槽,排序,合併

Reducer端如下:

Shufle在Reducer端總結起來一共三步,Copy,Merge,Reduce的輸入檔案

第一,Copy階段。Reducer會開啟一些Copy執行緒,通過HTTP的方式獲取Map端的輸出檔案。

第二,Merge階段。這裡的Merge如map端的Merge動作,只是陣列中存放的是不同Map端Copy來的數值。Copy過來的資料會先放入記憶體緩衝區中,這裡的緩衝區大小要比Map端的更為靈活,它基於JVM的heap size設定,因為Shuffle階段Reducer不執行,所以應該把絕大部分的記憶體都給Shuffle用。這裡需要強調的是,Merge有三種形式:1)記憶體到記憶體  2)記憶體到磁碟  3)磁碟到磁碟。預設情況下第一種形式不啟用,讓人比較困惑,是吧。當記憶體中的資料量到達一定閾值,就啟動記憶體到磁碟的Merge。與Map 端類似,這也是溢寫的過程,這個過程中如果你設定有Combiner,也是會啟用的,然後在磁碟中生成了眾多的溢寫檔案。第二種Merge方式一直在執行,直到沒有Map端的資料時才結束,然後啟動第三種磁碟到磁碟的Merge方式生成最終的那個檔案。 

第二,Reduce的輸入檔案。不斷的Merge之後,形成一個最終的檔案,這個檔案有可能在記憶體,也有可能存放在磁碟上,如果存在記憶體中,那麼就直接作為Reducer的輸入,但是預設情況下是儲存在磁碟上的。當Reducer的輸入檔案已經確定,整個Shuffle才會最終的結束。然後就是Reducer執行,把最後的結果儲存在HDFS上。