1. 程式人生 > >MapReduce 圖解流程超詳細解答(1)-【map階段】

MapReduce 圖解流程超詳細解答(1)-【map階段】


  • Map Phase:若干 Map Tasks 被執行
  • Reduce Phase: 若干Reduce Tasks 被執行

reduce可能會在map階段結束之前開始執行,因此上面顯示的有重疊的地方。

現在我們集中考察map相,一個關鍵的問題是一個應用需要多少map任務去執行現在的這個job

使用者給了我們什麼?

我們退回到之前的一步,當一個使用者提交一個應用的時候,若干資訊被提供給了YARN ,分別是:

  • 一個配置:這可以是一部分的,因為一些引數不需要使用者特別指定,可以有自己的預設值。
  • 一個jar檔案,含有一個map,一個combiner,一個reduce
  • 一個輸入和輸出資訊 輸入目錄 是不是在hdfs上,有多少檔案呢?輸出的時候,我們儲存在哪裡

The number of files inside the input directory is used for deciding the number of Map Tasks of a job.
那麼,輸入的目錄中檔案的數量決定多少個map會被執行起來

多少個map任務?

應用針對每一個分片執行一個map,一般而言,對於每一個輸入的檔案會有一個map split。如果輸入檔案太大,超過了hdfs塊的大小(64M)那麼對於同一個輸入檔案我們會有多餘2個的map執行起來。下面是FileInputFormat class 的getSplits()的虛擬碼: 

num_splits = 0
for each input file f:
   remaining = f.length
   while remaining / split_size > split_slope:
      num_splits += 1
      remaining -= split_size

where:

split_slope = 1.1 分割斜率
split_size =~ dfs.blocksize 分割大小約等於hdfs塊大小

在mapreduce2.0以上版本mapreduce.job.maps 屬性會被忽略

MapTask Launch
啟動MapTask

mapreduce應用會向資源管理器請求這個job需要的容器,一個maptask容器請求每一個maptask。一個容器對每一個maptask的請求會嘗試利用map分片的本地性,應用會請求一下資料:

  • 請求map split 和container在同一個節點管理器的container
  • 如果沒有,請求一個map split 和container在
    同一個機架上的節點管理器上的container
  • 否則請求任意節點管理器上的container

這只是一小部分資源任務。資源任務器在資源任務器既定目標和指定目標衝突的時候,可以忽略本地性。當一個容器被分配一個任務,map就馬上啟動了。

Map階段:一個執行階段的例子

Map Phase execution

map 相的一個簡要圖:

  • 有兩個節點管理器:每一個2GB的記憶體,每一個map需要1GB我們可以並行執行兩個容器。這是最好的情況,而資源任務器的決策可能會有所不同
  • 叢集沒有其他的YARN任務執行
  • 我們的job有8個map分片,也就是在輸入資料夾中有7個檔案,只有一個是大於hdfs塊大小的,需要被拆分為兩個檔案。


map任務的執行時間線

Map Task Execution Timeline

現在我們可以聚焦單個的map task:這是單個map的執行時間線:

  • 初始相:我們設定map任務
  • 執行相:map分片裡面的每一個鍵值對進行map()函式運算
  • 溢寫相:map的輸出儲存在環形記憶體緩衝區,當緩衝區滿80%(一般80%),啟動溢寫相,將緩衝的資料寫出到磁碟。
  • 洗牌相:在溢寫相的結尾,我們合併多有的輸出,並且打包他們以便進行reduce相處理。


map任務:初始化

在初始化階段,我們:

  1. 建立一個上下文物件(context)(TaskAttemptContext
  2. 建立使用者map.class例項
  3. 設定輸入
  4. 設定輸出
  5. 建立mapper的上下文(MapContext.classMapper.Context.class)
  6. 初始化輸入也就是:
  7. 建立 SplitLineReader.class 分片行閱讀器
  8. 建立HdfsDataInputStream.class hdfs資料輸入流


Map任務:執行階段

MapTask execution

執行階段通過Mapper class.的run()方法:

使用者可以重寫這個方法,但是預設的時候通常會呼叫setup而啟動這個程式。這個函式預設並不做什麼有用的 事情,但是可以被使用者覆蓋重寫以便於設定任務(例如初始化類的變數),當設定完成之後,分片的每一個鍵值對會激發map()方法。因此map()接收到一個鍵,一個值,以及一個上下文context。使用這個上下文物件,一個map就會儲存其輸出到快取中。

請注意,map分片是一個快一個塊擷取的(例如64kb),每一個快分割成為若干鍵值對的資料( SplitLineReader.class乾的好事),這是在Mapper.Context.nextKeyValue內部完成的。當map分片被全部處理之後,run()會呼叫clean()方法。預設的,沒有什麼會被執行,除非使用者重寫覆蓋他。


map任務:溢寫階段

Spilling phase

正如我們在執行階段看到的一樣,map會使用Mapper.Context.write()將map函式的輸出溢寫到記憶體中的環形緩衝區(MapTask.MapOutputBuffer)。緩衝區的大小是固定的,通過mapreduce.task.io.sort.mb (default: 100MB)指定。 任何時候當這個緩衝區將要充滿的時候(mapreduce.map. sort.spill.percent: 預設80% ),溢寫將會被執行(這是一個並行過程,使用的是單獨的執行緒,緩衝池還可以繼續被寫入)。如果溢寫執行緒太慢,而緩衝區又忙了的話,map()就會暫停執行而等待。 溢寫執行緒執行下面的動作:
  1. 建立一個溢寫記錄SpillRecord 和一個FSOutputStream 檔案輸出流(本地檔案系統)
  2. 記憶體內排序緩衝中的塊:輸出的資料會使用快排演算法按照partitionIdx, key排序
  3. 排序之後的輸出會分割成為分割槽:每一個分割槽對應一個reduce
  4. 分割槽序列化寫到本地檔案