1. 程式人生 > >mapreduce shuffle 和sort 詳解

mapreduce shuffle 和sort 詳解

改變 struct 堆內存 傳輸 工具 默認 臨時 arc 快速排序

MapReduce 框架的核心步驟主要分兩部分:Map 和Reduce。當你向MapReduce 框架提交一個計算作業時,它會首先把計算作業拆分成若幹個Map 任務,然後分配到不同的節點上去執行,每一個Map 任務處理輸入數據中的一部分,當Map 任務完成後,它會生成一些中間文件,這些中間文件將會作為Reduce 任務的輸入數據。Reduce 任務的主要目標就是把前面若幹個Map 的輸出匯總到一起並輸出。

技術分享

本文的重點是剖析MapReduce 的核心過程——ShuffleSort。在本文中,Shuffle是指從Map 產生輸出開始,包括系統執行排序以及傳送Map 輸出到Reducer 作為輸入的過程。在這裏我們將去探究Shuffle

是如何工作的,因為對基礎的理解有助於對MapReduce 程序進行調優。

首先從Map 端開始分析。當Map 開始產生輸出時,它並不是簡單的把數據寫到磁盤,因為頻繁的磁盤操作會導致性能嚴重下降。它的處理過程更復雜,數據首先是寫到內存中的一個緩沖區,並做了一些預排序,以提升效率。

每個Map 任務都有一個用來寫入輸出數據的循環內存緩沖區。這個緩沖區默認大小是100MB,可以通過io.sort.mb 屬性來設置具體大小。當緩沖區中的數據量達到一個特定閥值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默認是0.80)時,系統將會啟動一個後臺線程把緩沖區中的內容spill 到磁盤。在spill 過程中,Map 的輸出將會繼續寫入到緩沖區,但如果緩沖區已滿,Map 就會被阻塞直到spill 完成。spill 線程在把緩沖區的數據寫到磁盤前,會對它進行一個二次快速排序,首先根據數據所屬的partition 排序,然後每個partition 中再按Key 排序。輸出包括一個索引文件和數據文件。如果設定了Combiner,將在排序輸出的基礎上運行。Combiner 就是一個Mini Reducer,它在執行Map 任務的節點本身運行,先對Map 的輸出做一次簡單Reduce,使得Map 的輸出更緊湊,更少的數據會被寫入磁盤和傳送到Reducer。spill 文件保存在由mapred.local.dir指定的目錄中,Map 任務結束後刪除。

每當內存中的數據達到spill 閥值的時候,都會產生一個新的spill 文件,所以在Map任務寫完它的最後一個輸出記錄時,可能會有多個spill 文件。在Map 任務完成前,所有的spill 文件將會被歸並排序為一個索引文件和數據文件,如圖3 所示。這是一個多路歸並過程,最大歸並路數由io.sort.factor 控制(默認是10)。如果設定了Combiner,並且spill文件的數量至少是3(由min.num.spills.for.combine 屬性控制),那麽Combiner 將在輸出文件被寫入磁盤前運行以壓縮數據。

對寫入到磁盤的數據進行壓縮(這種壓縮同Combiner 的壓縮不一樣)通常是一個很好的方法,因為這樣做使得數據寫入磁盤的速度更快,節省磁盤空間,並減少需要傳送到Reducer 的數據量。默認輸出是不被壓縮的, 但可以很簡單的設置mapred.compress.map.output 為true 啟用該功能。壓縮所使用的庫由mapred.map.output.compression.codec 來設定,

目前主要有以下幾個壓縮格式:

DEFLATE 無DEFLATE .deflate 不支持不可以

gzip gzip DEFLATE .gz 不支持不可以

ZIP zip DEFLATE .zip 支持可以

bzip2 bzip2 bzip2 .bz2 不支持可以

LZO lzop LZO .lzo 不支持不可以

當spill 文件歸並完畢後,Map 將刪除所有的臨時spill 文件,並告知TaskTracker 任務已完成。Reducers 通過HTTP 來獲取對應的數據。用來傳輸partitions 數據的工作線程數由tasktracker.http.threads 控制,這個設定是針對每一個TaskTracker 的,並不是單個Map,默認值為40,在運行大作業的大集群上可以增大以提升數據傳輸速率。

