1. 程式人生 > >MapReduce階段原始碼分析以及shuffle過程詳解

MapReduce階段原始碼分析以及shuffle過程詳解

MapReducer工作流程圖:
MapReduce階段原始碼分析以及shuffle過程詳解

1. MapReduce階段原始碼分析

1)客戶端提交原始碼分析

MapReduce階段原始碼分析以及shuffle過程詳解
解釋
   - 判斷是否列印日誌
   - 判斷是否使用新的API,檢查連線
   - 在檢查連線時,檢查輸入輸出路徑,計算切片,將jar、配置檔案複製到HDFS
   - 計算切片時,計算最小切片數(預設為1,可自定義)和最大切片數(預設是long的最大值,可以自定義)
   - 檢視給定的是否是檔案,如果是否目錄計算目錄下所有檔案的切片
   - 通過block大小和最小切片數、最大切片數計算出切片大小
   - 過切片大小,計算出map的數量以及分發到的節點
   - 提交job給yarn,進行MapReduce計算

2)map階段原始碼分析原始碼分析(Map 的input階段)

MapReduce階段原始碼分析以及shuffle過程詳解
解釋
   - 首先Map Task任務,呼叫run()方法,run()方法會經過以下幾個階段
   - 初始化taskcontext物件
   - 對mapper物件的初始化,此處包括一個預設值的判斷,如果沒有自定義mapper類,預設用系統的Mapper
   - 對檔案輸入的格式化,此處包括一個預設值的判斷,如果沒有自定義inputFormat類,預設用系統的TextinputFormat
   - 建立input物件,建立具體的檔案讀取類,通過lineReader(),預設每次迭代讀取一行,此處實現一個迭代的判斷的nextKeyVaule(),並在nextKeyVaule實現時初始化key和value
   - Input初始化:計算開啟位置,讀取檔案內容,(放棄第一行)
   - 呼叫mapper的run方法迴圈讀取,直到末尾,多讀一行,start放棄第一行的資料被上一個切片讀到,注意這裡的run方法中就會呼叫我們編寫的Mapper類中的setup、map、cleanup方法

3)map階段原始碼分析原始碼分析(Map 的output階段)

MapReduce階段原始碼分析以及shuffle過程詳解
解釋
   - 由newOutCollector建立output物件
   - newOutCollector中需要準備collector和partitions計算reduce數量,會將map端輸出的K,V,P(分割槽號)寫入collector中
   - 在準備collector實際上是準備MpaOutputBuffer,這是一特別複雜的過程,這裡向大致的解釋一下,就是先將收集的KV,P寫入一個環形的緩衝區,然後在經過排序和分割槽將資料寫入到檔案中。(具體過程會在下面的shuffle中講解)
   - 最後mapOut結束之後,會呼叫close方法關閉output,在關閉時,會將剩餘在buffer環的資料緩衝出去,並且將所有一些的小檔案進行排序然後合併成一個大檔案。

2. shuffle過程詳解

MapReduce階段原始碼分析以及shuffle過程詳解
過程介紹

  • 假如在hdfs中儲存一個300M檔案,每個block的大小預設為128M,而且預設的切片大小也是128M,因此,每一個MapTask任務會處理一個split,則是有三個MapTask並行處理。
  • 每一個MapTask任務處理完成後,會通過收集器,將輸出的結果存入一個環形緩衝區中,寫入的過程會經過簡單的排序,這個環形緩衝區的預設是100M,當環形緩衝區的大小使用超過80%,一個後臺執行緒就會啟動把環形緩衝區中的資料寫入到磁碟檔案,同時Map會繼續向環形緩衝區中寫入資料。
  • 環形緩衝去的工作原理:
    • 環形緩衝區的大小預設為100M(可以配置mapred-site.xml:mapreduce.task.io.sort.mb)
    • 環形緩衝區的閾值為:80%((mapred-site.xml:mapreduce.map.sort.spill.percent,預設80%)
    • 在環形緩衝區中,儲存了兩種資料,一個是元資料:分割槽號,map的key的起始位置,map的value的起始位置,map的value的長度(每一個元資料長度為4個int長度,長度固定)
    • 一種是原始資料:存放map的key和value
    • 在儲存原始資料和元資料的時候,會將元資料和原始資料中間建立一個赤道,分割二者,然後不斷的向兩端寫入資料,在環形緩衝區的資料寫入到80%的時候,將這些資料鎖定,然後向硬碟中溢寫成小檔案,同時環形緩衝區的剩下的部分仍然可以寫資料,直到溢寫結束,鎖定釋放,繼續可以將元資料和原始資料寫入緩衝區中。
  • 緩衝區溢寫小檔案:在溢寫小檔案的時候,會對緩衝區中的元資料根據分割槽號和key進行排序,然後根據排序好的元資料,溢寫相應的原始資料(這是因為元資料的大小是固定的,比直接排序原始資料更容易),這樣最後就會溢寫出多個已經根據分割槽和key排序好的小檔案(這裡可以加入conbiner)
  • 對溢寫後的小檔案進行歸併:此時會將溢寫後的小檔案進行歸併成一個大檔案(使用歸併排序),此時合併的大檔案已經按照分割槽和key排好序,
  • reduce拉取相應的資料:Reducer 中的一個執行緒定期向MRAppMaster詢問Mapper輸出結果檔案位置,mapper結束後會向MRAppMaster彙報資訊,從而 Reducer 得知 Mapper 狀態,得到 map 結果檔案目錄;reduce會相應的拉取相同分割槽的小檔案到本地
  • 然後會將拉取得到的相應的相同分割槽的小檔案,進行歸併排序合併成為一個有序的大檔案(相同的key在一起)。
  • 然後根據分組規則,相同的key為一組呼叫一次reduce方法,處理資料
  • 最終將結果資料根據分割槽寫入到不同的分割槽檔案中。