1. 程式人生 > >MapReduce階段源碼分析以及shuffle過程詳解

MapReduce階段源碼分析以及shuffle過程詳解

不同 小文件 需要 因此 輸入輸出 map 定義 shu mas

MapReducer工作流程圖:
技術分享圖片

1. MapReduce階段源碼分析

1)客戶端提交源碼分析

技術分享圖片
解釋
   - 判斷是否打印日誌
   - 判斷是否使用新的API,檢查連接
   - 在檢查連接時,檢查輸入輸出路徑,計算切片,將jar、配置文件復制到HDFS
   - 計算切片時,計算最小切片數(默認為1,可自定義)和最大切片數(默認是long的最大值,可以自定義)
   - 查看給定的是否是文件,如果是否目錄計算目錄下所有文件的切片
   - 通過block大小和最小切片數、最大切片數計算出切片大小

   - 過切片大小,計算出map的數量以及分發到的節點
   - 提交job給yarn,進行MapReduce計算

2)map階段源碼分析源碼分析(Map 的input階段)

技術分享圖片
解釋
   - 首先Map Task任務,調用run()方法,run()方法會經過以下幾個階段
   - 初始化taskcontext對象
   - 對mapper對象的初始化,此處包括一個默認值的判斷,如果沒有自定義mapper類,默認用系統的Mapper
   - 對文件輸入的格式化,此處包括一個默認值的判斷,如果沒有自定義inputFormat類,默認用系統的TextinputFormat

   - 創建input對象,創建具體的文件讀取類,通過lineReader(),默認每次叠代讀取一行,此處實現一個叠代的判斷的nextKeyVaule(),並在nextKeyVaule實現時初始化key和value
   - Input初始化:計算打開位置,讀取文件內容,(放棄第一行)
   - 調用mapper的run方法循環讀取,直到末尾,多讀一行,start放棄第一行的數據被上一個切片讀到,註意這裏的run方法中就會調用我們編寫的Mapper類中的setup、map、cleanup方法

3)map階段源碼分析源碼分析(Map 的output階段)

技術分享圖片
解釋
   - 由newOutCollector創建output對象
   - newOutCollector中需要準備collector和partitions計算reduce數量,會將map端輸出的K,V,P(分區號)寫入collector中
   - 在準備collector實際上是準備MpaOutputBuffer,這是一特別復雜的過程,這裏向大致的解釋一下,就是先將收集的KV,P寫入一個環形的緩沖區,然後在經過排序和分區將數據寫入到文件中。(具體過程會在下面的shuffle中講解)
   - 最後mapOut結束之後,會調用close方法關閉output,在關閉時,會將剩余在buffer環的數據緩沖出去,並且將所有一些的小文件進行排序然後合並成一個大文件。

2. shuffle過程詳解

技術分享圖片
過程介紹

  • 假如在hdfs中存儲一個300M文件,每個block的大小默認為128M,而且默認的切片大小也是128M,因此,每一個MapTask任務會處理一個split,則是有三個MapTask並行處理。
  • 每一個MapTask任務處理完成後,會通過收集器,將輸出的結果存入一個環形緩沖區中,寫入的過程會經過簡單的排序,這個環形緩沖區的默認是100M,當環形緩沖區的大小使用超過80%,一個後臺線程就會啟動把環形緩沖區中的數據寫入到磁盤文件,同時Map會繼續向環形緩沖區中寫入數據。
  • 環形緩沖去的工作原理:
    • 環形緩沖區的大小默認為100M(可以配置mapred-site.xml:mapreduce.task.io.sort.mb)
    • 環形緩沖區的閾值為:80%((mapred-site.xml:mapreduce.map.sort.spill.percent,默認80%)
    • 在環形緩沖區中,存儲了兩種數據,一個是元數據:分區號,map的key的起始位置,map的value的起始位置,map的value的長度(每一個元數據長度為4個int長度,長度固定)
    • 一種是原始數據:存放map的key和value
    • 在存儲原始數據和元數據的時候,會將元數據和原始數據中間建立一個赤道,分割二者,然後不斷的向兩端寫入數據,在環形緩沖區的數據寫入到80%的時候,將這些數據鎖定,然後向硬盤中溢寫成小文件,同時環形緩沖區的剩下的部分仍然可以寫數據,直到溢寫結束,鎖定釋放,繼續可以將元數據和原始數據寫入緩沖區中。
  • 緩沖區溢寫小文件:在溢寫小文件的時候,會對緩沖區中的元數據根據分區號和key進行排序,然後根據排序好的元數據,溢寫相應的原始數據(這是因為元數據的大小是固定的,比直接排序原始數據更容易),這樣最後就會溢寫出多個已經根據分區和key排序好的小文件(這裏可以加入conbiner)
  • 對溢寫後的小文件進行歸並:此時會將溢寫後的小文件進行歸並成一個大文件(使用歸並排序),此時合並的大文件已經按照分區和key排好序,
  • reduce拉取相應的數據:Reducer 中的一個線程定期向MRAppMaster詢問Mapper輸出結果文件位置,mapper結束後會向MRAppMaster匯報信息,從而 Reducer 得知 Mapper 狀態,得到 map 結果文件目錄;reduce會相應的拉取相同分區的小文件到本地
  • 然後會將拉取得到的相應的相同分區的小文件,進行歸並排序合並成為一個有序的大文件(相同的key在一起)。
  • 然後根據分組規則,相同的key為一組調用一次reduce方法,處理數據
  • 最終將結果數據根據分區寫入到不同的分區文件中。

MapReduce階段源碼分析以及shuffle過程詳解