現在讓我們轉到Shuffle的Reduce 部分。Map 的輸出文件放置在運行Map 任務的TaskTracker 的本地磁盤上(註意:Map 輸出總是寫到本地磁盤,但Reduce 輸出不是,一般是寫到HDFS),它是運行Reduce 任務的TaskTracker 所需要的輸入數據。Reduce 任務的輸入數據分布在集群內的多個Map 任務的輸出中,Map 任務可能會在不同的時間內完成,只要有其中的一個Map 任務完成,Reduce 任務就開始拷貝它的輸出。這個階段稱之為拷貝階段。Reduce 任務擁有多個拷貝線程, 可以並行的獲取Map 輸出。可以通過設定mapred.reduce.parallel.copies 來改變線程數,默認是5。

Reducer 是怎麽知道從哪些TaskTrackers 中獲取Map 的輸出呢?當Map 任務完成之後,會通知它們的父TaskTracker,告知狀態更新,然後TaskTracker 再轉告JobTracker。這些通知信息是通過心跳通信機制傳輸的。因此針對一個特定的作業,JobTracker 知道Map 輸出與TaskTrackers 的映射關系。Reducer 中有一個線程會間歇的向JobTracker 詢問Map 輸出的地址,直到把所有的數據都取到。在Reducer 取走了Map 輸出之後,TaskTrackers 不會立即刪除這些數據,因為Reducer 可能會失敗。它們會在整個作業完成後,JobTracker告知它們要刪除的時候才去刪除。

如果Map 輸出足夠小,它們會被拷貝到Reduce TaskTracker 的內存中(緩沖區的大小

由mapred.job.shuffle.input.buffer.percent 控制,制定了用於此目的的堆內存的百分比);如果緩沖區空間不足,會被拷貝到磁盤上。當內存中的緩沖區用量達到一定比例閥值(由mapred.job.shuffle.merge.threshold 控制),或者達到了Map 輸出的閥值大小(由mapred.inmem.merge.threshold 控制),緩沖區中的數據將會被歸並然後spill 到磁盤。

拷貝來的數據疊加在磁盤上,有一個後臺線程會將它們歸並為更大的排序文件,這樣做節省了後期歸並的時間。對於經過壓縮的Map 輸出,系統會自動把它們解壓到內存方便對其執行歸並。

當所有的Map 輸出都被拷貝後,Reduce 任務進入排序階段(更恰當的說應該是歸並階段,因為排序在Map 端就已經完成),這個階段會對所有的Map 輸出進行歸並排序,這個工作會重復多次才能完成。

假設這裏有50 個Map 輸出(可能有保存在內存中的),並且歸並因子是10(由io.sort.factor 控制,就像Map 端的merge 一樣),那最終需要5 次歸並。每次歸並會把10個文件歸並為一個,最終生成5 個中間文件。在這一步之後,系統不再把5 個中間文件歸並壓縮格式工具算法擴展名支持分卷是否可分割成一個,而是排序後直接“餵”給Reduce 函數,省去向磁盤寫數據這一步。最終歸並的數據可以是混合數據,既有內存上的也有磁盤上的。由於歸並的目的是歸並最少的文件數目,使得在最後一次歸並時總文件個數達到歸並因子的數目,所以每次操作所涉及的文件個數在實際中會更微妙些。譬如,如果有40 個文件,並不是每次都歸並10 個最終得到4 個文件,相反第一次只歸並4 個文件,然後再實現三次歸並,每次10 個,最終得到4 個歸並好的文件和6 個未歸並的文件。要註意,這種做法並沒有改變歸並的次數,只是最小化寫入磁盤的數據優化措施,因為最後一次歸並的數據總是直接送到Reduce 函數那裏。

在Reduce 階段,Reduce 函數會作用在排序輸出的每一個key 上。這個階段的輸出被直接寫到輸出文件系統,一般是HDFS。在HDFS 中,因為TaskTracker 節點也運行著一個DataNode 進程,所以第一個塊備份會直接寫到本地磁盤。

到此,MapReduce 的ShuffleSort分析完畢。

mapreduce shuffle 和sort 詳解