大資料系列4:Yarn以及MapReduce 2
阿新 • • 發佈:2021-01-29
系列文章:
[大資料系列:一文初識Hdfs](https://mp.weixin.qq.com/s/t145iTxDwKjmu4rfqHkDRQ)
[大資料系列2:Hdfs的讀寫操作](https://mp.weixin.qq.com/s/c7aqP82wlnOZ6Kn96xlpEQ)
[大資料謝列3:Hdfs的HA實現](https://mp.weixin.qq.com/s/WiC_6PTIWIcCn8XJlMdo2A)
通過前文,我們對Hdfs的已經有了一定的瞭解,本文將繼續之前的內容,介紹``Yarn``與``Yarn``在``MapReduce 2``的應用
---
### MapReduce1 作業流程
在介紹``Yarn``之前,我們先介紹一下``Mapreduce1``作業流程。
有了這個基礎,再去看看採用``Yarn``以後的``MapReduce2``到底有啥優勢也許會有更好的理解。
首先先介紹一下相關的幾個實體:
1. ``Client``:負責提交 ``MapReduce`` 作業
2. ``jobtracker``:協調作業執行,是一個Jave程式,主類為``JobTracker``
3. ``tasktracker``:執行作業劃分後的任務,是一個Jave程式,主類為``TaskTracker``
4. ``Hdfs``:分散式檔案系統,用於在其他實體之間共享作業檔案
作業流程圖如下:
![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129181827651-906373059.png)
1. ``MapReduce Program`` 呼叫``runJob()``建立``JobClient``並告知其提交作業。
在提交作業後``runJob()``會每秒輪詢作業進度,如果發生改變就把進度輸出控制檯。
作業成後輸出作業計數器,如果失敗,則輸出失敗資訊。
2. ``JobClient``通過呼叫``JobTracker.getNewJobId()``請求一個新的``JoobId``。
3. 將執行作業需要的資源(作業``Ja``r檔案,配置檔案,計算所得的輸入分片)複製到以``JobId``命名的目錄下``jobtracker``的``HDFS``中。
作業Jar的會有多個副本(``mapred.submit.replication``預設10),在執行作業的時候,``tasktracker``可以訪問多個副本。
4. 呼叫``JobTracker.submitJob()``方法告知``jobtracker``作業準備執行。
5. ``JobTracker``接收到對``submitJob()``的呼叫後,會把改呼叫放入一個內部佇列,交由作業排程器(``job scheduler``)進行排程。
同時會對``Job``初始化,包括建立一個表示``Job``正在執行的物件,用來封裝任務和記錄的資訊,用於追蹤任務的狀態和程序。
6. 為了建立人物執行列表,作業排程器會從共享檔案系統中獲取``JobCient``已經計算好的輸入分片資訊。
然後為每一個分片建立一個``map``任務。
至於``reduce``任務則由``JonConf``的``mapred.reduce.task``決定,通過``setNumReduceTask()``設定,
然後排程器建立相應數量的``reduce``任務。
此時會被指定任務``ID``
7. ``tasktracker``與``jobtracker``之間維持一個心跳,
作為訊息通道,``tasktracker``或告知自身存活情況與是否可以執行新的任務。
根據資訊,``jobtracker``會決定是否為``tasktracker``分配任務(通過排程演算法)。
這個過程中,對於``map``任務會考慮資料本地性,對於``reduce``則不需要。
8. 一旦``tasktracker``被分配了任務,接下里就是執行,首先通過``Hdfs``把作業的``Jar``檔案複製到``tasktracker``所在的檔案系統。
實現作業``Jar``本地化。
同時,``tasktracker``把需要的檔案從``Hdfs``複製到本地磁碟。
然後為任務建立一個本地工作目錄,並將``Jar``中的內容解壓到這裡。
最後建立一個``TaskRunner``例項執行該任務。
9. ``TaskerRunner``啟動一個新的``JVM``用來執行每一個任務。
10. 分別執行``MapTask``或者``ReduceTask``,結束後告知``TaskTracker``結束資訊,同時``TaskTracker``將該資訊告知``JobTracker``
上面就是``Maopreduce1``作業執行的流程。我們先有個概念,後面介紹``Yarn``的時候做下對比。
> 這裡說的``Mapreduce1`` 指的是``Hadoop``初始版本(版本1以及更早的)中的``Mapreduce``分散式執行框架,也就是我們上面的作業流程。
> ``Mapreduce2`` 指的是使用``Yarn``(``Hadoop 2`` 以及以後版本)的``Mapreduce``執行方式。
> 這裡``Mapreduce1、Mapreduce2``指的不是``Hadoop``版本,指的是``Mapreduce``程式的不同執行機制而已。
---
### Yarn
``Yarn (Yet Another Resource Negotiator)``是在``Hadoop 2``引入的叢集資源管理系統,最初的目的是為了改善``MapReduce``的實現。
但是由於其具有強大的通用性,可以支援其他的分散式計算框架。
在引入的``Yarn``後,``Hadoop 2``的生態就發生了一變化,如下:
![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129181843844-726715812.png)
``Yarn``提供請求和使用叢集資源的``API``,但是一般都是由分散式框架(``Saprk、Flink``等)內部呼叫這些``API``,
使用者則使用分散式系統提供的更高層的``API``。
這種方式向用戶隱藏了資源管理的細節,一定程度上降低了開發難度和運維成本。
---
``Yarn``的結構很簡單,如下
![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129181912057-398438199.png)
``Yarn``的核心思想是將資源管理和作業排程/監視功能拆分為單獨的守護程序。
具體實現就是:
一個管理叢集上資源使用的全域性資源管理器(``RM,ResourceManager``);
執行在叢集所有結點上並且能夠啟動和監控容器(``Container``)的結點管理器(``Node Manager``)
> ``Container``是用於執行特定應用程式的程序,每個資源都有資源限制(記憶體、CPU等)
> ``Container``可以是``Unix``程序,也可以``Linux cgroup``
Yarn的組成介紹就這麼簡單,接下來我們就看看它怎麼提交執行一個任務。
---
### 提交任務
這裡分為兩部分,
第一部分會介紹Yarn任務提交流程,
第二部分會介紹Mapreduce 2 的提交流程
---
#### Yarn任務提交流程
``Yarn`` 任務的提交流程如下:
![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129181927522-1605777896.png)
1. 為了在``Yarn``上執行任務,``Client``會向``ResourceManager``發出執行 ``Application Master process``的請求。
2. ``Resource Manager``找到一個可以執行``Application Master``的``NodeManager``。
3. ``NodeMager``啟動一個容器,執行``Application Master``。
4. 此``時Application Master``會做什麼操作取決於``Application``本身,
可以是在``在Application Master``執行一個簡單計算任務,將結果返回``Client``,
也可以向``Resource Manager``申請更多容器。
5. 申請到更多的``container``。
從上面的步驟可以發現,``Yarn``本身是不會為應用的各個部分(``Client, Master, Process``)之間提供互動。
大多數基於``Yarn``的任務使用某些遠端通訊機制(比如``Hadoop RPC``)向客戶端傳遞資訊。
這些``RPC``通訊機制一般都是專屬於該應用的。
---
##### MapReduce 2 任務提交流程
有了上面的基礎,具體的應用怎麼提交。
此處選用``MapReduce 2``,與一開始``MapReduce 1``做個對比
涉及到五個實體:
1. ``Client``:提交 ``MapReduce job``的客戶端
2. ``YARN Resource Manager``:負責協調分配叢集計算資源
3. ``YARN Node Managers``:啟動並監視叢集中機器上的計算容器。
4. ``MapReduce Application Master``:協調``MapReduce job``的任務執行。
5. ``HDFS``:用於在其他實體之間共享``Job``檔案
> ``Application Master`` 和 ``MapReduce Tasks`` 在容器中執行,他們由``Resource Manager``排程,由``Node Managers``管理
提交流程如下:
![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129181941564-1022968034.png)
1. ``Job.sumbit()``方法建立一個內部``的JobSummiter``例項,並呼叫其``sunbmitJobInternal()``方法。
作業提交後,``waitForCompletion()``會每秒輪詢返回作業的進度。
如果作業完成後,如果成功則顯示作業計數器,否則輸出錯誤。
2. ``JobSummiter``向``Resource Manager``申請一個用於 ``MapReduce job ID``的新``Application ID``。
這個過程會檢查作業,輸出說明:
例如,如果沒有指定輸出目錄,或者輸出目錄已經存在,則不會提交作業,並向``MapReduce``程式丟擲錯誤;
計算作業的輸入分片。
如果無法計算分片(例如,因為輸入路徑不存在),則作業不提交,並向``MapReduce``程式丟擲錯誤。
3. 將執行作業需要的資源(作業``Jar``檔案,配置檔案,計算所得的輸入分片)複製到以``JobId``命名的``HDFS``的目錄下。
作業``Jar``的會有多個副本(``mapreduce.client.submit.file.replication``預設10),
當``Node Managers``執行任務時,可以跨叢集訪問許多副本。
4. 通過呼叫``Resource Manager``的``submitApplication()``提交任務。
5. ``Resource Manager``收到``submitApplication()``的呼叫請求後,將請求傳遞給``Yarn``的排程器(``Scheduler``)。
``Scheduler``會為其分配一個容器,
6. ``Node Manager``在容器中啟動一個``Application Master``,主類為``MRAppMaster``。
7. 由於``MRAppMaster``將從任務接收進度和完成報告,它通過建立許多簿記物件(``bookkeeping objects``)來初始化作業,以跟蹤作業的進度。
8. 接下來,``MRAppMaster``從共享檔案系統檢索在客戶機中計算的輸入切片,
它會為每個切片建立一個map task;
建立``mapreduce.job.reduces``(由``Job.setNumReduceTasks()``)數量的``reduce task``。
``MRAppMaster``根據任務的情況決定是執行一個``uber task``還是向``Resource Manager``請求更多的資源。
9. ``MRAppMaster``向``Resource Manager``為``job``中所有的``map、reduce tasks``申請容器。
10. 一旦``Resource Manager``的``Scheduler``為``task``在指定的``Node Manager``分配了容器以後``,Application Master``就會請求``Node Manager``分配容器。
11. ``Node Manager``收到請,啟動容器。容器中的主類為``YarnChild``,執行在專用的``JVM``中,所以``map、reduce``、甚至``YarnChild``本身出現的錯誤都不會影響``Node Manager``。
12. 在執行``task``之前,``YarnChild``會對任務需要的資源進行本地化,包括``job``配置、``JAR``檔案以及其他來自``Hdfs``的檔案。
13. 最後執行``map 或 reduce`` 任務。
關於的``ubertask``細節說明:
``MRAppMaster``必須決定如何執行``MapReduce job``。
利用並行的優勢,確實可以提高任務的執行效率,
但是在小任務或少任務的情況下,
在新的容器中分配和執行任務所額外消耗的時間大於並行執行帶來效率的提升。
這個時候在一個節點上順序執行這些任務反而能獲得更好的效率。
這樣的job被稱為``uber task``
> 簡單的說就是並行執行的時候任務效率的提升還不夠彌補你重新申請資源、建立容器、分發任務等消耗的時間。
那麼怎樣才算``small job``呢?
預設情況下:``small job``是有少於10個``mapper``,只有一個``reducer``,一個輸入大小小於一個``HDFS Block``大小的job。
當然也可以通過引數`` mapreduce.job.ubertask.maxmaps`` ,``mapreduce.job.ubertask.maxreduces`` , ``mapreduce.job.ubertask.maxbytes ``進行設定。
對於``Ubertasks``,``mapreduce.job.ubertask.enable``必須設定為``true``。
對於步驟9補充說明:
在這個過程中,會先申請``map``任務的容器,
因為所有的map任務都必須在reduce的排序階段開始之前完成(``Shuffle and Sort機制``)。
對``reduce``任務的請求直到5%的``map``任務完成才會發出(``reduce slow start機制``)。
對於``reduce``任務,可以在叢集的任何結點執行,
但是對``map``任務,會有資料本地性的要求(詳情此處不展開)
申請還為任務指定記憶體和``cpu``。預設情況下,每個``map``和``reduce``任務分配``1024 MB``記憶體和1個虛擬核,
可以通過``mapreduce.map.memory.mb ``, `` mapreduce.reduce.memory.mb`` , ``mapreduce.map.cpu.vcores``和`` mapreduce.reduce.cpu.vcores``進行配置
----
### Yarn 與mapreduce 1
上面就是``Mapreduce2``的任務提交執行流程,一開始我們就介紹了``Mapreduce1``,現在我們對比下兩個有啥區別。
>本質就是結合``Mapreduce 2 ``對比``Yarn``與``Mapreduce1``排程的區別,所以後面``Mapreduce 2 ``直接用``Yarn``替換
``Mapreduce 1 ``中,作業執行過程由兩類守護程序控制,分別為一個``jobtracker``和多個``tasktracker``。
``jobtracker`` 通過排程``tasktracker``上的任務來協調執行在系統的``Job``,並記錄返回的任務進度。
``tasktracker``負責執行任務並向`jobtracker``傳送任務進度。
``jobtracker``同時負責作業的排程(分配任務與``tasktracker``匹配)和任務進度監控(任務跟蹤、失敗重啟、記錄流水、維護進度、計數器等)
``Yarn`` 中,也有兩類守護程序``Resource Manager`` 和``Nonde Manager``分別類比``jobtracker``和``tasktracker``。
但是不一樣的地方在於,``jobtracker``的職責在``Yarn``中被拆分,由兩個實體``Reource Manger`` 和``Application Master```(每個Job有一個)。
``jobtracker`` 也可以儲存作業歷史,或者通過執行一個獨立守護程序作為歷史作業伺服器。而與對應的,``Yarn``提供一個時間軸伺服器(``timeline server``)來儲存應用的歷史。
二者組成類比
Mapreduce 1 | Yarn
:---:|:---:
``jobtracker`` | ``Reource Manger``
``Application Master``
``timeline server`` ``tasktracker`` | ``Nonde Manager`` ``Slot`` | ``container`` 對於二者的區別,心血來潮想了個例子,希望能夠幫助理解。 有三個角色:皇帝、大臣、打工人 ![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129182006633-24275832.png) 現在有兩個情況, 1:發生水災,需要賑災 2:敵寇入侵,邊疆告急 在這種情況下 ``Mapreduce 1`` 的做法是: ![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129182023815-584840748.png) ``Yarn``的做法: ![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129182035651-81530409.png) 簡單的說,就是``Yarn``讓專業的人做專業的事情。 遇到事情找個專家,我只負責提需求和提供資源, 其他的讓專家去做。 > 這個專家就是``MRAppMaster(Mapreduce)``,而對應的Spark也有自己的專家 由此也總結下``Yarn``帶來的優勢: 1. 可拓展性(``Scalability``) ``Yarn``可以在比``MapReduce 1``更大的叢集上執行。 ``MapReduce 1``在4000個節點和40000個任務的時候達到拓展性的瓶頸。 主要是因為``jobtracker``需要管理作業和任務。 ``Yarn``就拆分了這個,將作業與任務拆分,由``Manager/Application Master``分別負責,可以輕鬆將拓展至10,000 個節點 100,000 個任務。 2. 可用性(``Availability)`` 高可用性(HA)通常是通過複製另一個守護程序所需的狀態來實現的,以便在活躍狀態的守護程序掛掉時接管提供服務所需的工作。 但是,``jobtracker``的記憶體中有大量快速變化的複雜狀態(例如,每個任務狀態每隔幾秒更新一次),這使得將在``jobtracker``服務配置HA非常困難。 而對於``Yarn``而言,由於職責被拆分,那麼``HA``也隨之變成了分治問題。 可以先提供``Resource Manager``的HA,同時如果有需要可以為每個人應用也提供HA。 > 實際上對於``Mapreduce 2`` 對``Resource Manager`` 和 ``Application Master``都提供了HA,稍候介紹。 3. 利用率(``Utilization``) ``MapReduce 1`` 中,每個``tasktracker``都靜態配置若干個``slot``,在配置的時候被劃分為``map slot`` 和``reduce slot``,只能執行固定的任務。 而在``Yarn``中,``Node Manager``管理一個資源池,只要有資源,任務就可以執行。 同時資源是精細化管理的,任務可以按需申請資源。 4. 多租戶(``Multitenancy``) 其實,某種程度上來說,統一的資源管理,向其他分散式應用開放``Hadoop``才是``Yarn``的最大優勢。 比如我們可以部署``Spark、Flink``等等。此時``MapReduce``也僅僅是在這之上的一個應用罷了 --- ### High Availability 接下來再說一下HA吧。 這裡主要結合``Mapreduce 2`` 來說明 HA 針對的都是出現錯誤或失敗的情況。 在我們這裡,出現錯誤或失敗的場景有以下幾個 1. 任務失敗 2. ``Application Master``失敗 3. ``Node Manager``失敗 4. ``Resource Manager``失敗 接下來我們分別看看這些情況怎麼解決。 --- #### task 失敗 任務失敗的情況有可能出現下面的情況: 1. 使用者``map、reduce task``程式碼問題,這種失敗最常見,此時在``task JVM``在退出前會向``Application Master``傳送錯誤報告,該報告會被計入使用者日誌。最後``Application Master``會將該任務將任務嘗試標記為``failed``,並釋放容器,使其資源可供另一個任務使用。 2. 另一種情況是``task JVM``突然退出,可能存在一個``JVM bug``,導致JVM在特定環境下退出``MapReduce``的使用者程式碼。 這種情況下,``Node Manager``發現程序已經退出,會告知``Application Master``,並將任務嘗試標記為``failed``。 3. 還有一種是任務超時或者掛起,一旦``Application Master``注意到有一段時間沒有收到任務進度更新了,就會將該任務標記為``failed``,由引數``mapreduce.task.timeout``(預設10分鐘,0表示關閉超時,此時長時間執行任務永遠不會標記為``failed``,慎用)設定。 task 失敗的處理方式: 1. ``Application Master``發現任務失敗後,會重新排程該任務,會進行避免在之前失敗的``Node Manager``上排程該任務。 如果一個任務連續失敗四次(``mapreduce.map.maxattempts``,``mapreduce.reduce.maxattempts``),就不會繼續重調,整個Job也就失敗。 2. 而有些場景在少數任務失敗,結果仍舊可以使用,那麼此時我們不希望停止任務,可以配置一個允許任務失敗的閥值(百分比),此時不會觸發Job失敗。 通過``mapreduce.map.failures.maxpercent``、``mapreduce.reduce.failures.maxpercent``設定。 3. 還有一個情況是任務嘗試被kill,這種情況``Application Master``制動標記``killed``不屬於任務失敗。 > 推測機制(Speculative Execution),如果發現task執行的時間執行速度明顯慢於平均水平,就會在其他的結點啟動一個相同的任務,稱為推測執行。 這個不一定有效,僅僅是投機性的嘗試。 當任務成功完成時,任何正在執行的重複任務都將被終止,因為不再需要它們。 就是推測任務與原始任務誰能上位就看誰先完成了。 --- #### Application Master 失敗 當遇到``Application Master``失敗時,``Yarn``也會進行嘗試。 可以通過配置``mapreduce.am.max-attempts property``(預設:2)配置重試次數, 同時,``Yarn``對於叢集中執行的``Application Master``最大嘗試次數加了限制,也需要通過`` yarn.resourcemanager.am.max-attempts``(預設:2)進行配置。 重試的流程如下: ``Application Master`` 向``Resource Manager``傳送心跳,如果``Application Master``發生故障,``Resource Manager``將檢測故障,並在新的容器中啟動執行``Application Master``的新例項 > 在MapReduce,它可以使用作業歷史記錄來恢復(失敗的)應用程式已經執行的任務的狀態,這樣它們就不必重新執行。 預設情況下恢復是啟用的,但是可以通過設定``yarn.app.mapreduce.am.job.recovery``來禁用。
``MapReduce client``輪詢``Application Master``的進度報告, 但如果它的``Application Master``失敗,客戶端需要定位新的例項。 在``Job``初始化期間,``client``向``Resource Manager``請求``Application Master``的地址,然後對其進行快取,這樣在每次需要輪詢``Application Master``時, 就不會向``Resource Manager``發出請求,從而使``Resource Manager``負擔過重。 但是,如果``Application Master``失敗,``client``將發出狀態更新時超時,此時``client``將向``Resource Manager``請求新的``Application Master``的地址。 --- ##### Node Manager 失敗 如果``Node Manager``因崩潰或執行緩慢而發生故障,它將停止向``Resource Manager``傳送心跳(或傳送頻率非常低)。 ``Resource Manager`` 如果在10分鐘內(``yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms ``)沒有收到``Node Manager``的心跳資訊, 就會告訴該``Node Manager``,停止傳送心跳,並將它從自己的``Nodes``池中移除。 在此``Node Manager``失敗的``task`` 或 ``Application Master`` 都會按照之前的說的方式恢復。 此外,即使``map tasks``在失敗的``Node Manager``上執行併成功完成但屬於未完成的``job``, ``Application Master``也會安排它們重新執行, 因為它們的中間輸出駐留在故障``Node Manager``的本地檔案系統上,``reduce``任務可能無法訪問。 如果一個``Node Manager``失敗任務次數過多,該``Node Manager``會被``Application Master``拉入黑名單。 > 對於 MapReduce,如果一個Job在某個``Node Manager``失敗3個任務(`` mapreduce.job.maxtaskfailures.per.tracker``),就會嘗試在其他的結點進行排程。 注意,``Resource Manager``不會跨應用程式執行黑名單(編寫時), 因此來自新作業的任務可能會在壞節點上排程,即使它們已被執行較早作業的應用程式主程式列入黑名單。 --- ##### Resource Manager 失敗 ``Resource Manager``失敗是很嚴重的,一旦它失敗, ``job``和``task``容器都無法啟動。 在預設配置中,``Resource Manager``是一個單故障點,因為在(不太可能的)機器故障的情況下,所有正在執行的作業都失敗了並且無法恢復。 要實現高可用性(HA),必須在一個``active-standby``配置中執行一對``Resource Manager``。 如果 ``active Resource Manager``發生故障,則``standby Resource Manager``可以接管,而不會對``client``造成重大中斷。 通過將執行中的應用程式資訊儲存在高可用的狀態儲存區中(通過``ZooKeeper/HDFS``備份),實現``standby Resource Manager ``恢復``active Resource Manager``(失敗)的關鍵狀態。 ``Node Manager`` 資訊沒有儲存在狀態儲存中,因為當``Node Manager`` 發出第一次心跳時,新的``Resource Manager``可以相對較快地對其進行重構。 > 因為``task``由``Application Master``管理,所以``task``不屬於``Resource Manager``的狀態,因此於``Resource Manager``儲存的狀態比``jobtracker``中的狀態更容易管理。 目前,有兩種持久化RMStateStore的方式,分別為:``FileSystemRMStateStore``和``ZKRMStateStore``。 整體架構如下: ![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129182120677-1528908421.png) 我們可以通過手動或自動重啟``ResourceManager``。 被提升為active 狀態的ResourceManager載入ResourceManager內部狀態,並根據ResourceManager restart特性儘可能從上一個active Resource Manager 離開的地方繼續操作。 對於以前提交給ResourceManager的每個託管Application,都會產生一個新的嘗試。 應用程式可以定期checkpoint,以避免丟失任何工作。 狀態儲存必須在兩個Active/Standby Resource Managers中都可見。 從上圖可以看到我們可以選擇的狀態儲存介質有兩個``FileSystemRMStateStore`` 和 ``ZKRMStateStore``。 ``ZKRMStateStore``隱式地允許在任何時間點對單``ResourceManagers``進行寫訪問, 因此在HA叢集中推薦使用ZKRMStateStore。 在使用``ZKRMStateStore``時,不需要單獨的防禦機制來解決可能出現的腦裂情況,即多個``Resource Manager``可能扮演``active``角色。 並且``ResourceManager``可以選擇嵌入基於``zookeeper``的``ActiveStandbyElector``來決定哪個``Resource Manager``應該是``active``的。 當``active``的``ResourceManager``關閉或失去響應時,另一個``Resource Manager``會自動被選為``active``,然後由它接管。 >注意,不需要像HDFS那樣執行一個單獨的``ZKFC``守護程序,因為嵌入在``Resource Manager``中的``ActiveStandbyElector``充當故障檢測器和``leader elector``,所以不需要單獨的``ZKFC``守護程序。 --- 關於Yarn的內容就介紹到這裡,更詳細的內容可以參考[官網](https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html) 之後會更新一些Hdfs讀寫的原始碼追蹤相關文章,有興趣可以關注【兔八哥
``Application Master``
``timeline server`` ``tasktracker`` | ``Nonde Manager`` ``Slot`` | ``container`` 對於二者的區別,心血來潮想了個例子,希望能夠幫助理解。 有三個角色:皇帝、大臣、打工人 ![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129182006633-24275832.png) 現在有兩個情況, 1:發生水災,需要賑災 2:敵寇入侵,邊疆告急 在這種情況下 ``Mapreduce 1`` 的做法是: ![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129182023815-584840748.png) ``Yarn``的做法: ![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129182035651-81530409.png) 簡單的說,就是``Yarn``讓專業的人做專業的事情。 遇到事情找個專家,我只負責提需求和提供資源, 其他的讓專家去做。 > 這個專家就是``MRAppMaster(Mapreduce)``,而對應的Spark也有自己的專家 由此也總結下``Yarn``帶來的優勢: 1. 可拓展性(``Scalability``) ``Yarn``可以在比``MapReduce 1``更大的叢集上執行。 ``MapReduce 1``在4000個節點和40000個任務的時候達到拓展性的瓶頸。 主要是因為``jobtracker``需要管理作業和任務。 ``Yarn``就拆分了這個,將作業與任務拆分,由``Manager/Application Master``分別負責,可以輕鬆將拓展至10,000 個節點 100,000 個任務。 2. 可用性(``Availability)`` 高可用性(HA)通常是通過複製另一個守護程序所需的狀態來實現的,以便在活躍狀態的守護程序掛掉時接管提供服務所需的工作。 但是,``jobtracker``的記憶體中有大量快速變化的複雜狀態(例如,每個任務狀態每隔幾秒更新一次),這使得將在``jobtracker``服務配置HA非常困難。 而對於``Yarn``而言,由於職責被拆分,那麼``HA``也隨之變成了分治問題。 可以先提供``Resource Manager``的HA,同時如果有需要可以為每個人應用也提供HA。 > 實際上對於``Mapreduce 2`` 對``Resource Manager`` 和 ``Application Master``都提供了HA,稍候介紹。 3. 利用率(``Utilization``) ``MapReduce 1`` 中,每個``tasktracker``都靜態配置若干個``slot``,在配置的時候被劃分為``map slot`` 和``reduce slot``,只能執行固定的任務。 而在``Yarn``中,``Node Manager``管理一個資源池,只要有資源,任務就可以執行。 同時資源是精細化管理的,任務可以按需申請資源。 4. 多租戶(``Multitenancy``) 其實,某種程度上來說,統一的資源管理,向其他分散式應用開放``Hadoop``才是``Yarn``的最大優勢。 比如我們可以部署``Spark、Flink``等等。此時``MapReduce``也僅僅是在這之上的一個應用罷了 --- ### High Availability 接下來再說一下HA吧。 這裡主要結合``Mapreduce 2`` 來說明 HA 針對的都是出現錯誤或失敗的情況。 在我們這裡,出現錯誤或失敗的場景有以下幾個 1. 任務失敗 2. ``Application Master``失敗 3. ``Node Manager``失敗 4. ``Resource Manager``失敗 接下來我們分別看看這些情況怎麼解決。 --- #### task 失敗 任務失敗的情況有可能出現下面的情況: 1. 使用者``map、reduce task``程式碼問題,這種失敗最常見,此時在``task JVM``在退出前會向``Application Master``傳送錯誤報告,該報告會被計入使用者日誌。最後``Application Master``會將該任務將任務嘗試標記為``failed``,並釋放容器,使其資源可供另一個任務使用。 2. 另一種情況是``task JVM``突然退出,可能存在一個``JVM bug``,導致JVM在特定環境下退出``MapReduce``的使用者程式碼。 這種情況下,``Node Manager``發現程序已經退出,會告知``Application Master``,並將任務嘗試標記為``failed``。 3. 還有一種是任務超時或者掛起,一旦``Application Master``注意到有一段時間沒有收到任務進度更新了,就會將該任務標記為``failed``,由引數``mapreduce.task.timeout``(預設10分鐘,0表示關閉超時,此時長時間執行任務永遠不會標記為``failed``,慎用)設定。 task 失敗的處理方式: 1. ``Application Master``發現任務失敗後,會重新排程該任務,會進行避免在之前失敗的``Node Manager``上排程該任務。 如果一個任務連續失敗四次(``mapreduce.map.maxattempts``,``mapreduce.reduce.maxattempts``),就不會繼續重調,整個Job也就失敗。 2. 而有些場景在少數任務失敗,結果仍舊可以使用,那麼此時我們不希望停止任務,可以配置一個允許任務失敗的閥值(百分比),此時不會觸發Job失敗。 通過``mapreduce.map.failures.maxpercent``、``mapreduce.reduce.failures.maxpercent``設定。 3. 還有一個情況是任務嘗試被kill,這種情況``Application Master``制動標記``killed``不屬於任務失敗。 > 推測機制(Speculative Execution),如果發現task執行的時間執行速度明顯慢於平均水平,就會在其他的結點啟動一個相同的任務,稱為推測執行。 這個不一定有效,僅僅是投機性的嘗試。 當任務成功完成時,任何正在執行的重複任務都將被終止,因為不再需要它們。 就是推測任務與原始任務誰能上位就看誰先完成了。 --- #### Application Master 失敗 當遇到``Application Master``失敗時,``Yarn``也會進行嘗試。 可以通過配置``mapreduce.am.max-attempts property``(預設:2)配置重試次數, 同時,``Yarn``對於叢集中執行的``Application Master``最大嘗試次數加了限制,也需要通過`` yarn.resourcemanager.am.max-attempts``(預設:2)進行配置。 重試的流程如下: ``Application Master`` 向``Resource Manager``傳送心跳,如果``Application Master``發生故障,``Resource Manager``將檢測故障,並在新的容器中啟動執行``Application Master``的新例項 > 在MapReduce,它可以使用作業歷史記錄來恢復(失敗的)應用程式已經執行的任務的狀態,這樣它們就不必重新執行。 預設情況下恢復是啟用的,但是可以通過設定``yarn.app.mapreduce.am.job.recovery``來禁用。
``MapReduce client``輪詢``Application Master``的進度報告, 但如果它的``Application Master``失敗,客戶端需要定位新的例項。 在``Job``初始化期間,``client``向``Resource Manager``請求``Application Master``的地址,然後對其進行快取,這樣在每次需要輪詢``Application Master``時, 就不會向``Resource Manager``發出請求,從而使``Resource Manager``負擔過重。 但是,如果``Application Master``失敗,``client``將發出狀態更新時超時,此時``client``將向``Resource Manager``請求新的``Application Master``的地址。 --- ##### Node Manager 失敗 如果``Node Manager``因崩潰或執行緩慢而發生故障,它將停止向``Resource Manager``傳送心跳(或傳送頻率非常低)。 ``Resource Manager`` 如果在10分鐘內(``yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms ``)沒有收到``Node Manager``的心跳資訊, 就會告訴該``Node Manager``,停止傳送心跳,並將它從自己的``Nodes``池中移除。 在此``Node Manager``失敗的``task`` 或 ``Application Master`` 都會按照之前的說的方式恢復。 此外,即使``map tasks``在失敗的``Node Manager``上執行併成功完成但屬於未完成的``job``, ``Application Master``也會安排它們重新執行, 因為它們的中間輸出駐留在故障``Node Manager``的本地檔案系統上,``reduce``任務可能無法訪問。 如果一個``Node Manager``失敗任務次數過多,該``Node Manager``會被``Application Master``拉入黑名單。 > 對於 MapReduce,如果一個Job在某個``Node Manager``失敗3個任務(`` mapreduce.job.maxtaskfailures.per.tracker``),就會嘗試在其他的結點進行排程。 注意,``Resource Manager``不會跨應用程式執行黑名單(編寫時), 因此來自新作業的任務可能會在壞節點上排程,即使它們已被執行較早作業的應用程式主程式列入黑名單。 --- ##### Resource Manager 失敗 ``Resource Manager``失敗是很嚴重的,一旦它失敗, ``job``和``task``容器都無法啟動。 在預設配置中,``Resource Manager``是一個單故障點,因為在(不太可能的)機器故障的情況下,所有正在執行的作業都失敗了並且無法恢復。 要實現高可用性(HA),必須在一個``active-standby``配置中執行一對``Resource Manager``。 如果 ``active Resource Manager``發生故障,則``standby Resource Manager``可以接管,而不會對``client``造成重大中斷。 通過將執行中的應用程式資訊儲存在高可用的狀態儲存區中(通過``ZooKeeper/HDFS``備份),實現``standby Resource Manager ``恢復``active Resource Manager``(失敗)的關鍵狀態。 ``Node Manager`` 資訊沒有儲存在狀態儲存中,因為當``Node Manager`` 發出第一次心跳時,新的``Resource Manager``可以相對較快地對其進行重構。 > 因為``task``由``Application Master``管理,所以``task``不屬於``Resource Manager``的狀態,因此於``Resource Manager``儲存的狀態比``jobtracker``中的狀態更容易管理。 目前,有兩種持久化RMStateStore的方式,分別為:``FileSystemRMStateStore``和``ZKRMStateStore``。 整體架構如下: ![](https://img2020.cnblogs.com/blog/1293390/202101/1293390-20210129182120677-1528908421.png) 我們可以通過手動或自動重啟``ResourceManager``。 被提升為active 狀態的ResourceManager載入ResourceManager內部狀態,並根據ResourceManager restart特性儘可能從上一個active Resource Manager 離開的地方繼續操作。 對於以前提交給ResourceManager的每個託管Application,都會產生一個新的嘗試。 應用程式可以定期checkpoint,以避免丟失任何工作。 狀態儲存必須在兩個Active/Standby Resource Managers中都可見。 從上圖可以看到我們可以選擇的狀態儲存介質有兩個``FileSystemRMStateStore`` 和 ``ZKRMStateStore``。 ``ZKRMStateStore``隱式地允許在任何時間點對單``ResourceManagers``進行寫訪問, 因此在HA叢集中推薦使用ZKRMStateStore。 在使用``ZKRMStateStore``時,不需要單獨的防禦機制來解決可能出現的腦裂情況,即多個``Resource Manager``可能扮演``active``角色。 並且``ResourceManager``可以選擇嵌入基於``zookeeper``的``ActiveStandbyElector``來決定哪個``Resource Manager``應該是``active``的。 當``active``的``ResourceManager``關閉或失去響應時,另一個``Resource Manager``會自動被選為``active``,然後由它接管。 >注意,不需要像HDFS那樣執行一個單獨的``ZKFC``守護程序,因為嵌入在``Resource Manager``中的``ActiveStandbyElector``充當故障檢測器和``leader elector``,所以不需要單獨的``ZKFC``守護程序。 --- 關於Yarn的內容就介紹到這裡,更詳細的內容可以參考[官網](https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html) 之後會更新一些Hdfs讀寫的原始碼追蹤相關文章,有興趣可以關注【兔八哥