1. 程式人生 > >hadoop之mapreduce詳解(進階篇)

hadoop之mapreduce詳解(進階篇)

上篇文章hadoop之mapreduce詳解(基礎篇)我們瞭解了mapreduce的執行過程和shuffle過程,本篇文章主要從mapreduce的元件和輸入輸出方面進行闡述。

一、mapreduce作業控制模組以及其他功能

mapreduce包括作業控制模組,程式設計模型,資料處理引擎。這裡我們重點闡述作業控制模組MRAppMaster。

1.1、MRAppMaster的構成

MRAppMaster主要有如下幾個元件構成,如下圖所示:

1、ContainerAllocator:與resourcemanager通訊,為mapreduce申請資源,作業的所需資源描述為<priority,hostname,capability,containers,relax_locality>5元組格式,分別表示作業的優先順序,期望資源所在的節點,資源量,container數目,是否鬆弛本地性。ContainerAllocator週期性的通過RPC與resourcemanager通訊,而resourcemanager則通過心跳應答的方式為之返回所需的container列表,完成的container列表等資訊。

ContainerAllocator工作流程:
步驟1:將Map Task的資源需求傳送給RM;
步驟2:如果達到了Reduce Task的排程條件,則開始為Reduce Task申請資源;
步驟3:如果為某個Task申請到了資源,則取消其他重複資源的申請。由於在HDFS中,任何一份資料通常有三個備份,而對於一個任務而言,考慮到rack和any級別的本地性,它可能會對應7個資源請求
步驟4:如果任務執行失敗,則會重新為該任務申請資源;
步驟5:如果一個任務執行速度過慢,則會為其額外申請資源以啟動備份任務(如果啟動了推測執行功能);
步驟6:如果一個節點失敗的任務數目過多,則會撤銷對該節點的所有資源的申請請求

2、ClientServer:實現了MRClientprotocol協議,客戶端可以通過該協議獲取到作業的執行狀態(不需要通過resourcemanager)和控制作業(比如殺死作業,改變作業的優先順序等)

3、Job:是一個mapreduce作業,負責監控作業的執行狀態,維護一個作業的狀態機,實現非同步執行各種作業的相關操作

4、Task:一個mapreduce作業中一個任務,負責監控一個任務的執行狀態,維護一個任務的狀態機,實現非同步執行各種任務的相關操作。

5、TaskAttempt:表示一個執行例項

6、TaskCleaner:負責清理失敗任務或者殺死任務使用的目錄和產生的臨時結果,維護一個執行緒池和一個共享佇列,非同步刪除任務產生的垃圾資料

7、Speculator:完成推測執行功能,當一個任務在執行速度上明顯慢於其他任務的時候,Speculator將會啟動一個功能相同的任務,先執行完成的任務會kill掉沒執行完的那個作業。

8、ContainerLauncher:負責與NodeManager通訊,以啟動container

9、TaskAttempListener:負責各個任務的心跳資訊,如果一個任務一段時間內未彙報心跳資訊,則認為該任務死掉了,會將其從系統中移除。

10、JobHistoryEventHandler:負責各個作業的事件記錄日誌,比如作業的建立,執行等,都會寫入hdfs的指定目錄下,對作業的恢復很有用。

 

1.2、mapreduce客戶端

     是使用者和yarn通訊的唯一途徑,通過該客戶端,使用者可以向yarn提交作業,獲取作業的執行狀態,以及控制作業(殺死作業或者任務),該客戶端設計到兩個通訊協議:

ApplicationClientProtocol:resourcemanager實現了該協議,客戶端需要通過該協議提交作業,殺死作業,改變作業的優先順序等操作

MRClientProtocol:當作業啟動Application Master後,會啟動MRClientServer服務,該服務實現了MRClientProtocol協議,從而允許使用者直接通過該協議直接與Application Master通訊,獲取作業的執行狀態和控制作業,減輕resourcemanager的壓力。

1.3、MRAppMaster工作流程

作業的執行分為local模式,yarn的uber模式和yarn的非uber模式:

      首先看local模式和yarn模式的選擇:客戶端通過JobClient提交作業時,會通過java標準庫中的Serverloader動態載入所有的ClientProtocolProvider的實現。預設情況下有兩種實現:LocalClientProcotolProvider和YarnClientProcotolProvider。如果在配置中引數mapreduce.framework.mode設定為yarn時,客戶端則會採用YarnClientProcotolProvider,建立一個YarnRunner物件作為真正的客戶端,這樣就可以通過YarnRunner.submitJob方法提交給yarn作業了。在該方法的內部實現會進一步呼叫ApplicationClientProcotol的submitApplication方法,提交作業給Resourcemanager。原始碼詳情可見:hadoop2.7之作業提交詳解(上)

uber模式和非uber模式的選擇:

uber模式是小作業的一個優化,MrAppMaster不會再為每一個任務申請資源,而是讓其重用一個container,map和reduce會在同一份資源上序列執行。

uber模式條件:

mapreduce.job.ubertask.enable #是否啟用uber模式
mapreduce.job.ubertask.maxmaps #ubertask的最大map數 (預設9)
mapreduce.job.ubertask.maxreduces #ubertask的最大reduce數 (預設1)
mapreduce.job.ubertask.maxbytes #ubertask最大作業大小 (預設為block.size)
map和reduce使用的資源不得超過MRAppMaster可使用的資源

滿足如上條件則使用yarn的uber模式執行, 否則為非uber模式執行

在yarn上執行mapreduce作業需要解決兩個問題:
1、reduce作業啥時候啟動比較合適

由引數mapreduce.job.reduce.slowstart.completedmaps控制,表示當Map Task完成的比例達到該值後才會為Reduce Task申請資源,預設是0.05;

2、怎樣完成shuffle過程

當用戶向YARN中提交一個MapReduce應用程式後,YARN將分兩個階段執行該應用程式:
第一個階段是由ResourceManager啟動MRAppMaster;
第二個階段是由MRAppMaster建立應用程式,為它申請資源,並監控它的整個執行過程,直到執行完成。

具體的執行流程圖如下:

步驟1:使用者向YARN中提交應用程式,其中包括MRAppMaster程式、啟動MRAppMaster的命令、使用者程式等;
步驟2:ResourceManager為該應用程式分配第一個Container,並與對應的NodeManager通訊,要求它在這個Container中啟動應用程式的MRAppMaster;
步驟3:MRAppMaster啟動後,首先向ResourceManager註冊,這樣使用者可以直接通過ResourceManager檢視應用程式的執行狀態,之後,它將為內部任務申請資源,並監控它們的執行狀態,直到執行結束,即重複步驟4~7;
步驟4:MRAppMaster採用輪詢的方式通過RPC協議向ResourceManager申請和領取資源;
步驟5:一旦MRAppMaster申請到資源後,則與對應的NodeManager通訊,要求它啟動任務;
步驟6:NodeManager為任務設定好執行環境(包括環境變數、JAR包、二進位制程式等)後,將任務啟動命令寫到一個指令碼中,並通過執行該指令碼啟動任務;
步驟7:各個任務通過RPC協議向MRAppMaster彙報自己的狀態和進度,以讓MRAppMaster隨時掌握各個任務的執行狀態,從而可以在任務失敗時重新啟動任務;
步驟8:應用程式執行完成後,MRAppMaster向ResourceManager登出並關閉自己

1.4、推測執行

    在分散式叢集環境下,因軟體Bug、負載不均衡或者資源分佈不均等原因,造成同一個作業的多個任務之間執行速度不一致,有的任務執行速度明顯慢於其他任務(比如某個時刻,一個作業的某個任務進度只有10%,而其他所有Task已經執行完畢),則這些任務將拖慢作業的整體執行進度。為了避免這種情況發生,運用推測執行(Speculative Execution)機制,Hadoop會為該任務啟動一個備份任務,讓該備份任務與原始任務同時處理一份資料,誰先執行完成,則將誰的結果作為最終結果。

    推測執行演算法的核心思想是:某一時刻,判斷一個任務是否拖後腿或者是否是值得為其啟動備份任務,採用的方法為,先假設為其啟動一個備份任務,則可估算出備份任務的完成時間estimatedEndTime2;同樣地,如果按照此刻該任務的計算速度,可估算出該任務最有可能的完成時間estimatedEndTime1,這樣estimatedEndTime1與estimatedEndTime2之差越大,表明為該任務啟動備份任務的價值越大,則傾向於為這樣的任務啟動備份任務。

    這種演算法的最大優點是,可最大化備份任務的有效率,其中有效率是有效備份任務數與所有備份任務數的比值,有效備份任務是指完成時間早於原始任務完成時間的備份任務(即帶來實際收益的備份任務)。備份任務的有效率越高、推測執行演算法就越優秀,帶來的收益也就越大。

   推測執行機制實際上採用了經典的演算法優化方法:以空間換時間,它同時啟動多個相同任務處理同一份資料,並讓這些任務競爭以縮短資料處理時間,顯然這種方法需要佔用更多的計算資源,在叢集資源緊缺的情況下,應合理使用該機制,爭取在多用少量資源情況下,減少大作業的計算時間。

引數控制:

mapreduce.map.speculative
mapreduce.reduce.speculative

 二、mapreduce輸入輸出格式

2.1、輸入格式

2.1.1、分片

在FileInputFormat中,分片計算原始碼詳見:hadoop2.7作業提交詳解之檔案分片

