1. 程式人生 > >hadoop之mapreduce詳解(基礎篇)

hadoop之mapreduce詳解(基礎篇)

本篇文章主要從mapreduce執行作業的過程,shuffle,以及mapreduce作業失敗的容錯幾個方面進行詳解。

一、mapreduce作業執行過程

1.1、mapreduce介紹

     MapReduce是一種程式設計模型,用於大規模資料集(大於1TB)的並行運算。概念"Map(對映)"和"Reduce(歸約)",是它們的主要思想,都是從函數語言程式設計語言裡借來的,還有從向量程式語言裡借來的特性。它極大地方便了程式設計人員在不會分散式並行程式設計的情況下,將自己的程式執行在分散式系統上。 當前的軟體實現是指定一個Map(對映)函式,用來把一組鍵值對對映成一組新的鍵值對,指定併發的Reduce(歸約)函式,用來保證所有對映的鍵值對中的每一個共享相同的鍵組。 ---來源於百度百科

MapReduce是一個基於叢集的高效能平行計算平臺(Cluster Infrastructure)
MapReduce是一個平行計算與執行軟體框架(Software Framework)
MapReduce是一個並行程式設計模型與方法(Programming Model & Methodology)

      mapreduce是hadoop中一個批量計算的框架,在整個mapreduce作業的過程中,包括從資料的輸入,資料的處理,資料的資料輸入這些部分,而其中資料的處理部分就要map,reduce,combiner等操作組成。在一個mapreduce的作業中必定會涉及到如下一些元件:

1、客戶端,提交mapreduce作業
2、yarn資源管理器,負責叢集上計算資源的協調
3、yarn節點管理器,負責啟動和監控叢集中機器上的計算容器(container)
4、mapreduce的application master,負責協調執行mapreduce的作業
5、hdfs,分散式檔案系統,負責與其他實體共享作業檔案

1.2、作業執行過程

作業的執行過程主要包括如下幾個步驟:

1、作業的提交
2、作業的初始化
3、作業任務的分配
4、作業任務的執行
5、作業執行狀態更新
6、作業完成

具體作業執行過程的流程圖如下圖所示:

 

 

1.2.1、作業的提交

作業提交原始碼分析詳情見:hadoop2.7之作業提交詳解(上) hadoop2.7之作業提交詳解(下)

     在MR的程式碼中呼叫waitForCompletion()方法,裡面封裝了Job.submit()方法,而Job.submit()方法裡面會建立一個JobSubmmiter物件。當我們在waitForCompletion(true)時,則waitForCompletion方法會每秒輪詢作業的執行進度,如果發現與上次查詢到的狀態有差別,則將詳情列印到控制檯。如果作業執行成功,就顯示作業計數器,否則將導致作業失敗的記錄輸出到控制檯。

其中JobSubmmiter實現的大概過程如下:
1、向資源管理器resourcemanager提交申請,用於一個mapreduce作業ID,如圖步驟2所示
2、檢查作業的輸出配置,判斷目錄是否已經存在等資訊
3、計算作業的輸入分片的大小
4、將執行作業的jar,配置檔案,輸入分片的計算資源複製到一個以作業ID命名的hdfs臨時目錄下,作業jar的複本比較多,預設為10個(通過引數mapreduce.client.submit.file.replication控制),
5、通過資源管理器的submitApplication方法提交作業

1.2.2、作業的初始化

1、當資源管理器通過方法submitApplication方法被呼叫後,便將請求傳給了yarn的排程器,然後排程器在一個節點管理器上分配一個容器(container0)用來啟動application master(主類是MRAppMaster)程序。該程序一旦啟動就會向resourcemanager註冊並報告自己的資訊,application master並且可以監控map和reduce的執行狀態。因此application master對作業的初始化是通過建立多個薄記物件以保持對作業進度的跟蹤。

2、application master接收作業提交時的hdfs臨時共享目錄中的資原始檔,jar,分片資訊,配置資訊等。並對每一個分片建立一個map物件,以及通過mapreduce.job.reduces引數(作業通過setNumReduceTasks()方法設定)確定reduce的數量。

