1. 程式人生 > >Task執行過程分析5——ReduceTask內部實現

Task執行過程分析5——ReduceTask內部實現

與MapTask一樣,ReduceTask也分為四種,即Job-setup Task,Job-cleanup Task,Task-cleanup Task和Reduce Task。本文重點介紹第四種——普通Reduce Task。
Reduce Task要從各個Map Task上讀取一片資料,經排序後,以組為單位交給使用者編寫的reduce()函式處理,並將結果寫到HDFS上。本文將深入剖析ReduceTask內部各個階段的實現原理。

Reduce Task整體流程
Reduce Task的整體計算流程如圖所示,共分為5個階段
1、Shuffle階段:也稱為Copy階段。Reduce Task從各個MapTask上遠端拷貝一片資料,並針對某一片資料,如果其大小超過一定閾值,則寫到磁碟上,否則直接放到記憶體中。
2、Merge階段:在遠端拷貝資料的同時,Reduce Task啟動了兩個後臺執行緒對記憶體和磁碟上的檔案進行合併,以防止記憶體使用過多或磁碟上檔案過多。
3、Sort階段:按照MapReduce語義,使用者編寫的reduce()函式輸入資料是按key進行聚集的一組資料。為了將key相同的資料聚在一起,Hadoop採用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了區域性排序,因此,ReduceTask只需對所有資料進行一次歸併排序即可。
4、Reduce階段:在該階段中,ReduceTask將每組資料依次交給使用者編寫的reduce()函式處理
5、Write階段:reduce()函式將計算結果寫到HDFS上
這裡寫圖片描述

1、Shuffle和Merge階段分析
在Reduce Task中,Shuffle階段和Merge階段是並行進行的。當遠端拷貝資料量達到一定閾值後,便會觸發相應的合併執行緒對資料進行合併。這兩個階段均是由類org.apache.hadoop.mapred.ReduceTask的內部類ReduceCopier實現的。
總體上看,Shuffle&Merge階段可進一步劃分為三個子階段。
(1)準備執行完成的Map Task列表
GetMapEventsThread執行緒週期性通過RPC從TaskTracker獲取已完成Map Task列表,並儲存到對映表mapLocations(儲存了TaskTracker Host與已完成任務列表的對映關係)中。為防止出現網路熱點,Reduce Task通過對所有TaskTracker Host進行“混洗”操作以打亂資料拷貝順序,並將調整後的Map Task輸出資料位置儲存到scheduledCopies列表中。
(2)遠端拷貝資料
Reduce Task同時啟動多個MapOutputCopier執行緒,這些執行緒從scheduledCopies列表中獲取MapTask輸出位置,並通過HTTP Get遠端拷貝資料。對於獲取的資料分片,如果大小超過一定閾值,則存放到磁碟上,否則直接放到記憶體中。
(3)合併記憶體檔案和磁碟檔案
為了防止記憶體或者磁碟上的檔案資料過多,ReduceTask啟動了LocalFSMerger和InMemFsMergeThread兩個執行緒分別對記憶體和磁碟上的檔案進行合併。
這裡寫圖片描述