計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize))
minSize的預設值是1,而maxSize的預設值是long型別的最大值,即可得切片的預設大小是blockSize(128M)
maxSize引數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個引數的值
minSize引數調的比blockSize大,則可以讓切片變得比blocksize還大

hadoop為每個分片構建一個map任務,可以並行處理多個分片上的資料,整個資料的處理過程將得到很好的負載均衡,因為一臺效能較強的計算機能處理更多的資料分片,分片也不能切得太小,否則多個map和reduce間資料的傳輸時間,管理分片,構建多個map任務的時間將決定整個作業的執行時間.(大部分時間都不在計算上)如果檔案大小小於128M,則該檔案不會被切片,不管檔案多小都會是一個單獨的切片,交給一個maptask處理.如果有大量的小檔案,將導致產生大量的maptask,大大降低叢集效能.

注意:分片本身不包含資料本身,而是指向資料的引用,儲存位置供mapreduce系統使用以便使得map任務儘量資料本地化,而分片的大小用來排序,以便優先處理大的分片,
從而做小化作業執行時間。

2.1.2、小檔案處理

小檔案不僅會增加NameNode的儲存壓力,還會增加執行作業時的定址次數,也會造成map的大批量增加,所以處理小檔案是必要的。

1、 在資料處理的前端就將小檔案整合成大檔案,再上傳到hdfs上,即避免了hdfs不適合儲存小檔案的缺點,又避免了後期使用mapreduce處理大量小檔案的問題。(最提倡的做法)

2、小檔案已經存在hdfs上了,可以使用另一種inputformat來做切片(CombineFileInputFormat),它的切片邏輯和FileInputFormat(預設)不同,它可以將多個小檔案在邏輯上規劃到一個切片上,交給一個maptask處理。

2.1.3、如何避免切片

1、動態調整blocksize

2、重寫isSplitable()方法,返回false

2.1.4、文字行整條資料分片儲存

文字行一行記錄是否會被切分存放在兩個分片上,又如何保證資料不丟失和資料不重複。

事實上,Hadoop對這種某一行跨兩個分片的情況進行了特殊的處理。
通常Hadoop使用的InputSplit是FileSplit,一個FileSplit主要儲存了三個資訊<path, start, 分片length>。假設根據設定分片大小為100,那麼一個250位元組大小的檔案切分之後,我們會得到如下的FileSplit:
<path, 0, 100>
<path, 100, 100>
<path, 200, 50>
(具體的切分演算法可以參考FileInputFormat的實現)

因此,事實上,每個MapReduce程式得到的只是類似<path, 0, 100>的資訊。當MapReduce程式開始執行時,會根據path構建一個FSDataInputStream,定位到start,然後開始讀取資料。在處理一個FileSplit的最後一行時,當讀取到一個FileSplit的最後一個字元時,如果不是換行符,那麼會繼續讀取下一個FileSplit的內容,直到讀取到下一個FileSplit的第一個換行符。這樣子就保證我們不會得到一個不完整的行了。

那麼當MapReduce在處理下一個FileSplit的時候,怎麼知道上一個FileSplit有沒有已經處理了這個FileSplit的第一行內容?
我們只需要檢查一下前一個FileSplit的最後一個字元是不是換行符,如果是,那麼當前Split的第一行還沒有被處理,如果不是,表示當前Split的第一行已經被處理,我們應該跳過。
在LineRecordReader中,使用了一個很巧妙的方法來實現上述的邏輯,把當前FileSplit的start減一,然後跳過第一行(下面是這個程式碼片斷)

}else{
if(start!= 0) {
skipFirstLine =true;
--start;
 fileIn.seek(start);
}
in=newLineReader(fileIn, job, recordDelimiter);
 }
if(skipFirstLine) {// skip first line and re-establish "start".
start+=in.readLine(newText(), 0,
(int)Math.min((long)Integer.MAX_VALUE,end-start));
}

2.1.5、常用的輸入格式

 

 

 FileInputFormat是所有使用檔案作為其資料來源的InputFormat的基類,其提供了兩個功能,一個是指出作業的輸入檔案位置,一個是為輸入檔案生成分片的程式碼實現。把分片分割成記錄的實現由其子類完成,FileInputFormat層次結構圖如下:

 

 

 

2.2、輸出格式

針對上一節的輸入格式,都會有相對應的輸入格式,OutputFormat層次結構圖如下:

 

 

 常用的輸出格式:

 

 

 

更多hadoop生態文章見: hadoop生態系列

參考:

https://blog.csdn.net/appstore81/article/details/15027767
https://www.cnblogs.com/52mm/p/p15.html
https://blog.csdn.net/u011812294/article/details/63262624
https://blog.csdn.net/penggougoude/article/details/82432802#commentBox

《Hadoop權威指南 大資料的儲存與分析 第四版》

《hadoop技術內幕深入解析yarn架構設計與實現原