1. 程式人生 > >Hadoop學習之路(二十三)MapReduce中的shuffle詳解

Hadoop學習之路(二十三)MapReduce中的shuffle詳解

就是 多個 流程 http cer 分開 分享圖片 數據分區 bsp

概述

1、MapReduce 中,mapper 階段處理的數據如何傳遞給 reducer 階段,是 MapReduce 框架中 最關鍵的一個流程,這個流程就叫 Shuffle

2、Shuffle: 數據混洗 ——(核心機制:數據分區,排序,局部聚合,緩存,拉取,再合並 排序)

3、具體來說:就是將 MapTask 輸出的處理結果數據,按照 Partitioner 組件制定的規則分發 給 ReduceTask,並在分發的過程中,對數據按 key 進行了分區和排序

MapReduce的Shuffle過程介紹

Shuffle的本義是洗牌、混洗,把一組有一定規則的數據盡量轉換成一組無規則的數據,越隨機越好。MapReduce中的Shuffle更像是洗牌的逆過程,把一組無規則的數據盡量轉換成一組具有一定規則的數據。

為什麽MapReduce計算模型需要Shuffle過程?我們都知道MapReduce計算模型一般包括兩個重要的階段:Map是映射,負責數據的過濾分發;Reduce是規約,負責數據的計算歸並。Reduce的數據來源於Map,Map的輸出即是Reduce的輸入,Reduce需要通過Shuffle來獲取數據。

從Map輸出到Reduce輸入的整個過程可以廣義地稱為Shuffle。Shuffle橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和sort過程,如圖所示:

技術分享圖片

Spill過程

Spill過程包括輸出、排序、溢寫、合並等步驟,如圖所示:

技術分享圖片

Collect

每個Map任務不斷地以對的形式把數據輸出到在內存中構造的一個環形數據結構中。使用環形數據結構是為了更有效地使用內存空間,在內存中放置盡可能多的數據。

這個數據結構其實就是個字節數組,叫Kvbuffer,名如其義,但是這裏面不光放置了數據,還放置了一些索引數據,給放置索引數據的區域起了一個Kvmeta的別名,在Kvbuffer的一塊區域上穿了一個IntBuffer(字節序采用的是平臺自身的字節序)的馬甲。數據區域和索引數據區域在Kvbuffer中是相鄰不重疊的兩個區域,用一個分界點來劃分兩者,分界點不是亙古不變的,而是每次Spill之後都會更新一次。初始的分界點是0,數據的存儲方向是向上增長,索引數據的存儲方向是向下增長,如圖所示:

技術分享圖片

Kvbuffer的存放指針bufindex是一直悶著頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之後,bufindex增長為4,一個Int型的value寫完之後,bufindex增長為8。

索引是對在kvbuffer中的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,占用四個Int長度,Kvmeta的存放指針Kvindex每次都是向下跳四個“格子”,然後再向上一個格子一個格子地填充四元組的數據。比如Kvindex初始位置是-4,當第一個寫完之後,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,然後Kvindex跳到-8位置,等第二個和索引寫完之後,Kvindex跳到-32位置。

Kvbuffer的大小雖然可以通過參數設置,但是總共就那麽大,和索引不斷地增加,加著加著,Kvbuffer總有不夠用的那天,那怎麽辦?把數據從內存刷到磁盤上再接著往內存寫數據,把Kvbuffer中的數據刷到磁盤上的過程就叫Spill,多麽明了的叫法,內存中的數據滿了就自動地spill到具有更大空間的磁盤。

關於Spill觸發的條件,也就是Kvbuffer用到什麽程度開始Spill,還是要講究一下的。如果把Kvbuffer用得死死得,一點縫都不剩的時候再開始Spill,那Map任務就需要等Spill完成騰出空間之後才能繼續寫數據;如果Kvbuffer只是滿到一定程度,比如80%的時候就開始Spill,那在Spill的同時,Map任務還能繼續寫數據,如果Spill夠快,Map可能都不需要為空閑空間而發愁。兩利相衡取其大,一般選擇後者。

Spill這個重要的過程是由Spill線程承擔,Spill線程從Map任務接到“命令”之後就開始正式幹活,幹的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個頗具爭議性的Sort。

Sort

先把Kvbuffer中的數據按照partition值和key兩個關鍵字升序排序,移動的只是索引數據,排序結果是Kvmeta中數據按照partition為單位聚集在一起,同一partition內的按照key有序。

Spill

Spill線程為這次Spill過程創建一個磁盤文件:從所有的本地目錄中輪訓查找能存儲這麽大空間的目錄,找到之後在其中創建一個類似於“spill12.out”的文件。Spill線程根據排過序的Kvmeta挨個partition的把數據吐到這個文件中,一個partition對應的數據吐完之後順序地吐下個partition,直到把所有的partition遍歷完。一個partition在文件中對應的數據也叫段(segment)。

所有的partition對應的數據都放在這個文件裏,雖然是順序存放的,但是怎麽直接知道某個partition在這個文件中存放的起始位置呢?強大的索引又出場了。有一個三元組記錄某個partition對應的數據在這個文件中的索引:起始位置、原始數據長度、壓縮之後的數據長度,一個partition對應一個三元組。然後把這些索引信息存放在內存中,如果內存中放不下了,後續的索引信息就需要寫到磁盤文件中了:從所有的本地目錄中輪訓查找能存儲這麽大空間的目錄,找到之後在其中創建一個類似於“spill12.out.index”的文件,文件中不光存儲了索引數據,還存儲了crc32的校驗數據。(spill12.out.index不一定在磁盤上創建,如果內存(默認1M空間)中能放得下就放在內存中,即使在磁盤上創建了,和spill12.out文件也不一定在同一個目錄下。)

