@
前言-MR概述
MapReduce是一個分散式計算框架,是使用者開發“基於Hadoop的資料分析應用”的核心框架。主要由兩部分組成:程式設計模型和執行時環 境。其中,程式設計模型為使用者提供了非常易用的程式設計介面,使用者只需要像編寫序列程式 一樣實現幾個簡單的函式即可實現一個分散式程式,而其他比較複雜的工作,如節點 間的通訊、節點失效、資料切分等,全部由MapReduce執行時環境完成,使用者無須 關心這些細節。
1.Hadoop MapReduce設計思想及優缺點
設計思想
Hadoop MapReduce誕生於搜尋領域,主要解決搜尋引擎面臨的海量資料處理擴充套件性差的問 題。它的實現很大程度上借鑑了谷歌MapReduce的設計思想,包括簡化程式設計介面、 提高系統容錯性等。
優點:
易於程式設計:
傳統的分散式程式設計(如MPI)非常複雜,使用者需要關注的細節 非常多,比如資料分片、資料傳輸、節點間通訊等,因而設計分散式程式的門檻非常 高。Hadoop的一個重要設計目標便是簡化分散式程式設計,將所有並行程式均需要關注的設計細節抽象成公共模組並交由系統實現,而使用者只需專注於自己的應用程式邏輯實現,這樣簡化了分散式程式設計且提高了開發效率。
良好的擴充套件性:
隨著公司業務的發展,積累的資料量(如搜尋公司的網頁量) 會越來越大,當資料量增加到一定程度後,現有的叢集可能已經無法滿足其計算能力和儲存能力,這時候管理員可能期望通過新增機器以達到線性擴充套件叢集能力的目的。
高容錯性:
在分散式環境下,隨著叢集規模的增加,叢集中的故障率(這裡 的“故障”包括磁碟損壞、機器宕機、節點間通訊失敗等硬體故障和壞資料或者使用者 程式bug產生的軟體故障)會顯著增加,進而導致任務失敗和資料丟失的可能性增 加。為此,Hadoop通過計算遷移或者資料遷移等策略提高叢集的可用性與容錯性。
適合PB級以上海量資料的離線處理:
可以實現上千臺伺服器叢集併發工作,提供資料處理能力。
缺點:
不擅長實時計算:
MapReduce無法像MySQL一樣,在毫秒或者秒級內返回結果。
不擅長流式計算
流式計算的輸入資料是動態的,而MapReduce的輸入資料集是靜態的,不能動態變化。這是因為MapReduce自身的設計特點決定了資料來源必須是靜態的。
不擅長DAG(有向圖)計算
多個應用程式存在依賴關係,後一個應用程式的輸入為前一個的輸出。在這種情況下,MapReduce並不是不能做,而是使用後,每個MapReduce作業的輸出結果都會寫入到磁碟,會造成大量的磁碟IO,導致效能非常的低下。
2. Hadoop MapReduce核心思想
從MapReduce自身的命名特點可以看出,MapReduce由兩個階段組成:Map階段 和Reduce階段。
(1)分散式的運算程式往往需要分成至少2個階段。
(2)第一個階段的MapTask併發例項,完全並行執行,互不相干。
(3)第二個階段的ReduceTask併發例項互不相干,但是他們的資料依賴於上一個階段的所有MapTask併發例項的輸出。
(4)MapReduce程式設計模型只能包含一個Map階段和一個Reduce階段,如果使用者的業務邏輯非常複雜,那就只能多個MapReduce程式,序列執行。
每一個Map階段和Reduce階段都可以由多個Map Task和Reduce Task
實際應用中我們只需編寫map()和reduce()兩個函式,即可完成簡單的分散式程式的 設計。
map()函式以key/value對作為輸入,產生另外一系列key/value對作為中間輸出 寫入本地磁碟。
MapReduce框架會自動將這些中間資料按照key值進行聚集,且key 值相同(使用者可設定聚集策略,預設情況下是對key值進行雜湊取模)的資料被統一 交給reduce()函式處理。
reduce()函式以key及對應的value列表作為輸入,經合併key相同的value值後, 產生另外一系列key/value對作為最終輸出寫入HDFS。
hadoop MapReduce對外提供了5個可程式設計元件,分別是 InputFormat、Mapper、Partitioner、Reducer和OutputFormat
3.MapReduce工作機制
剖析MapReduce執行機制
過程描述
- 客戶端:提交MapReduce作業
- YARN資源管理器,負責協調叢集上計算機資源的分配
- YARN節點管理器,負責啟動和監視叢集中機器上的計算容器(container)
- MapReduce的application master,負責協調執行MapReduce作業的任務。他和MapReduce任務在容器中執行,這些容器有資源管理器分配並由節點管理器進行管理
- 分散式檔案系統(一般為HDFS),用來與其他實體間共享作業檔案
第一階段:作業提交(圖1-4步)
步驟:
Job的submit()方法建立一個內部的JobSummiter例項,並且呼叫其submitJobInternal()方法。提交作業後,waitForCompletion()每秒輪詢作業的進度,如果發現自上次報告後有改變,便把進度報告到控制檯。作業完成後,如果成功,就顯示作業計數器;如果失敗,則導致作業失敗的錯誤被記錄到控制檯。
1.客戶端提交作業Job,並輪詢監控作業進度和狀態;
2.Job tracker向RM申請一個新應用ID,用作MR作業的ID;(圖中整個流程的步驟2)
3.Job 計算作業的輸入分片。如果無法進行分片計算,比如:輸入路徑不存在,作業就不提交,將錯誤返回給MR程式;
4.如果上一步OK,Job將執行作業所需要的資源(包括作業的JAR檔案、配置檔案和計算所得的輸入分片)複製到一個以作業ID命名的目錄下的共享檔案系統中(一般都是HDFS)。作業JAR的複本較多(複本的數量有作業提交的時候MR的引數控制:mapreduce.client.submit.file.replication屬性,預設為10),因此執行作業的任務時,叢集中有很多個複本可供節點管理器(Node Manager )訪問讀取;
5.上傳完畢後,通過呼叫資源管理器的submitApplication()方法提交作業。
第二階段:作業初始化(圖5-7步)
步驟:
1.資源管理器收到呼叫它的submitApplication()訊息後,便將請求傳遞給YARN排程器。排程器分配一個容器(container),並讓容器執行MRAppMaster程式,本質是RM(資源管理器)在NM(節點管理器)的管理下在容器中啟動application master的程序;(5a、5b)
2.MR作業的application master是一個Java應用程式,其主類是MRAppMaster。由於MRAppMaster要能相應客戶端對於應用程式執行狀態的查詢和狀態,因此application master對作業的初始化是通過建立多個簿記物件(就像一個多重賬單,形象的命名為簿記物件)以保持對作業進度的跟蹤來完成的。接下來,它接受來自共享檔案系統的、在客戶端計算的輸入分片。然後對每一個分片建立一個map任務物件以及有mapreduce.job.reduces屬性(通過作業的setNumReducetasks()方法設定)確定的多個reduce任務物件。任務ID在此時分配。
application master必須決定如何執行構成MapReduce作業的各個任務。如果作業很小,就選擇和自己在同一個JVM上執行任務。與在一個節點上順序執行這些任務相比,當application master判斷在新的容器中分配和執行任務的開銷大於並行執行它們的開銷時,就會發生這一情況。這樣的作業稱為uberized。或者作為Uber任務執行。
小作業是如何定義的?
預設情況下,小作業就是小於10個mapper且只有1個reducer且輸入大小小於一個HDFS塊的作業。其引數可以通過如下屬性進行設定:
- mapreduce.job.ubertask.maxmaps :最大map任務數量
- mapreduce.job.ubertask.maxreduces:最大reduce任務數量
- mapreduce.job.ubertask.maxbytes:處理檔案的最大容量,byte為單位
- mapreduce.job.ubertask.enable:true表示為啟用Uber任務
最後,在任何任務執行之前,application master呼叫setupJob()方法設定OutputCommitter。FileOutputCommitter為預設值,表示將建立作業的最終輸出目錄及任務輸出的臨時工作空間。
第三階段:任務的分配(圖8)
當作業不適合作業Uber任務執行,那麼application master就會為該作業中的所有map任務和reduce任務向資源管理器(RM)請求容器。
步驟:
- 首先Map任務發出請求,該請求優先順序要高於reduce任務的請求,這是因為所有的map任務必須在reduce排序階段能夠啟動前完成。直到有5% 的map任務已經完成時,為reduce任務申請容器的請求才會發出。
- reduce任務能夠在叢集中任意位置執行,但是map任務的請求有著資料本地化侷限,這也是排程器所關注的。在理想情況下,任務時資料本地化的。也就是說任務在分片駐留的同一節點上執行。次選的情況是,任務是可以機架本地化(rack local),即和分片在同一機架上而非同一節點上執行。有一些任務既不是資料本地化也不是機架本地化,他們會從別的機架,而不是執行所在的機器上獲取自己的資料。對於一個特定的作業執行,可以通過檢視做的計數器來確定在每個本地化層次上執行的任務的數量。
- 請求為任務指定了記憶體需求和CPU數。預設情況下,每個map任務和reduce任務都分配到1G記憶體和一個虛擬核心。這些值也是可以在每個作業的基礎上進行配置。配置屬性如下:
- mapreduce.map.memory.mb:map任務記憶體大小
- mapreduce.reduce.memory.mb:reduce任務記憶體大小
- mapreduce.map.cpu.vcores:map任務cpu核數
- mapreduce.reduce.cpu.vcores:reduce任務CPU核數
第四階段:任務的執行(圖9-11)
步驟:
- 一旦資源管理器的排程器為任務分配了一個特點節點上的容器,application master就通過與節點管理器通訊來啟動容器.
- 該任務有主類為YarnChild的一個Java應用程式執行。在它執行任務之前,首先將任務需要的資源本地化,包括任務的配置、JAR檔案和所有來自分散式快取的檔案;
- 最後執行map任務或reduce任務。
YarnChild 在指定的JVM中執行,因此使用者定義的map和reduce函式(甚至是YarnChild)中的任何缺陷不會影響到節點管理器,例如導致其崩潰或掛起。
每個任務都能夠執行搭建(setup)和提交(commit)動作,他們和任務本身在同一個JVM中執行,並有作業的OutputCommitter確定。對於基於檔案的作業,提交動作將任務輸出由臨時位置搬移到最終位置。提交協議確保當推測執行被啟用時,只有一個任務副本被提交,其他都取消。
推測執行
MapReduce模型將作業分解成任務,然後並行地執行任務以使作業的整體執行時間少於各個任務順序執行的時間。這就使得作業執行時間對執行緩慢的任務很敏感,因為只執行一個緩慢的任務會使整個作業所用的時間遠遠長於執行其他任務的時間,當一個作業由幾百或幾千個任務組成時,可能出現少數“拖後腿”的任務,這是很常見的。
任務執行緩慢的可能有很多種,但是檢測具體原因是比較困哪的(比如,硬體方面當前節點的效能低於其他節點或者軟體應用配置如記憶體,JVM,reduce個數等等的問題),儘管執行時間比預期長,但是任務最終是成功執行的。hadoop 不會嘗試診斷或者修復執行慢的任務,相反,在下一個任務執行比預期慢的時候,它會盡量檢測,並啟動另一個相同的任務作為備份,這就是所謂的推測執行。
我們很容易想到的是,如果同時啟動了兩個重複的任務,對於資源的開銷時比較大,且相同任務之間會產生互相競爭(比如資源,當同任務在同一節點啟動時的記憶體、cpu等),這都不是我們想要的,在什麼情況下對某個任務啟動推測執行的時機是沒有絕對的,我們可以相對於各自型別任務(map和reduce)的平均執行進度作為一個基準【比如,map階段的任務平均耗時10s,當一個map任務執行了20S還沒有執行完成,那我們有理由相信這個任務可能是出問題,我們應該啟動一個副本任務】。在此基準上將執行速度明顯低於平均水平的那一部分任務進行推測執行副本。當一個推測執行的任務完成之後,其他正在執行的重複任務都將被中止執行,這個很容易理解,我們已經得到了想要的結果,就不需要副本任務了。
但是推測執行開啟就一定是好事麼?小可愛們可以思考下,什麼情況下推測執行會產生負面的影響。
我們都清楚推測執行是需要開啟相同的任務,那就是一個任務需要的資源是更多的,那對於一個繁忙的叢集,執行推測執行就會消耗更多的資源,減少了叢集整體的吞吐量,因而此時推測執行對於整體而言是不利的,就是因為一個任務慢導致整個叢集都慢。
對於reduce任務,我們知道Reduce任務是需要將Map任務的輸出結果彙集之後,如果reduce任務有大量的任務推測執行,對於叢集的網路IO是會產生較大的影響,對於叢集整體也會產生影響。另外,reduce可能會產生資料傾斜,對於此,一個reduce任務因為資料雜湊問題本身執行就慢,開啟推測執行反而不會有積極的影響。因此,對於reduce任務,關閉推測執行是相對好的選擇。
第五階段:作業完成
當application master收到作業最後一個任務已完成的通知後,便把作業的狀態設定為成功。然後,在Job輪詢狀態時,便知道任務已成功完成。於是Job列印一條訊息告知使用者,然後從waitForCompletion()方法返回。Job的統計資訊和計數值也在這個時候輸出到控制檯。
如果我們希望當任務執行完成之後,application master可以主動通知我們,而不是等待Job輪詢才能獲取到任務的完成狀態。可以在application master進行相應的設定,這時application master會發送一個HTTP作業通知。客戶端通過設定屬性【mapreduce.job.end-notification.url】進行相應的設定。
最後,作業完成時,各個分配的container和application master會清理其工作狀態,任務執行期間的中間輸出將被刪除,OutputCommitter的commitJob()方法會被呼叫。作業資訊有作業歷史伺服器存檔,以便日後需要時檢視。
在這我們就已經走完了MR程式整個執行過程,對於其中的部分細節我們在下邊在來介紹一下。
Tips 知識點:進度和狀態更新
MapReduce作業是長時間執行的批量作業,執行時間範圍從數秒到數小時。在這個很長的時間內,使用者想要去獲取關於作業的一些執行反饋是很重要的。一個作業和它的每個任務都有一個狀態(status),包括:作業或任務的狀態(比如:執行中,成功完成,失敗)、map和reduce的進度、作業計數器的值、狀態訊息或描述(可以由使用者程式碼設定)。之前我們也有提到使用者是可以隨時檢視作業的狀態資訊的,那這些狀態是怎麼通過客戶端進行通訊得到的呢?
任務在執行時,對其進度(progress,即任務完成百分比)保持追蹤。對Map任務,任務進度是已處理輸入所佔的比例。對reduce任務,情況會複雜一些,但系統仍然會估計已處理reduce輸入的比例。整個過程分為三部分,與shuffle的三個階段相對應。比如,如果任務已經執行reducer一半的輸入,那麼任務的進度便是5/6,這是因為已經完成複製和排序階段(每個佔1/3),並且完成reduce階段的一半(1/6)。
任務也有一組計數器,負責對任務執行過程中各個時間進行計數,這些計數器要麼內建框架中,比如寫入的map輸出記錄數,要麼是使用者自定義的。
當map任務或reduce任務執行時,子程序和自己的父application master通過umbilical介面通訊。預設每隔3秒,任務通過這個umbilical介面向自己的application master報告進度和狀態(包括計數器),application master會形成一個作業的匯聚檢視。注意:application master不會主動獲取任務的進度,是被動接受task任務的上報。
在作業期間,客戶端每秒鐘輪訓一次application master以接收最新狀態(輪詢間隔通過mapreduce.client,progressmonitor.ploointerval設定)。客戶端也可以通過使用job的getStatus()方法得到一個JobStatus例項,其內包含作業的所有狀態資訊。
4.MR各組成部分工作機制原理
4.1概覽:
4.2 MapTask工作機制
MapTask的整體計算如上圖所示,共分為5個階段,如下:
其中最重要的部分是輸出結果在記憶體和磁碟中的組織方式,具體涉及Collect、Spill和Combine三個階段,對於這三個階段我們介紹時會深入介紹。
(1)Read階段:MapTask通過使用者編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該階段主要是將解析出的key/value交給使用者編寫map()函式處理,併產生一系列新的key/value。
(3)Collect收集階段:在使用者編寫map()函式中,當資料處理完成後,一般會呼叫OutputCollector.collect()輸出結果。在該函式內部,它會將生成的key/value分割槽(呼叫Partitioner),並寫入一個環形記憶體緩衝區中。
輸出深入講解
當map函式處理完一對key/value產生新的key/value後,會呼叫collect()函式輸出結果。在輸出結果時,OutputCollecter物件會根據作業是否有Reduce Task進行不同的處理,如果沒有Reduce Task階段,則把結果直接輸出到HDFS。如果後續有對應的Reduce Task,則開始組織封裝結果:
- 1.獲取對應記錄的分割槽號partition,然後寫到環形緩衝區;
- 2.環形緩衝區中,當資料寫入到一定閾值後,會有專屬的寫出執行緒(SpillThread)將資料寫到一個臨時檔案中此操作稱之為落盤,當所有資料處理完畢後,對所有臨時檔案進行一次合併以生成一個最終檔案。環形快取區使得Collect階段和Spill階段可以並行進行。
- 3.資料(新的key/value)寫入是由兩部分組成,索引和真實key/value。通過讓索引和資料共享環形緩衝區,提升整個緩衝區的效率:存放如下
指標equator,該指標界定了索引和資料的共同起始存放位置,從該位置開始,索引和資料分別沿相反的方向增長記憶體使用空間。當記憶體使用達到80%(預設情況下) 的時候,落盤也是從該指標開始讀取資料到指定的位置,資料和索引的落盤是分開進行的,索引落盤一般是當索引大小超過1MB才開始進行落盤。
(4)Spill階段:即“溢寫”,當環形緩衝區滿後,MapReduce會將資料寫到本地磁碟上,生成一個臨時檔案。需要注意的是,將資料寫入本地磁碟之前,先要對資料進行一次本地排序,並在必要時對資料進行合併、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序演算法對快取區區間內的資料進行排序,排序方式是,先按照分割槽編號Partition進行排序,然後按照key進行排序。這樣,經過排序後,資料以分割槽為單位聚集在一起,且同一分割槽內所有資料按照key有序。
步驟2:按照分割槽編號由小到大依次將每個分割槽中的資料寫入任務工作目錄下的臨時檔案output/spillN.out(N表示當前溢寫次數)中。如果使用者設定了Combiner,則寫入檔案之前,對每個分割槽中的資料進行一次聚集操作。
步驟3:將分割槽資料的元資訊寫到記憶體索引資料結構SpillRecord中,其中每個分割槽的元資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大小超過1MB,則將記憶體索引寫到檔案output/spillN.out.index中。
環形緩衝區認知:
(5)Combine階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合併,以確保最終只會生成一個數據檔案。
當所有資料處理完後,MapTask會將所有臨時檔案合併成一個大檔案,並儲存到檔案output/file.out中,同時生成相應的索引檔案output/file.out.index。
在進行檔案合併過程中,MapTask以分割槽為單位進行合併。對於某個分割槽,它將採用多輪遞迴合併的方式。每輪合併io.sort.factor(預設10)個檔案,並將產生的檔案重新加入待合併列表中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。
讓每個MapTask最終只生成一個數據檔案,可避免同時開啟大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。
4.3 ReduceTask工作機制
(1)Copy階段/Shuffle階段:ReduceTask從各個MapTask上遠端拷貝一片資料,並針對某一片資料,如果其大小超過一定閾值,則寫到磁碟上,否則直接放到記憶體中。
(2)Merge階段:在遠端拷貝資料的同時,ReduceTask啟動了兩個後臺執行緒對記憶體和磁碟上的檔案進行合併,以防止記憶體使用過多或磁碟上檔案過多。
總體上看,Shuffle&Merge階段可進一步劃分為三個子階段。
(1)準備執行完成的Map Task列表:
GetMapEventsThread執行緒週期性通過RPC從TaskTracker獲取已完成Map Task列表,並儲存到對映表mapLocations(儲存了TaskTracker Host與已完成任務列表的對映關係)中。為防止出現網路熱點,Reduce Task通過對所有TaskTracker Host進行“混洗”操作以打亂資料拷貝順序,並將調整後的Map Task輸出資料位置儲存到scheduledCopies列表中。
(2)遠端拷貝資料
Reduce Task同時啟動多個MapOutputCopier執行緒,這些執行緒從scheduledCopies列表中獲取Map Task輸出位置,並通過HTTP Get遠端拷貝資料。對於獲取的資料分片,如果大小超過一定閾值,則存放到磁碟上,否則直接放到記憶體中。
(3)合併記憶體檔案和磁碟檔案
為了防止記憶體或者磁碟上的檔案資料過多,Reduce Task啟動了LocalFSMerger和InMemFSMergeThread兩個執行緒分別對記憶體和磁碟上的檔案進行合併。
(3)Sort階段:按照MapReduce語義,使用者編寫reduce()函式輸入資料是按key進行聚集的一組資料。為了將key相同的資料聚在一起,Hadoop採用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了區域性排序,因此,ReduceTask只需對所有資料進行一次歸併排序即可。
(4)Reduce階段:reduce()函式將計算結果寫到HDFS上。
前面提到,各個Map Task已經事先對自己的輸出分片進行了區域性排序,因此,Reduce Task只需進行一次歸併排序即可保證資料整體有序。為了提高效率,Hadoop將Sort階段和Reduce階段並行化。在Sort階段,Reduce Task為記憶體和磁碟中的檔案建立了小頂堆,儲存了指向該小頂堆根節點的迭代器,且該迭代器保證了以下兩個約束條件:
1.磁碟上檔案數目小於io.sort.factor(預設是10)。
2.當Reduce階段開始時,記憶體中資料量小於最大可用記憶體(JVM Max HeapSize)的mapred.job.reduce.input.buffer.percent(預設是0)。
在Reduce階段,Reduce Task不斷地移動迭代器,以將key相同的資料順次交給reduce()函式處理,期間移動迭代器的過程實際上就是不斷調整小頂堆的過程,這樣,Sort和Reduce可並行進行。
4.4shuffle 階段
4.4.1 定義:
我們將MR過程中,將Map輸出作為輸入傳給reducer的過程成為shuffle,但一般情況下,我們把從map端產生輸出到reduce消化輸入的整個過程都稱之為shuffle。
大致流程如下:
也就是說shuffle階段包括了從Map端Collect階段開始一直到Reduce端Sort階段的整個過程,也可以看出這是整個MR的核心過程,在生產中優化MR更多的是在shuffle階段的各個過程做文章,提高整個MR的處理效率。
小結:
通過這篇文章我們瞭解了什麼是MapReduce。在Hadoop中MapReduce是計算處理的邏輯關鍵,而Shuffle階段又是整個MR的核心關鍵,Shuffle是部分Map task階段和Reduce Task階段的處理過程誠摯為Shuffle階段。對於我們進行MR調優的大部分操作都是在Shuffle階段的機制中去優化各個節點的處理,進而提升MR的處理效率。針對於如何調優MR,我們會在下個篇章進行一些生產中常見的調優的策略,而對於MR的理解能幫助我們更好的去進行調優處理。
好了,今天的文章就到這裡結束了。路漫漫其修遠兮,吾將上下而求索。希望這篇文章對於大家有所幫助。