3、application master會判斷是否使用uber(作業與application master在同一個jvm執行,也就是maptask和reducetask執行在同一個節點上)模式執行作業,uber模式執行條件:map數量小於10個,1個reduce,且輸入資料小於一個hdfs塊

可以通過引數:

mapreduce.job.ubertask.enable   #是否啟用uber模式
mapreduce.job.ubertask.maxmaps   #ubertask的最大map數
mapreduce.job.ubertask.maxreduces #ubertask的最大reduce數
mapreduce.job.ubertask.maxbytes #ubertask最大作業大小

4、application master呼叫setupJob方法設定OutputCommiter,FileOutputCommiter為預設值,表示建立做的最終輸出目錄和任務輸出的臨時工作空間

1.2.3、作業任務的分配

1、在application master判斷作業不符合uber模式的情況下,那麼application master則會向資源管理器為map和reduce任務申請資源容器。

2、首先就是為map任務發出資源申請請求,直到有5%的map任務完成時,才會為reduce任務所需資源申請發出請求。

3、在任務的分配過程中,reduce任務可以在任何的datanode節點執行,但是map任務執行的時候需要考慮到資料本地化的機制,在給任務指定資源的時候每個map和reduce預設為1G記憶體,可以通過如下引數配置:

mapreduce.map.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.memory.mb
mapreduce.reduce.cpu.vcores

1.2.4、作業任務的執行

       application master提交申請後,資源管理器為其按需分配資源,這時,application master就與節點管理器通訊來啟動容器。該任務由主類YarnChild的一個java應用程式執行。在執行任務之前,首先將所需的資源進行本地化,包括作業的配置,jar檔案等。接下來就是執行map和reduce任務。YarnChild在單獨的JVM中執行。

1.2.5、作業任務的狀態更新

    每個作業和它的每個任務都有一個狀態:作業或者任務的狀態(執行中,成功,失敗等),map和reduce的進度,作業計數器的值,狀態訊息或描述當作業處於正在執行中的時候,客戶端可以直接與application master通訊,每秒(可以通過引數mapreduce.client.progressmonitor.pollinterval設定)輪詢作業的執行狀態,進度等資訊。

1.2.6、作業的完成

當application master收到最後一個任務已完成的通知,便把作業的狀態設定為成功。
在job輪詢作業狀態時,知道任務已經完成,然後列印訊息告知使用者,並從waitForCompletion()方法返回。
當作業完成時,application master和container會清理中間資料結果等臨時問題。OutputCommiter的commitJob()方法被呼叫,作業資訊由作業歷史服務存檔,以便使用者日後查詢。

二、shuffle

   mapreduce確保每個reduce的輸入都是按照鍵值排序的,系統執行排序,將map的輸入作為reduce的輸入過程稱之為shuffle過程。shuffle也是我們優化的重點部分。shuffle流程圖如下圖所示:

 

 

 

2.1、map端

