1. 程式人生 > >MapReduce-shuffle過程詳解

MapReduce-shuffle過程詳解

等待 通知 10個 線程數 硬盤 res .sh 現在 溢出

Shuffle

技術分享圖片

map端

map函數開始產生輸出時,並不是簡單地將它寫到磁盤。這個過程很復雜,它利用緩沖的方式寫到內存並出於效率的考慮進行預排序。
每個map任務都有一個環形內存緩沖區用於存儲任務輸出。在默認情況下,緩沖區的大小為100MB,辭職可以通過io.sort.mb屬性來調整。一旦緩沖內容達到閾值(io.sort.spill.percent,默認是0.8),一個後臺線程便開始把內容溢出(spill)到磁盤。在溢出寫到磁盤過程中,map輸出繼續寫到緩沖區,但如果在此期間緩沖區被填滿,map會被阻塞知道寫磁盤過程完成。
溢出寫過程按輪詢方式將緩沖區中的內容寫到mapred.local.dir屬性指定的作業特定子目錄中的目錄中。
在寫磁盤之前,線程首先根據數據最終要傳的reducer把數據劃分成相應的分區(partition).在每個分區中,後臺線程按鍵進行內排序,如果有一個combiner,它就在排序後的輸出上運行。運行combiner使得map輸出結果更緊湊,因此減少寫到磁盤的數據和傳遞給reducer的數據。
每次內存緩沖區達到溢出閾值,就會新建一個溢出文件(spill file),因此在map任務寫完其最後一個輸出記錄之後,會有幾個溢出文件。在任務完成之前,溢出文件被合並成一個已分區且已排序的輸出文件。配置屬性io.sort.factor控制著一次最多能合並多少流,默認值是10.
如果至少存在3個溢出文件(通過min.num.spills.for.combine屬性設置)時,則combiner就會在輸出文件寫到磁盤之前再次運行。Combiner可以在輸入上反復運行,但並不影響最終結果。如果只有一兩個溢出文件,那麽對map輸出的減少方面不值得調用combiner,不會為該map輸出再次運行combiner。
在將壓縮map輸出寫到磁盤的過程中對它進行壓縮是個不錯的主意,因為這樣寫磁盤的速度更快,節約磁盤空間,並且減少傳給reducer的數據量。在默認情況下,輸出是不壓縮的,但只要將mapred.compress.map.output設置為true,就可以輕松啟用此功能。使用的壓縮庫由mapred.map.output.compression.codec指定
reducer通過HTTP方式得到輸出文件的分區。用於文件分區的工作線程的數量由任務的tracker.http.threads屬性控制,此設置針對的是每一個tasktracker,而不是針對每個map任務槽。默認值是40,在運行大型作業的大型集群上,辭職可以根據需要而增加。在MapReduce2中,該屬性是不適用的,因為使用的最大線程數是基於機器的處理器數量自動設定的。MapReduce2使用netty,默認情況下允許值為處理器數量的兩倍。

reduce端

map輸出文件位於運行map任務的tasktracker的本地磁盤(註意,盡管map輸出經常寫到map tasktracker的本地磁盤,但reduce輸出並不這樣),現在tasktracker需要為分區文件運行reduce任務。而且,reduce任務需要集群上若幹個map任務的map輸出作為其特殊的分區文件。每個map任務的完成時間可能不同,因此只要有一個任務完成,reduce任務就開始復制其輸出。這就是reduce任務的復制階段。Reduce任務有少量復制線程,因此能夠並行取得map輸出。默認是5個線程,但這個默認值可以通過設置mapred.reduce.parallel.copies屬性來改變。

Reduce如果知道要從哪臺幾區取得map輸出?
Map任務成功完成後,它們會通知其父tasktracker狀態已更新,然後tasktracker今兒通知jobtracker.(在MapReduce2中,任務直接通知其應用程序master。)這些通知在心跳通信機制中傳輸。因此,對於指定作業,jobtracker(或應用程序master)知道map輸出和tasktracker之間的映射關系。Reducer中的一個線程定期詢問jobtracker以便獲取map輸出的位置,知道獲得所有輸出位置。
由於第一個reducer可能失敗,因此tasktracker並沒有在第一個reducer檢索到map輸出時就立即從磁盤上刪除它們。相反,tasktracker會等待,直到jobtracker告知它刪除map輸出,這是作業完成後執行的。

如果map輸出相對小,會被復制到reduce任務JVM的內存(緩沖區大小由mapred.job.shuffle.input.buffer.percent屬性控制,指定用於此用途的堆空間的百分比),否則,map輸出被復制到磁盤。一旦內存緩沖區達到閾值大小(由mapred.job.shuffle.merge.percent決定)或達到map輸出閾值(mapred.inmem.merge.threshold控制),則合並後溢出寫到磁盤中。如果指定combiner,則在合並期間運行它以降低寫入硬盤的數據量。
隨著磁盤上復本增多,後臺線程會將它們合並為更大的,排好序的文件。這會為後面的合並節省一些時間。註意,為了合並,壓縮的map輸出(通過map任務)都必須在內存中被解壓縮。
復制完所有map輸出後,reduce任務進入排序階段(更恰當的說法是合並階段,因為排序是在map端進行的),這個階段將合並map輸出,維持其順序排序。這是循環進行的。比如,如果有50個map輸出,而合並因子是10(默認是10,由io.sort.factor屬性設置,與map的合並類似),合並將進行5此,每次將10個文件合並成一個文件,因此最後有5個中間文件。
在最後階段,即reduce階段,直接把數據輸入reduce函數,從而省略了一次磁盤往返進程,並沒有將這5個文件合並成一個已排序的文件作為最後一趟。最後的合並可以來自內存和磁盤片段。
在reduce階段,對已排序輸出中的每個鍵調用reduce函數。此階段的輸出直接寫到輸出文件系統,一般為HDFS。如果采用HDFS,由於tasktracker節點(或節點管理器)也運行數據節點,所以第一塊復本將被寫到本地磁盤。

每次合並的文件數實際上比舉例中的有所不同。目標是合並最小數量的文件以便滿足最後一次的合並系數。因此,如果有40個文件,並不會在四次合並中每次合並10個文件而得到4個文件。相反,第一次只合並4個文件,隨後的三次合並所有10個文件。在最後一次合並中,4個已合並的文件和余下的6個(未合並的)文件合計10個文件。
註意,這並沒有改變合並次數,只是一個優化措施,目的是盡量減少寫到磁盤的數據量,因為最後一次總是直接合並到reduce

技術分享圖片

MapReduce-shuffle過程詳解