MapReduce階段源碼分析以及shuffle過程詳解
1. MapReduce階段源碼分析
1)客戶端提交源碼分析
解釋:
- 判斷是否打印日誌
- 判斷是否使用新的API,檢查連接
- 在檢查連接時,檢查輸入輸出路徑,計算切片,將jar、配置文件復制到HDFS
- 計算切片時,計算最小切片數(默認為1,可自定義)和最大切片數(默認是long的最大值,可以自定義)
- 查看給定的是否是文件,如果是否目錄計算目錄下所有文件的切片
- 通過block大小和最小切片數、最大切片數計算出切片大小
- 提交job給yarn,進行MapReduce計算
2)map階段源碼分析源碼分析(Map 的input階段)
解釋:
- 首先Map Task任務,調用run()方法,run()方法會經過以下幾個階段
- 初始化taskcontext對象
- 對mapper對象的初始化,此處包括一個默認值的判斷,如果沒有自定義mapper類,默認用系統的Mapper
- 對文件輸入的格式化,此處包括一個默認值的判斷,如果沒有自定義inputFormat類,默認用系統的TextinputFormat
- Input初始化:計算打開位置,讀取文件內容,(放棄第一行)
- 調用mapper的run方法循環讀取,直到末尾,多讀一行,start放棄第一行的數據被上一個切片讀到,註意這裏的run方法中就會調用我們編寫的Mapper類中的setup、map、cleanup方法
3)map階段源碼分析源碼分析(Map 的output階段)
解釋:
- 由newOutCollector創建output對象
- newOutCollector中需要準備collector和partitions計算reduce數量,會將map端輸出的K,V,P(分區號)寫入collector中
- 在準備collector實際上是準備MpaOutputBuffer,這是一特別復雜的過程,這裏向大致的解釋一下,就是先將收集的KV,P寫入一個環形的緩沖區,然後在經過排序和分區將數據寫入到文件中。(具體過程會在下面的shuffle中講解)
- 最後mapOut結束之後,會調用close方法關閉output,在關閉時,會將剩余在buffer環的數據緩沖出去,並且將所有一些的小文件進行排序然後合並成一個大文件。
2. shuffle過程詳解
過程介紹:
- 假如在hdfs中存儲一個300M文件,每個block的大小默認為128M,而且默認的切片大小也是128M,因此,每一個MapTask任務會處理一個split,則是有三個MapTask並行處理。
- 每一個MapTask任務處理完成後,會通過收集器,將輸出的結果存入一個環形緩沖區中,寫入的過程會經過簡單的排序,這個環形緩沖區的默認是100M,當環形緩沖區的大小使用超過80%,一個後臺線程就會啟動把環形緩沖區中的數據寫入到磁盤文件,同時Map會繼續向環形緩沖區中寫入數據。
- 環形緩沖去的工作原理:
- 環形緩沖區的大小默認為100M(可以配置mapred-site.xml:mapreduce.task.io.sort.mb)
- 環形緩沖區的閾值為:80%((mapred-site.xml:mapreduce.map.sort.spill.percent,默認80%)
- 在環形緩沖區中,存儲了兩種數據,一個是元數據:分區號,map的key的起始位置,map的value的起始位置,map的value的長度(每一個元數據長度為4個int長度,長度固定)
- 一種是原始數據:存放map的key和value
- 在存儲原始數據和元數據的時候,會將元數據和原始數據中間建立一個赤道,分割二者,然後不斷的向兩端寫入數據,在環形緩沖區的數據寫入到80%的時候,將這些數據鎖定,然後向硬盤中溢寫成小文件,同時環形緩沖區的剩下的部分仍然可以寫數據,直到溢寫結束,鎖定釋放,繼續可以將元數據和原始數據寫入緩沖區中。
- 緩沖區溢寫小文件:在溢寫小文件的時候,會對緩沖區中的元數據根據分區號和key進行排序,然後根據排序好的元數據,溢寫相應的原始數據(這是因為元數據的大小是固定的,比直接排序原始數據更容易),這樣最後就會溢寫出多個已經根據分區和key排序好的小文件(這裏可以加入conbiner)
- 對溢寫後的小文件進行歸並:此時會將溢寫後的小文件進行歸並成一個大文件(使用歸並排序),此時合並的大文件已經按照分區和key排好序,
- reduce拉取相應的數據:Reducer 中的一個線程定期向MRAppMaster詢問Mapper輸出結果文件位置,mapper結束後會向MRAppMaster匯報信息,從而 Reducer 得知 Mapper 狀態,得到 map 結果文件目錄;reduce會相應的拉取相同分區的小文件到本地
- 然後會將拉取得到的相應的相同分區的小文件,進行歸並排序合並成為一個有序的大文件(相同的key在一起)。
- 然後根據分組規則,相同的key為一組調用一次reduce方法,處理數據
- 最終將結果數據根據分區寫入到不同的分區文件中。
MapReduce階段源碼分析以及shuffle過程詳解