在生成map之前,會計算檔案分片的大小:計算原始碼詳見:hadoop2.7作業提交詳解之檔案分片

   然後會根據分片的大小計算map的個數,對每一個分片都會產生一個map作業,或者是一個檔案(小於分片大小*1.1)生成一個map作業,然後通過自定的map方法進行自定義的邏輯計算,計算完畢後會寫到本地磁碟。

     在這裡不是直接寫入磁碟,為了保證IO效率,採用了先寫入記憶體的環形緩衝區,並做一次預排序(快速排序)。緩衝區的大小預設為100MB(可通過修改配置項mpareduce.task.io.sort.mb進行修改),當寫入記憶體緩衝區的大小到達一定比例時,預設為80%(可通過mapreduce.map.sort.spill.percent配置項修改),將啟動一個溢寫執行緒將記憶體緩衝區的內容溢寫到磁碟(spill to disk),這個溢寫執行緒是獨立的,不影響map向緩衝區寫結果的執行緒,在溢寫到磁碟的過程中,map繼續輸入到緩衝中,如果期間緩衝區被填滿,則map寫會被阻塞到溢寫磁碟過程完成。溢寫是通過輪詢的方式將緩衝區中的記憶體寫入到本地mapreduce.cluster.local.dir目錄下。在溢寫到磁碟之前,我們會知道reduce的數量,然後會根據reduce的數量劃分分割槽,預設根據hashpartition對溢寫的資料寫入到相對應的分割槽。在每個分割槽中,後臺執行緒會根據key進行排序,所以溢寫到磁碟的檔案是分割槽且排序的。如果有combiner函式,它在排序後的輸出執行,使得map輸出更緊湊。減少寫到磁碟的資料和傳輸給reduce的資料。

    每次環形換衝區的記憶體達到閾值時,就會溢寫到一個新的檔案,因此當一個map溢寫完之後,本地會存在多個分割槽切排序的檔案。在map完成之前會把這些檔案合併成一個分割槽且排序(歸併排序)的檔案,可以通過引數mapreduce.task.io.sort.factor控制每次可以合併多少個檔案。

    在map溢寫磁碟的過程中,對資料進行壓縮可以提交速度的傳輸,減少磁碟io,減少儲存。預設情況下不壓縮,使用引數mapreduce.map.output.compress控制,壓縮演算法使用mapreduce.map.output.compress.codec引數控制。

2.2、reduce端

       map任務完成後,監控作業狀態的application master便知道map的執行情況,並啟動reduce任務,application master並且知道map輸出和主機之間的對應對映關係,reduce輪詢application master便知道主機所要複製的資料。

     一個Map任務的輸出,可能被多個Reduce任務抓取。每個Reduce任務可能需要多個Map任務的輸出作為其特殊的輸入檔案,而每個Map任務的完成時間可能不同,當有一個Map任務完成時,Reduce任務就開始執行。Reduce任務根據分割槽號在多個Map輸出中抓取(fetch)對應分割槽的資料,這個過程也就是Shuffle的copy過程。。reduce有少量的複製執行緒,因此能夠並行的複製map的輸出,預設為5個執行緒。可以通過引數mapreduce.reduce.shuffle.parallelcopies控制。

    這個複製過程和map寫入磁碟過程類似,也有閥值和記憶體大小,閥值一樣可以在配置檔案裡配置,而記憶體大小是直接使用reduce的tasktracker的記憶體大小,複製時候reduce還會進行排序操作和合並檔案操作。

   如果map輸出很小,則會被複制到Reducer所在節點的記憶體緩衝區,緩衝區的大小可以通過mapred-site.xml檔案中的mapreduce.reduce.shuffle.input.buffer.percent指定。一旦Reducer所在節點的記憶體緩衝區達到閥值,或者緩衝區中的檔案數達到閥值,則合併溢寫到磁碟。

   如果map輸出較大,則直接被複制到Reducer所在節點的磁碟中。隨著Reducer所在節點的磁碟中溢寫檔案增多,後臺執行緒會將它們合併為更大且有序的檔案。當完成複製map輸出,進入sort階段。這個階段通過歸併排序逐步將多個map輸出小檔案合併成大檔案。最後幾個通過歸併合併成的大檔案作為reduce的輸出

2.3、總結

當Reducer的輸入檔案確定後,整個Shuffle操作才最終結束。之後就是Reducer的執行了,最後Reducer會把結果存到HDFS上。

在Hadoop叢集環境中,大部分map 任務與reduce任務的執行是在不同的節點上。當然很多情況下Reduce執行時需要跨節點去拉取其它節點上的map任務結果。如果叢集正在執行的job有很多,那麼task的正常執行對叢集內部的網路資源消耗會很嚴重。這種網路消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節點內,相比於記憶體,磁碟IO對job完成時間的影響也是可觀的。從最基本的要求來說,我們對Shuffle過程的期望可以有:

1、完整地從map task端拉取資料到reduce 端。
2、在跨節點拉取資料時,儘可能地減少對頻寬的不必要消耗。
3、減少磁碟IO對task執行的影響。

在MapReduce計算框架中,主要用到兩種排序演算法:快速排序和歸併排序。在Map任務發生了2次排序,Reduce任務發生一次排序:

1、第1次排序發生在Map輸出的記憶體環形緩衝區,使用快速排序。當緩衝區達到閥值時,在溢寫到磁碟之前,後臺執行緒會將緩衝區的資料劃分成相應分割槽,在每個分割槽中按照鍵值進行內排序。

2、第2次排序是在Map任務輸出的磁碟空間上將多個溢寫檔案歸併成一個已分割槽且有序的輸出檔案。由於溢寫檔案已經經過一次排序,所以合併溢寫檔案時只需一次歸併排序即可使輸出檔案整體有序。

3、第3次排序發生在Shuffle階段,將多個複製過來的Map輸出檔案進行歸併,同樣經過一次歸併排序即可得到有序檔案。

三、作業失敗和容錯

既然有作業的執行,肯定會有作業的失敗,作業的失敗(不考慮硬體,平臺原因引起的失敗)可能會存在不同的問題,如下:

3.1、任務執行失敗

   使用者程式碼丟擲異常(程式碼沒寫好):這種情況任務JVM會在退出之前向application master傳送錯誤報告,並記錄進使用者日誌,application master對該作業標記為failed,並釋放掉佔有的資源容器。

     另一種就是JVM突然退出,這種情況節點管理器會注意到程序已經退出,並通知application master將此任務標記為失敗,如果是因為推測執行而導致任務被終止,則不會被被標記為失敗。而任務掛起又不同,一旦application master注意到有一段時間沒有收到進度更新,便會把任務標記為失敗,預設為10分鐘,引數mapreduce.task.timeout控制application master被告知一個任務失敗,將會重新排程該任務執行(會在與之前失敗的不同節點上執行),預設重試4次,如果四次都失敗,則作業判定為失敗,引數控制為:

mapreduce.map.maxattempts
mapreduce.reduce.maxattempts

3.2、application master執行失敗

AM也可能由於各種原因(如網路問題或者硬體故障)失效,Yarn同樣會嘗試重啟AM
可以為每個作業單獨配置AM的嘗試重啟次數:mapreduce.am.max-attempts,預設值為2
Yarn中的上限一起提高:yarn.resourcemanager.am.nax-attempts,預設為2,單個應用程式不可以超過這個限制,除非同時修改這兩個引數。

恢復過程:application master向資源管理器傳送週期性的心跳。當application master失敗時,資源管理器會檢測到該失敗,並在一個新的容器中啟動application master,並使用作業歷史來恢復失敗的應用程式中的執行任務狀態,使其不必重新執行,預設情況下恢復功能是開啟的,yarn.app.mapreduce.am.job.recovery.enable控制客戶端向application master輪詢作業狀態時,如果application master執行失敗了,則客戶端會向資源管理器resourcemanager詢問和快取application master地址。

3.3、節點管理器執行失敗

    如果節點管理器崩潰或者執行非常緩慢,則就會停止向資源管理器傳送心跳資訊,如果10分鐘(可以通過引數yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms設定)資源管理器沒有收到一條心跳資訊,則資源管理器將會通知停止傳送心跳的節點管理器,並將其從自己的資源池中移除該節點管理器,在該節點上的application master和任務的失敗,都通過如上兩種恢復機制進行恢復。

3.4、資源管理器執行失敗

   資源管理器失敗時一個很嚴重的問題,所有的任務將不能被分配資源,作業和容器都無法啟動,那麼整個通過yarn控制資源的叢集都處於癱瘓狀態。

容錯機制:resourcemanager HA 詳情見:hadoop高可用安裝和原理詳解

 

更多hadoop生態文章見: hadoop生態系列

 

參考:

《Hadoop權威指南 大資料的儲存與分析 第四