接下來,我們將詳細剖析每個階段的內部實現細節
(1)準備執行完成的MapTask列表
TaskTracker啟動了MapEventsFetcherThread執行緒。該執行緒會週期性(週期性為心跳時間間隔)通過RPC從JobTracker上獲取已經執行完成的MapTask列表,並儲存到TaskCompletionEvent型別列表allMapEvents中。
而對於ReduceTask而言,它會啟動GetMapEventsThread執行緒。該執行緒週期性通過RPC從TaskTracker上獲取已執行完成的MapTask列表,並將成功執行完成的MapTask放到列表mapLocations中。
為了避免出現數據訪問熱點(大量程序集中讀取某個TaskTracker上的資料),ReduceTask不會直接將列表mapLocations中的Map Task輸出資料位置交給MapOutputCopier執行緒,而是事先進行一次預處理:將所有TaskTracker Host進行混洗操作(隨機打亂順序),然後儲存到scheduledCopies列表中,而OutputCopier執行緒將從該列表中獲取待拷貝的MapTask輸出資料位置。需要注意的是,對於一個TaskTracker而言,曾拷貝失敗的MapTask將優先獲得拷貝機會。
(2)遠端拷貝資料
Reduce Task同時啟動mapred.reduce.parallel.copies(預設是5)個數據拷貝執行緒MapOutputCopier。該執行緒從scheduledCopies列表中獲取MapTask資料輸出描述物件,並利用HTTP Get從對應的Task Tracker遠端拷貝資料,如果資料分片大小超過一定閾值,則將資料臨時寫到工作目錄下,否則直接儲存到記憶體中。不管是儲存到記憶體中還是磁碟上,MapOutputCopier均會儲存一個MapOutput物件描述資料的元資訊。如果資料被儲存到記憶體中,則將該物件新增到列表mapOutputsFilsInMemory中,否則將該物件儲存到列表mapOutputFilesOnDisk中。
在Reduce Task中,大部分記憶體用於快取從MapTask端拷貝的資料分片,這些記憶體佔到JVM Max Heap Size(由引數-Xmx指定)的mapred.job.shuffle.input.buffer.percent(預設是0.70)倍,並由類ShuffleRamManager管理。Reduce Task規定,如果一個數據分片大小未超過該記憶體的0.25倍,則可存放到記憶體中。如果MapOutputCopier執行緒要拷貝的資料分片可存放到記憶體中,則它先要向ShuffleRamManager申請相應的記憶體,待同意後才會正式拷貝資料,否則需要等待它釋放記憶體。
由於遠端拷貝資料可能需要跨網路讀取多個節點上的資料,期間很容易由於網路或者磁碟等原因造成讀取失敗,因此提供良好的容錯機制是非常必要的。當出現拷貝錯誤時,ReduceTask提供了以下幾個容錯機制:
1、如果拷貝資料出錯次數超過abortFailureLimit,則殺死該ReduceTask(等待排程器重新排程執行),其中,abortFailureLimit計算方法如下:
abortFailureLimit = max{30,numMaps/10}
2、如果拷貝資料出錯次數超過maxFetchFailuresBeforeReporting(可通過引數mapreduce.reduce.shuffle.maxfetchfailures設定,預設是10),則進行一些必要的檢查,以絕對是否殺死該Reduce Task
3、如果前兩個條件均不滿足,則採用對數迴歸模型推遲一段時間後重新拷貝對應MapTask的輸出資料,其中延遲時間delayTime的計算方法如下:
delayTime =10000*1.3的noFailedFetches次方,其中noFailedFetches為拷貝錯誤次數。

(3)合併記憶體檔案和磁碟檔案
前面提到,ReduceTask從MapTask端拷貝的資料,可能儲存到記憶體或者磁碟上。隨著拷貝資料的增多,記憶體或者磁碟上的檔案數目也必將增加,為了減少檔案數目,在資料拷貝過程中,執行緒LocalFSMerger和InMemFSMergeThread將分別對記憶體和磁碟上的檔案進行合併。
對於磁碟上檔案,當檔案數目超過(2*ioSortFactor-1)後(ioSortFactor值由引數io.sort.factor指定,預設是10),執行緒LocalFSMerger會從列表mapOutputFilesOnDisk中取出最小的ioSortFactor個檔案進行合併,並將合併後的檔案再次寫到磁碟上。
對於記憶體中的檔案,當滿足以下幾個條件之一時InMemFSMergeThread執行緒會將記憶體中所有資料合併後寫到磁碟上:
1、所有資料拷貝完畢後,關閉ShuffleRamManager
2、ShuffleRamManager中已使用記憶體超過可用記憶體的mapred.job.shuffle.merge.percent(預設是66%)倍且記憶體檔案數目超過2個。
3、記憶體中的檔案數目超過mapred.inmem.merge.threshold(預設是1000)。
4、阻塞在ShuffleRamManager上的請求數目超過拷貝執行緒數目mapred.reduce.parallel.copies的75%

2、Sort和Reduce階段分析
當所有資料拷貝完成後,資料可能存放在記憶體中或者磁碟上,此時還不能將資料直接交給使用者編寫的reduce()函式處理。根據MapReduce語義,ReduceTask需將key值相同的資料聚集到一起,並按組將資料交給reduce()函式處理。為此,Hadoop採用了基於排序的資料聚集策略。前面提到,各個Map Task已經事先對自己的輸出分片進行了區域性排序,因此,Reduce Task只需進行一次歸併排序即可保證資料整體有序。為了提高效率,Hadoop將Sort階段和Reduce階段並行化。在Sort階段,Reduce Task為記憶體和磁碟中的檔案建立了小頂堆,儲存了指向該小頂堆根節點的迭代器,且該迭代器保證了以下兩個約束條件:
1、磁碟上檔案數目小於io.sort.factor(預設是10)。
2、當Reduce階段開始時,記憶體中資料量小於最大可用記憶體(JVM Max Heap Size)的mapred.job.reduce.input.buffer.percent(預設是0)。
在Reduce階段,Reduce Task不斷地移動迭代器,以將key 相同的資料順次交給reduce()函式處理,期間移動迭代器的過程實際上就是不斷調整小頂堆的過程,這樣,Sort和Reduce可並行進行。