1. 程式人生 > >Mapreduec流程之Shuffle過程詳解

Mapreduec流程之Shuffle過程詳解

作為整個Mapreduce中最為神祕,複雜的部分,恰恰是平時業務中最經常接觸的地方。僅僅依靠map和reduce階段的業務程式碼編輯,是不能滿足平時的業務需要的。真正的業務處理中,經常會涉及到自定義partition,sort,groupcomparator等情況。而只有瞭解清楚了shuffle階段是怎麼執行的,才能更好的幫助我們按需修改mapreduce中的各個元件。

廢話不說,直接進入主題!

Shuffle階段,詳細的來說可以分為三個部分:map-shuffle,fetch和reduce-shuffle。整個shuffle階段的開始是從map輸出鍵值對開始,到reduce階段接受鍵值對之前。

name我們就從第一個階段map-shuffle開始說起。通過查閱maptask的原始碼,我們知道,context.write方法其實等價於output.write方法,而output.write方法其實是呼叫了底層的collector的collect方法,該方法有三個引數,分別為:key,value 和getPartition的返回值---reducetask的個數。通過collector物件,我們發現mapOutputCollector的父類mapOutputBuffer實現了一個很複雜的功能,就是我們常說的環形緩衝區。由此可以得出,從maptask出來的資料全部都進入了這個環形緩衝區當中。下面我們來具體聊下這個環形緩衝區的作用。

這個所謂的環形緩衝區,其實就是一個首尾相連的記憶體區域。從在mapOutputBuffer中定義了一個數組,kvbuffer[]=100M,我們得知這個記憶體區域的大小為100M。那麼問題來了,我們邏輯切片的大小一般都等於切分的資料塊的大小-128M,100M的記憶體區顯然是裝不下一整個資料塊的資料的。因此,這裡我們需要了解到一個叫溢位的概念。就是當緩衝區內資料讀取到一定程度的時候,資料就會溢位,持久化到磁碟。而真實情況是,在這個緩衝區內,真正處理資料的記憶體只有80M,預留了20M的記憶體,保證了緩衝區可以邊存邊寫。而這80M裡面,還包括一部分的索引大小,因此實際儲存的資料量要比80M還小。那麼這個緩衝區,資料到底進行了什麼處理呢?就是分割槽,然後持久化到磁碟。經過緩衝區處理的資料,根據鍵的大小,被分成了不同的部分,也就是我們所說的partition的元件乾的事。記憶體不斷的讀取資料,分割槽,排序(快速排序),然後溢位到磁碟。這個階段就是shuffle中的map-shuffle階段。

而其中的fetch階段就比較簡單了,就是指溢位的資料持久化到磁碟的過程。以便reduce-shuffle階段拉取資料。

對於reduce-shuffle階段,線路就比較清晰了。map-shuffle階段溢位的資料,持久化到磁碟之後,都帶上了分割槽編號。此時,reduce-shuffle階段的主要工作就是不同的reducer根據自己需要處理的資料的編號去拉去那些溢位持久化到磁碟的資料。一個reducetask能夠獲取多個maptask溢位的資料,然後reducetask會對資料進行歸併,將多個數據塊合併,排序(遞迴排序),生成一個完整的資料塊。該資料塊順序儲存了多個鍵值對,按照鍵的排序,將值儲存在對應的迭代器中,也就是我們在reducer的reduce方法中的iterator引數。

暫時不方便上圖,稍後補上。另外補一句,為啥mapreduce計算速度慢?這一次次的磁碟存和取,還能快到哪裡去。。。。不過好訊息,據說hadoop3.x版本,進行了比較徹底的改進,速度幾十倍的提高了。看來有必要去研究研究了。