1. 程式人生 > >MapReduce系列之MapReduce任務處理流程

MapReduce系列之MapReduce任務處理流程

MapReduce處理資料的流程一般是這樣的:

1、從HDFS上讀取資料,因為是分散式與平行計算,需要將資料劃分給多個MapReduce任務。HDFS儲存檔案也是分塊的,每個MapReduce的輸入一般是和HDFS的資料塊是對應的。也就是說一個HDFS資料塊作為一個MapReduce任務的輸入。這是Hadoop預設的情況,我們也可以實現InputFormat自定義輸入格式。

2、Map進行計算:這一步和Reduce都是由使用者根據需要實現的。在WordCount例子中,對每個單詞做對映,word-->(word,1)

3、shuffle and sort:這一步是MapReduce的核心,但使用者基本不用管,可能會根據具體的需要自定義比較器和分割槽器。具體詳細過程如下:

  •         maptask收集我們的map()方法輸出的kv對,放到記憶體緩衝區中
  •         從記憶體緩衝區不斷溢位本地磁碟檔案,可能會溢位多個檔案
  •         多個溢位檔案會被合併成大的溢位檔案
  •         在溢位過程中,及合併的過程中,都要呼叫partitoner進行分組和針對key進行排序
  •         reducetask根據自己的分割槽號,去各個maptask機器上取相應的結果分割槽資料
  •         reducetask會取到同一個分割槽的來自不同maptask的結果檔案,reducetask會將這些檔案再進行合併(歸併排序)
  •         合併成大檔案後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從檔案中取出一個一個的鍵值對group,呼叫使用者自定義的reduce()方法)

Shuffle中的緩衝區大小會影響到mapreduce程式的執行效率,原則上說,緩衝區越大,磁碟io的次數越少,執行速度就越快

緩衝區的大小可以通過引數調整,  引數:io.sort.mb  預設100M。

4、Reduce:和Map一樣需要使用者根據具體需求實現。在WordCount例子中,對每個單詞的一系列值做加法。

5、將計算結果輸出到HDFS,可以實現OutputFormat介面自定義輸出格式。

以上就是MapReduce程式設計的一個大體流程。

附  WordCount例子程式碼:https://github.com/taowenjun/MapReduce/tree/master/cn/tao/wordcount

宣告:圖片來自網路