1. 程式人生 > >MapReduce框架在Yarn上的詳解

MapReduce框架在Yarn上的詳解

MapReduce任務解析

Timeline MapReduce Job

YARN上一個MapReduce任務叫做一個Job。一個Job的主程式在MapReduce框架上實現的應用名稱叫MRAppMaster.

MapReduce任務的Timeline

這是一個MapReduce作業執行時間:

  • Map 階段:根據資料塊會執行多個Map Task
  • Reduce 階段:根據配置項會執行多個Reduce Task

為提高Shuffle效率Reduce階段會在Map階段結束之前就開始。(直到所有MapTask完成之後ReduceTask才能完成,因為每個ReduceTask依賴所有的MapTask的結果)

Map階段

首先看看Map階段,一個

Job需要多少Map Task

使用者會提交什麼?

當一個客戶端提交的應用時會提供以下多種型別的資訊到YARN上。

  • 一個configuration(配置項):Hadoop有預設的配置項,所以即使什麼都不寫它也有預設的配置項載入。優先順序高到低順序是使用者指定的配置項>etc/conf下的XML>預設配置項
  • 一個JAR
    • 一個map()實現(Map抽象類的實現)
    • 一個combiner 實現(combiner抽象類的實現,預設是跟Reduce實現一樣)
    • 一個reduce()實現(Reduce抽象類的實現)
  • 輸出輸入資訊:
    • 輸入目錄:輸入目錄的指定,如輸入HDFS上的目錄、S3或是多少個檔案。
    • 輸出目錄:輸出目錄的指定,在HDFS還是在S3

輸入目錄中的檔案數用於決定一個JobMapTask的數量。

那麼到底會有多少個MapTask?

Application Master會為每一個split(分片)建立一個MapTask。通常情況下,每個檔案都會是一個split。如果檔案太大(大於128MHDFS預設塊大小)就會分為多個split並關聯到這個檔案,也就是一個檔案會產生多個Map Task。獲取split數量方法程式碼如下 getSplits() of the FileInputFormat class:

num_splits = 0
for each input file f:
   remaining = f.length
   while remaining / split_size > split_slope:
      num_splits += 1
      remaining -= split_size
split_slope = 1.1
split_size =~ dfs.blocksize

MapTask執行過程

Application Master會向Resource Maneger資源管理器提交job所需要的資源:為每一個split檔案申請一個container來執行Map Task

為了提高檔案讀取效率containermap split所在的機器上執行是最為理想的。因此AM會根據資料本地性>CPU>記憶體匹配的方式分配container

  • 如果發現一個Node Manager上有所需的map split那麼相關的container就會分配到這個NM(因為根據HDFS備份機制有3臺機器上同時擁有相同的塊);
  • 否則, 會分配到機櫃內的其他機器上;
  • 否則, 會分配到叢集上的任何一個機器上

當容器被分配給AMMap Task任務就會啟動。

Map 階段:示例

Map Phase execution

這是一個典型的Map執行場景:

  • 2Node Manager:每個Node Manager擁有2GB記憶體,而每個MapTask需要1GB記憶體,因此每個NM可以同時執行2container
  • 沒有其他的應用程式在叢集中執行
  • 我們的job9split (例如,在輸入目錄裡有8個檔案,但其中只有一個是大於HDFS塊大小的檔案,所以我們把它分為2map split);因此需要9map

MapTask執行的Timeline

Map Task Execution Timeline

現在讓我們專注於一個Map Task任務。這是Map Task任務執行時間線:

  • 初始化(INIT)階段:初始化Map Task(預設是什麼都沒有。。)
  • 執行(EXECUTION)階段: 對於每個 (key, value)執行map()函式
  • 排序(SPILLING)階段:map輸出會暫存到記憶體當中排序,當快取達到一定程度時會寫到磁碟上,並刪除記憶體裡的資料
  • SHUFFLE 階段:排序結束後,會合並所有map輸出,並分割槽傳輸給reduce

MapTask:初始化(INIT

1. 建立一個Task上下文,Reduce也繼承自它(TaskAttemptContext.class)

2. 建立MAP例項Mapper.class

3. 設定input (e.g., InputFormat.class, InputSplit.class, RecordReader.class)

4. 設定output (NewOutputCollector.class)

5. 建立mapper的上下文(MapContext.class, Mapper.Context.class)

6. 初始化輸入,例如

7. 建立一個SplitLineReader.class object

8. 建立一個HdfsDataInputStream.class object

MapTask:執行(EXECUTION

MapTask execution

Map的執行階段從 Mapper classrun 方法開始,我們通常要寫的也就是它了。預設情況下run之前會呼叫setup方法:這個函式沒有做任何事情,但是我們可以重寫它來配置相關的類變數等資訊。執行setup方法之後會對每一個<key, value>執行map()函式。之後map context會儲存這些資料到一個快取區,為後續排序做準備。

map執行完處理時,還會呼叫一個clean方法:預設情況下,也不執行任何操作,但使用者也可以重寫它。

MapTask:排序(SPILLING

Spilling phase

執行階段期間map會把資料寫進一個快取區(MapTask.MapOutputBuffer)。這個快取大小由配置項設定mapreduce.task.io.sort.mb  (預設:100MB)。為了提高硬碟刷寫速度快取區達到80%會寫資料到磁碟,會有一個單獨的執行緒並行執行。當快取區容量達到100%那麼就要等到這個單獨的執行緒把資料寫完才能繼續執行map方法。

排序執行緒會執行以下動作:

1. 建立一個SpillRecord和一個FSOutputStream (在本地檔案系統)

2. 在記憶體中對鍵值對進行快速排序

3. 分割槽

4. 按順序寫入本地分割槽檔案。

Shuffle階段


shuffle階段主要是做資料的排序和合並操作,然後把資料存到本地檔案系統上,等待Reduce來獲取資料。等到所有的MapTask產出的資料傳輸都Reduce機器上,並對資料進行排序以後才能算是Shuffle過程的結束。也就說從Map函數出來之後到Reduce函式之前的所有資料操作都叫Shuffle操作,包括排序、合併、分割槽、傳輸等。

Reduce階段

Reduce階段的run與Map階段的run執行是類似的。

refhttp://ercoppa.github.io/HadoopInternals/AnatomyMapReduceJob.html