每一次Spill過程就會最少生成一個out文件,有時還會生成index文件,Spill的次數也烙印在文件名中。索引文件和數據文件的對應關系如下圖所示:

技術分享圖片

在Spill線程如火如荼的進行SortAndSpill工作的同時,Map任務不會因此而停歇,而是一無既往地進行著數據輸出。Map還是把數據寫到kvbuffer中,那問題就來了:只顧著悶頭按照bufindex指針向上增長,kvmeta只顧著按照Kvindex向下增長,是保持指針起始位置不變繼續跑呢,還是另謀它路?如果保持指針起始位置不變,很快bufindex和Kvindex就碰頭了,碰頭之後再重新開始或者移動內存都比較麻煩,不可取。Map取kvbuffer中剩余空間的中間位置,用這個位置設置為新的分界點,bufindex指針移動到這個分界點,Kvindex移動到這個分界點的-16位置,然後兩者就可以和諧地按照自己既定的軌跡放置數據了,當Spill完成,空間騰出之後,不需要做任何改動繼續前進。分界點的轉換如下圖所示:

技術分享圖片

Map任務總要把輸出的數據寫到磁盤上,即使輸出數據量很小在內存中全部能裝得下,在最後也會把數據刷到磁盤上。

Merge

Map任務如果輸出數據量很大,可能會進行好幾次Spill,out文件和Index文件會產生很多,分布在不同的磁盤上。最後把這些文件進行合並的merge過程閃亮登場。

Merge過程怎麽知道產生的Spill文件都在哪了呢?從所有的本地目錄上掃描得到產生的Spill文件,然後把路徑存儲在一個數組裏。Merge過程又怎麽知道Spill的索引信息呢?沒錯,也是從所有的本地目錄上掃描得到Index文件,然後把索引信息存儲在一個列表裏。到這裏,又遇到了一個值得納悶的地方。在之前Spill過程中的時候為什麽不直接把這些信息存儲在內存中呢,何必又多了這步掃描的操作?特別是Spill的索引數據,之前當內存超限之後就把數據寫到磁盤,現在又要從磁盤把這些數據讀出來,還是需要裝到更多的內存中。之所以多此一舉,是因為這時kvbuffer這個內存大戶已經不再使用可以回收,有內存空間來裝這些數據了。(對於內存空間較大的土豪來說,用內存來省卻這兩個io步驟還是值得考慮的。)

然後為merge過程創建一個叫file.out的文件和一個叫file.out.Index的文件用來存儲最終的輸出和索引。

一個partition一個partition的進行合並輸出。對於某個partition來說,從索引列表中查詢這個partition對應的所有索引信息,每個對應一個段插入到段列表中。也就是這個partition對應一個段列表,記錄所有的Spill文件中對應的這個partition那段數據的文件名、起始位置、長度等等。

然後對這個partition對應的所有的segment進行合並,目標是合並成一個segment。當這個partition對應很多個segment時,會分批地進行合並:先從segment列表中把第一批取出來,以key為關鍵字放置成最小堆,然後從最小堆中每次取出最小的輸出到一個臨時文件中,這樣就把這一批段合並成一個臨時的段,把它加回到segment列表中;再從segment列表中把第二批取出來合並輸出到一個臨時segment,把其加入到列表中;這樣往復執行,直到剩下的段是一批,輸出到最終的文件中。

最終的索引數據仍然輸出到Index文件中。

技術分享圖片

Map端的Shuffle過程到此結束。

Copy

Reduce任務通過HTTP向各個Map任務拖取它所需要的數據。每個節點都會啟動一個常駐的HTTP server,其中一項服務就是響應Reduce拖取Map數據。當有MapOutput的HTTP請求過來的時候,HTTP server就讀取相應的Map輸出文件中對應這個Reduce部分的數據通過網絡流輸出給Reduce。

Reduce任務拖取某個Map對應的數據,如果在內存中能放得下這次數據的話就直接把數據寫到內存中。Reduce要向每個Map去拖取數據,在內存中每個Map對應一塊數據,當內存中存儲的Map數據占用空間達到一定程度的時候,開始啟動內存中merge,把內存中的數據merge輸出到磁盤上一個文件中。

如果在內存中不能放得下這個Map的數據的話,直接把Map數據寫到磁盤上,在本地目錄創建一個文件,從HTTP流中讀取數據然後寫到磁盤,使用的緩存區大小是64K。拖一個Map數據過來就會創建一個文件,當文件數量達到一定閾值時,開始啟動磁盤文件merge,把這些文件合並輸出到一個文件。

有些Map的數據較小是可以放在內存中的,有些Map的數據較大需要放在磁盤上,這樣最後Reduce任務拖過來的數據有些放在內存中了有些放在磁盤上,最後會對這些來一個全局合並。

Merge Sort

這裏使用的Merge和Map端使用的Merge過程一樣。Map的輸出數據已經是有序的,Merge進行一次合並排序,所謂Reduce端的sort過程就是這個合並的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。

Reduce端的Shuffle過程至此結束。

Hadoop學習之路(二十三)MapReduce中的shuffle詳解