1. 程式人生 > >Hadoop_YARN資源管理系統原始碼解析

Hadoop_YARN資源管理系統原始碼解析

目錄

一、YARN產生的背景(MRv1的侷限性)

二、YARN原始碼結構

三、YARN基本架構

四、YARN各模組詳細分析

五、MRAppMaster-MapReduce On YARN實現

六、YarnChild-MR引擎啟動入口

七、總結

一、YARN產生的背景(MRv1的侷限性)

YARN 是在 MRv1 基礎上演化而來的,它克服了 MRv1 中的各種侷限性。在正式介紹 YARN 之前,我們先要了解 MRv1 的一些侷限性,這可概括為以下幾個方面:

擴充套件性差。在 MRv1 中,JobTracker 同時兼備了資源管理和作業控制兩個功能,這成為系統的一個最大瓶頸,嚴重製約了Hadoop叢集擴充套件性。

可靠性差。MRv1 採用了Master/Slave結構,其中,Master(JobTracker)存在單點故障問題,一旦它出現故障將導致整個叢集不可用。

資源利用率低。MRv1 採用了基於槽位的資源分配模型,槽位是一種粗粒度的資源 劃分單位,通常一個任務不會用完槽位對應的資源,且其他任務也無法使用這些空 閒資源。此外,Hadoop 將槽位分為 Map Slot 和 Reduce Slot 兩種,且不允許它們之 間共享,常常會導致一種槽位資源緊張而另外一種閒置(比如一個作業剛剛提交時, 只會執行Map Task,此時Reduce Slot 會閒置)。

無法支援多種計算框架。隨著網際網路高速發展,MapReduce這種基於磁碟的離線計算框架已經不能滿足應用要求,從而出現了一些新的計算框架,包括記憶體計算框架、 流式計算框架和迭代式計算框架等,而這些新的計算框架是無法執行在MRv1上的。

為了克服以上幾個缺點,Apache開始嘗試對Hadoop進行升級改造,進而誕生了更加先進的下一代MapReduce計算框架 MRv2。正是由於MRv2將資源管理功能抽象成了一個獨立的通用系統YARN,直接導致下一代MapReduce的核心從單一的計算框架MapReduce轉移為通用的資源管理系統YARN。為了進一步理解以YARN為核心的軟體棧, 我們將之與以MapReduce為核心的軟體棧進行對比,在以MapReduce為核心的軟體棧中,資源管理系統 YARN 是可插拔替換的,比如選擇 Mesos替換YARN,一旦MapReduce介面改變,所有的資源管理系統的實現均需要跟著改變 ;但以 YARN 為核心的 軟體棧則不同,所有框架都需要實現 YARN 定義的對外介面以執行在YARN之上,這意味著Hadoop 2.0可以打造一個以YARN為核心的生態系統。

                         

                圖1 MRV1的基本架構                                                 圖2 YARN作為通用資源管理系統的的基本架構

MRv1主要由兩類服務組成,分別是 JobTracker 和TaskTracker。其中,JobTracker負責叢集資源資源和作業管理,TaskTracker負責單個節點的資源管理和任務管理。MRv1將資源管理和作業管理兩部分混雜在一 起,使得它在擴充套件性、容錯性和多框架支援等方面存在明顯缺陷。而 MRv2 則通過將資源管理和作業管理兩部分剝離開,分別由YARN和ApplicationMaster負責,其中,YARN專管資源管理和排程,而ApplicationMaster則負責應用程式的生命週期。總結:MRv2將MRv1中JobTracker的叢集資源管理功能抽出來變成YARN中的ResourceManager,將MRv1中的TaskTracker的節點資源管理抽出來變成YARN中的NodeManager,將MRv1中JobTracker的作業管理及TaskTracker的任務管理抽出來變成YARN中的ApplicationMaster,使得YARN在擴充套件性、容錯性和多框架支援等方面相比MRv1都有明顯的改進

二、YARN原始碼結構

在 Hadoop 的JAR壓縮包解壓後的目錄 hadoop-{VERSION} 中包含了 Hadoop 全部的 管理指令碼和JAR包,下面簡單對這些檔案或目錄進行介紹。

❑ bin:Hadoop 最基本的管理指令碼和使用指令碼所在目錄,這些指令碼是 sbin 目錄下管理 指令碼的基礎實現,使用者可以直接使用這些指令碼管理和使用 Hadoop。

❑ etc:Hadoop 配置檔案所在的目錄,包括 core-site.xml、hdfs-site.xml、mapred-site. xml 等從 Hadoop 1.0 繼承而來的配置檔案和 yarn-site.xml 等 Hadoop 2.0 新增的配置 檔案。

❑ include:對外提供的程式設計庫標頭檔案(具體動態庫和靜態庫在 lib 目錄中),這些頭文 件均是用 C++ 定義的,通常用於 C++ 語言訪問 HDFS 或者編寫 MapReduce 程式。

❑ lib:該目錄包含了 Hadoop 對外提供的程式設計動態庫和靜態庫,與 include 目錄中的頭 檔案結合使用。

❑ libexec:各個服務對應的 Shell 配置檔案所在目錄,可用於配置日誌輸出目錄、啟 動引數(比如 JVM 引數)等基本資訊。

❑ sbin:Hadoop 管理指令碼所在目錄,主要包含 HDFS 和 YARN 中各類服務的啟動/關閉指令碼。

❑ share:Hadoop 各個模組編譯後的 JAR 包所在目錄。

在 Hadoop 原始碼壓縮包解壓後的目錄 hadoop-{VERSION}-src 中,可看到如圖1

示的目錄結構,其中,比較重要的目錄有 :hadoop-common-project、hadoop-mapreduce- project、hadoop-hdfs-project 和 hadoop-yarn-project 等,下面分別介紹這幾個目錄的作用。

❑ hadoop-common-project:Hadoop 基礎庫所在目錄,該目錄中包含了其他所有模組可 能會用到的基礎庫,包括 RPC、Metrics、Counter 等。  

                                                                     

                                                                       圖3 Hadoop原始碼目錄結構

❑ hadoop-mapreduce-project :MapReduce 框架的實現,在 MRv1 中,MapReduce 由程式設計模 型(map/reduce)、排程系統(JobTracker 和 TaskTracker)和資料處理引擎(MapTask 和 ReduceTask)等模組組成,而此處的 MapReduce 則不同於 MRv1 中的實現,它的資 源排程功能由新增的 YARN 完成(程式設計模型和資料處理引擎不變),自身僅包含非 常簡單的任務分配功能。

❑ hadoop-hdfs-project :Hadoop 分散式檔案系統實現,不同於 Hadoop 1.0 中單 NameNode 實現,Hadoop 2.0 支援多 NameNode,同時解決了 NameNode 單點故障問題。

❑ hadoop-yarn-project :Hadoop 資源管理系統 YARN 實現。這是 Hadoop 2.0 新引入的 分支,該系統能夠統一管理系統中的資源,並按照一定的策略分配給各個應用程式。

下面就對Hadoop YARN原始碼組織結構進行介紹。

總體上看,Hadoop YARN 分為 5 部分 :API、Common、Applications、Client 和 Server, 它們的內容具體如下:

❑ YARN API(hadoop-yarn-api 目錄):給出了 YARN 內部涉及的 4 個主要 RPC 協議的 Java 宣告和 Protocol Buffers 定義,這 4 個 RPC 協議分別是 ApplicationClientProtocol、 ApplicationMasterProtocol、ContainerManagementProtocol 和 ResourceManagerAdministrationProtocol。

❑ YARN Common(hadoop-yarn-common 目錄):該部分包含了 YARN 底層庫實現, 包括事件庫、服務庫、狀態機庫、Web 介面庫等。

❑ YARN Applications(hadoop-yarn-applications 目錄):該部分包含了兩個 Application 程式設計例項,分別是 distributedshell 和 Unmanaged AM。

❑ YARN Client(hadoop-yarn-client 目錄):該部分封裝了幾個與 YARN RPC 協議互動 相關的庫,方便使用者開發應用程式。

❑ YARN Server(hadoop-yarn-server 目錄):該部分給出了 YARN 的核心實現,包括ResourceManager、NodeManager、資源管理器等核心元件的實現。

                                                               

                                                                圖4 Hadoop YARN目錄組織結構

三、YARN基本架構

        YARN是Hadoop 2.0 中的資源管理系統,它的基本設計思想是將MRv1中的JobTracker拆分成了兩個獨立的服務 :一個全域性的資源管理器ResourceManager和每個應用程式特有的     ApplicationMaster。其中ResourceManager負責全域性的資源管理及排程,而 ApplicationMaster負責單個應用程式的生命週期管理。

3.1、YARN基本組成結構

       YARN 總體上仍然是 Master/Slave 結構,在整個資源管理框架中,ResourceManager 為 Master,NodeManager 為 Slave,ResourceManager 負責對各個 NodeManager 上的資源進行 統一管理和排程。當用戶提交一個應用程式時,需要提供一個用以跟蹤和管理這個程式的 ApplicationMaster,它負責向 ResourceManager 申請資源,並要求 NodeManger 啟動可以佔 用一定資源的任務。由於不同的 ApplicationMaster 被分佈到不同的節點上,因此它們之間 不會相互影響。在本小節中,我們將對 YARN 的基本組成結構進行介紹。

        圖 2-9 描述了 YARN 的基本組成結構,YARN 主要由 ResourceManager、NodeManager、 ApplicationMaster(圖中給出了 MapReduce 和 MPI 兩種計算框架的 ApplicationMaster,分 別為 MR AppMstr 和 MPI AppMstr)和 Container 等幾個元件構成。

                                                        

1. ResourceManager(RM)

RM是一個全域性的資源管理器,負責整個叢集的資源管理與排程(個人理解類似於MRv1時的JobTracker,只是把JobTracker作業管理相關的這塊移到了ApplicationMaster中,只負責純粹通用的資源管理與排程,做到了計算框架無關)。主要功能是包括資源管理與排程、ApplicationMaster管理(啟動應用程式的AM)、NodeManager管理等。

❑ 與客戶端進行互動,處理來自於客戶端的請求,如提交應用程式、查詢應用的執行情況等。

❑ 啟動和管理各個應用程式的ApplicationMaster,並且為ApplicationMaster申請第一個Container用於啟動和在它執行失敗時將它重新啟動。

❑ 管理NodeManager,接收來自NodeManager的資源和節點健康情況彙報,並向NodeManager下達資源管理命令,例如kill掉某個container。

❑ 資源管理與排程,接收來自ApplicationMaster的資源申請請求,併為之分配資源。這個是它最重要的職能。YARN提供了多種直接可用的排程器,比如Fair Scheduler和Capacity Scheduler。

2. ApplicationMaster(AM)

AM是一個應用程式管理器,負責一個應用程式的生命週期管理(管理Job中的各個task,申請資源並管理Job內的資源分配)。使用者提交的每個應用程式均包含一個 AM,主要功能包括: 

❑ 與 RM 排程器協商以獲取資源(用 Container 表示);

❑ 將得到的資源進一步分配給內部的任務;

❑ 與 NM 通訊以啟動 / 停止任務;

❑ 監控所有任務執行狀態,並在任務執行失敗時重新為任務申請資源以重啟任務。

當 前 YARN 自 帶 了 兩 個 AM 實 現, 一 個 是 用 於 演 示 AM 編 寫 方 法 的 實 例 程 序

distributedshell,它可以申請一定數目的 Container 以並行執行一個 Shell 命令或者 Shell 指令碼 ; 另一個是執行 MapReduce 應用程式的 AM— MRAppMaster。此外,一些其他的計算框架對應的 AM 正在開發中,比如 Open MPI、Spark 等  。

3. NodeManager(NM)

NM 是單個節點上的資源管理器(個人理解類似於MRv1時的TaskTracker,只是把TaskTracker任務管理這塊移到了ApplicationMaster中,只負責純粹通用的資源管理與排程,做到了計算框架無關),一方面,它會定時地向 RM 彙報本節點上的 資源使用情況和各個 Container 的執行狀態;另一方面,它接收並處理來自 AM 的 Container 啟動 / 停止等各種請求。

4. Container

Container 是 YARN 中的資源抽象,它封裝了某個節點上的多維度資源,如記憶體、 CPU、磁碟、網路等,當 AM 向 RM 申請資源時,RM 為 AM 返回的資源便是用 Container 表示的。YARN 會為每個任務分配一個 Container,且該任務只能使用該 Container 中描述的 資源。需要注意的是,Container 不同於 MRv1 中的 slot,它是一個動態資源劃分單位,是 根據應用程式的需求動態生成的。YARN 目前僅支援 CPU 和記憶體兩種資源, 且使用了輕量級資源隔離機制 Cgroups 進行資源隔離   。

3.2、YARN通訊協議

       RPC 協議是連線各個元件的“大動脈”,瞭解不同元件之間的 RPC 協議有助於我們更 深入地學習 YARN 框架。在 YARN 中,任何兩個需相互通訊的元件之間僅有一個 RPC 協 議,而對於任何 一個 RPC 協議,通訊雙方有一端是 Client,另一端為 Server,且 Client 總 是主動連線 Server 的,因此,YARN 實際上採用的是拉式(pull-based)通訊模型。如圖 2-10 所示,箭頭指向的元件是 RPC Server,而箭頭尾部的元件是 RPC Client,YARN 主要由以 下幾個 RPC 協議組成 :

❑ JobClient(作業提交客戶端)與 RM 之間的協議 — ApplicationClientProtocol : JobClient 通過該 RPC 協議向RM提交應用程式、查詢應用程式狀態等。

❑ Admin(管理員)與 RM 之間的通訊協議— ResourceManagerAdministrationProtocol : Admin 通過該 RPC 協議更新系統配置檔案,比如節點黑白名單、使用者佇列許可權等。

❑ AM 與 RM 之間的協議— ApplicationMasterProtocol :AM 通過該 RPC 協議向 RM 註冊和撤銷自己,併為各個任務申請資源。

❑ AM 與 NM 之間的協議 — ContainerManagementProtocol :AM 通過該 RPC 要求 NM 啟動或者停止Container,獲取各個Container的使用狀態等資訊。

❑ NM 與 RM 之間的協議— ResourceTracker :NM 通過該 RPC 協議向 RM 註冊,並 定時傳送心跳資訊彙報當前節點的資源使用情況和Container執行情況。

                                                 

                                                                            圖5 Hadoop YARN的RPC協議

為了提高 Hadoop 的向後相容性和不同版本之間的相容性,YARN 中的序列化框架採用 了 Google 開源的 Protocol Buffers。

3.3、YARN工作流程

       執行在 YARN 上的應用程式主要分為兩類 :短應用程式和長應用程式,其中,短應用程式是指一定時間內(可能是秒級、分鐘級或小時級,儘管天級別或者更長時間的也存在,但非常少)可執行完成並正常退出的應用程式,比如 MapReduce 作業、Tez DAG 作業等,長應用程式是指不出意外,永不終止執行的應用程式,通常是一些服務,比如 Storm Service(主要包括 Nimbus 和 Supervisor 兩類服務),HBase Service(包括 Hmaster 和 RegionServer 兩類服務)   等,而它們本身作為一個框架提供了程式設計介面供使用者使用。儘管這兩類應用程式作用不同,一類直接執行資料處理程式,一類用於部署服務(服務之上再執行資料處理程式),但執行在 YARN 上的流程是相 同的。當用戶向 YARN 中提交一個應用程式後,YARN 將分兩個階段執行該應用程式 :第一 個階段是啟動 ApplicationMaster ;第二個階段是由 ApplicationMaster 建立應用程式,為它 申請資源,並監控它的整個執行過程,直到執行完成。如圖 2-11 所示,YARN 的工作流程 分為以下幾個步驟:

                                                   

                                                                          圖6 Hadoop YARN的工作流程

步驟 1 使用者向 YARN 中提交應用程式,其中包括 ApplicationMaster 程式、啟動 ApplicationMaster 的命令、使用者程式等。

步驟 2 ResourceManager 為該應用程式分配第一個 Container,並與對應的 Node- Manager 通訊,要求它在這個 Container 中啟動應用程式的 ApplicationMaster。

步驟 3 ApplicationMaster 首先向 ResourceManager 註冊,這樣使用者可以直接通過 ResourceManage 檢視應用程式的執行狀態,然後它將為各個任務申請資源,並監控它的運 行狀態,直到執行結束,即重複步驟 4~7。

步驟 4 ApplicationMaster 採用輪詢的方式通過 RPC 協議向 ResourceManager 申請和 領取資源。

步驟 5 一旦 ApplicationMaster 申請到資源後,便與對應的 NodeManager 通訊,要求 它啟動任務。

步驟 6 NodeManager 為任務設定好執行環境(包括環境變數、JAR 包、二進位制程式 等)後,將任務啟動命令寫到一個指令碼中,並通過執行該指令碼啟動任務。

步驟 7 各個任務通過某個 RPC 協議向 ApplicationMaster 彙報自己的狀態和進度,以 讓 ApplicationMaster 隨時掌握各個任務的執行狀態,從而可以在任務失敗時重新啟動任務。

在應用程式執行過程中,使用者可隨時通過 RPC 向 ApplicationMaster 查詢應用程式的當 前執行狀態。

步驟 8 應用程式執行完成後,ApplicationMaster 向 ResourceManager 登出並關閉自己。

四、YARN各模組詳細分析

接下來將分析各個重要模組,ResourceManager、NodeManager、MRAppMaster、YarnChild。如下如所示:

                                  

                                                                                    圖7 Hadoop YARN詳細模組

4.1、ResourceManager

        同MRv1一樣,YARN也採用了Master/Slave結構,其中,Master實現為ResourceManager,負責整個叢集的資源管理與排程;Slave實現為NodeManager,負責單個節點的資源管理與排程及任務啟動。ResourceManager是整個YARN叢集中最重要的元件之一,主要功能是包括資源管理與排程、ApplicationMaster管理(啟動應用程式的AM)、Application管理等、NodeManager管理等。在YARN中,ResourceManager負責叢集中所有資源的統一管理和分配(個人理解類似於MRv1時的JobTracker,只是把JobTracker作業管理這塊移到了ApplicationMaster中,只負責純粹通用的資源管理與排程,做到了計算框架無關),它接收來自各個節點(NodeManager)的資源彙報資訊,並把這些資訊按照一定的策略分配給各個應用程式(ApplicationMaster)。接下來就來分析下RM內部組成的各個功能元件和他們相互的互動方式。

 一、ResourceManager的互動協議與基本職能

 1、ResourceManager互動協議

    在整個Yarn框架中主要涉及到7個協議,分別是ApplicationClientProtocol、MRClientProtocol、ContainerManagementProtocol、ApplicationMasterProtocol、ResourceTracker、LocalizationProtocol、TaskUmbilicalProtocol,這些協議封裝了各個元件互動的資訊。ResourceManager現實功能需要和NodeManager以及ApplicationMaster進行資訊互動,其中涉及到的RPC協議有ResourceTrackerProtocol、ApplicationMasterProtocol和ResourceTrackerProtocol。

  • ResourceTracker

    NodeManager通過該協議向ResourceManager中註冊、彙報節點健康情況以及Container的執行狀態,並且領取ResourceManager下達的重新初始化、清理Container等命令。NodeManager和ResourceManager這種RPC通訊採用了和MRv1類似的“pull模型”(ResourceManager充當RPC server角色,NodeManager充當RPC client角色),NodeManager週期性主動地向ResourceManager發起請求,並且領取下達給自己的命令。

  • ApplicationMasterProtocol

    應用程式的ApplicationMaster同過該協議向ResourceManager註冊、申請和釋放資源。該協議和上面協議同樣也是採用了“pull模型”,其中在RPC機制中,ApplicationMaster充當RPC client角色,ResourceManager充當RPC server角色。

  • ApplicationClientProtocol

          客戶端通過該協議向ResourceManager提交應用程式、控制應用程式(如殺死job)以及查詢應用程式的執行狀態等。在該RPC 協議中應用程式客戶端充當RPC client角色,                                ResourceManager充當RPC server角色。

整理一下ResourceManager與NodeManager、ApplicationMaster和客戶端RPC協議互動的資訊:

                                                  

                                                                         圖8 ResourceManager相關的RPC協議

上圖中的ResourceTrackeService、ApplicationMasterService 、ClientRMService是ResourceManager中處理上述功能的元件,充當RPC_Servser。

2、ResourceManager基本職能

ResourceManager基本職能概括起來就以下幾方面:

  • 與客戶端進行互動,處理來自於客戶端的請求,如提交應用程式、查詢應用的執行情況等。

  • 啟動和管理各個應用程式的ApplicationMaster,並且為ApplicationMaster申請第一個Container用於啟動和在它執行失敗時將它重新啟動。

  • 管理NodeManager,接收來自NodeManager的資源和節點健康情況彙報,並向NodeManager下達資源管理命令,例如kill掉某個container。

  • 資源管理與排程,接收來自ApplicationMaster的資源申請請求,併為之分配資源。這個是它最重要的職能。

二、ResourceManager內部組成架構分析

ResourceManager在底層程式碼實現上將各個功能模組分的比較細,各個模組功能具有很強的獨立性。下圖所示的是ResourceManager中的大概的功能模組組成:

                                            

                                                                                   圖9 ResourceManager內部架構圖

ResourceManager內部架構程式碼概況如下:

public class ResourceManager extends CompositeService implements Recoverable {
  /**
   * Priority of the ResourceManager shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
  private static final Log LOG = LogFactory.getLog(ResourceManager.class);
  public static final long clusterTimeStamp = System.currentTimeMillis();
 
  // ApplicationClientProtocol協議的RPC_Server
  private ClientRMService clientRM;
  
  // ContainerManagementProtocol協議的RPC_Client,以啟動應用程式的ApplicationMaster
  private ApplicationMasterLauncher applicationMasterLauncher;
  
  // ResourceTracker協議的RPC_Server
  protected ResourceTrackerService resourceTracker;

  // ApplicationMasterProtocol協議的RPC_Server
  protected ApplicationMasterService masterService;
  
  // 資源管理器
  protected ResourceScheduler scheduler;
  
  private Dispatcher rmDispatcher;
  ...
}

1、使用者互動模組

    使用者互動模組即上圖顯示的User Service管理模組。在這裡邊還可以看到根據不同的使用者型別啟用了不同的服務進行處理,AdminService處理管理員相關請求,ClientRMService處理普通客戶相關請求,這樣使得管理員不會因為普通客戶請求太多而造成堵塞。下面看看這2個服務的具體實現程式碼:

  • ClientRMService

public class ClientRMService extends AbstractService implements
    ApplicationClientProtocol {
  private static final ArrayList<ApplicationReport> EMPTY_APPS_REPORT = new ArrayList<ApplicationReport>();
  private static final Log LOG = LogFactory.getLog(ClientRMService.class);
  final private AtomicInteger applicationCounter = new AtomicInteger(0);
  final private YarnScheduler scheduler; // 資源排程器
  final private RMContext rmContext; // RM上下文物件,其包含了RM大部分執行時資訊,如節點列表、佇列列表、應用程式列表等
  private final RMAppManager rmAppManager; //應用程式管理器
  private Server server; // ApplicationClientProtocol協議的RPC_Server
  // 訪問控制物件,例如,一些應用程式在提交時設定了檢視許可權的話,其他普通使用者就無法檢視。
  private final ApplicationACLsManager applicationsACLsManager;
  private final QueueACLsManager queueACLsManager;
  @Override
  protected void serviceStart() throws Exception {
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);
    this.server =  // 構建ApplicationClientProtocol協議的RPC_Server
      rpc.getServer(ApplicationClientProtocol.class, this,
            clientBindAddress,
            conf, this.rmDTSecretManager,
            conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, 
                YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));
    // 啟動ApplicationClientProtocol協議的RPC_Server
    this.server.start();
  }
  @Override
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException {
    ApplicationSubmissionContext submissionContext = request
        .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();
    String user = null;
    try {
      // call RMAppManager to submit application directly
      rmAppManager.submitApplication(submissionContext,
          System.currentTimeMillis(), false, user);
    } catch (YarnException e) {
      LOG.info("Exception in submitting application with id " +
          applicationId.getId(), e);
    }
    SubmitApplicationResponse response = recordFactory
        .newRecordInstance(SubmitApplicationResponse.class);
    return response;
  }
  ...
}

從上面ClientRMService的基本程式碼架構我們可以看出:

(1)ClientRMService是一個RPC Server,主要為來自於普通客戶端的各種RPC請求。從程式碼實現的角度看,ClientRMService實現了RPC協議ApplicationClientProtocol。

(2)之前我們已經說了,普通使用者可以通過該服務來獲得正在執行應用程式的相關資訊,如進度情況、應用程式列表等。上面程式碼中都將ResourceManager執行資訊封裝在RMContxt介面中了,下面來看看這個介面的一個實現物件RMContextImpl:

public class RMContextImpl implements RMContext {

  // 中央非同步排程器。RM中的各個服務和元件以及它們處理和輸出的事件型別都是通過中央非同步排程器組織在一起的,這樣可以有效提高系統的吞吐量。

  private final Dispatcher rmDispatcher;

  private final ConcurrentMap<ApplicationId, RMApp> applications // 應用程式列表

    = new ConcurrentHashMap<ApplicationId, RMApp>();

  private final ConcurrentMap<NodeId, RMNode> nodes // 節點列表

    = new ConcurrentHashMap<NodeId, RMNode>();

  private final ConcurrentMap<String, RMNode> inactiveNodes // 非活躍節點列表

    = new ConcurrentHashMap<String, RMNode>();

  private AMLivelinessMonitor amLivelinessMonitor; // 正在執行中的AP心跳監控物件

  private AMLivelinessMonitor amFinishingMonitor; // 執行完畢後的AM心跳監控物件

  private RMStateStore stateStore = null; // 用於儲存ResourceManager執行狀態

  // 用於Container的超時監控,應用程式必須在一定時間內(預設10Min)使用分配到的Container去執行task,否則會被回收

  private ContainerAllocationExpirer containerAllocationExpirer;

  // 下面變數都是與安全管理相關的物件

  private final DelegationTokenRenewer delegationTokenRenewer;

  private final AMRMTokenSecretManager amRMTokenSecretManager;

  private final RMContainerTokenSecretManager containerTokenSecretManager;

  private final NMTokenSecretManagerInRM nmTokenSecretManager;

  private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;

  private ClientRMService clientRMService;

  private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;

...

}
  • AdminService

AdminService和ClientRMService一樣都是作為RPC的服務端,它針對的處理管理員RPC請求,負責訪問許可權的控制,中Yarn中管理員許可權的設定可以在yarn-site.xml中yarn.admi.acl項進行設定,該項的預設值是*,也就是說如果不進行設定的話就當所有的使用者都是管理員。從程式碼上看,它是ResourceManagerAdministrationProtocol協議的一個實現:

public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol {

  private static final Log LOG = LogFactory.getLog(AdminService.class);

  private final Configuration conf;

  private final ResourceScheduler scheduler;

  private final RMContext rmContext;

  private final NodesListManager nodesListManager;



  private final ClientRMService clientRMService;

  private final ApplicationMasterService applicationMasterService;

  private final ResourceTrackerService resourceTrackerService;



  private Server server; // ResourceManagerAdministrationProtocol協議的RPC_Server

  private InetSocketAddress masterServiceAddress;



  @Override

  protected void serviceStart() throws Exception {

    Configuration conf = getConfig();

    YarnRPC rpc = YarnRPC.create(conf);

    this.server = // 構建ResourceManagerAdministrationProtocol協議的RPC_Server

      rpc.getServer(ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,

          conf, null,

          conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,

              YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));

    this.server.start();

    super.serviceStart();

  }

}

AdminService程式碼和ClientRMService比較相似,它各類功能物件也差不多。

2、NodeManager管理模組

    NodeManager管理部分主要是通過ResourceTrackerService、NMLivelinessMonitorNodeListManager這三個服務元件來共同管理NodeManager的生命週期。

1)ResourceTrackerService

    ResourceTrackerService是RPC協議ResourceTracker的一個實現,它作為一個RPC Server端接收NodeManager的RPC請求,請求主要包含2種資訊,註冊NodeManager和處理心跳資訊。ResourceTrackerService具體的核心程式碼如下:

public class ResourceTrackerService extends AbstractService implements ResourceTracker {

  private final RMContext rmContext;

  private final NodesListManager nodesListManager; // NodesListManager

  private final NMLivelinessMonitor nmLivelinessMonitor; // NMLivelinessMonitor

  private final RMContainerTokenSecretManager containerTokenSecretManager;

  private final NMTokenSecretManagerInRM nmTokenSecretManager;



  private long nextHeartBeatInterval;

  private Server server; // ResourceTracker協議的RPC_Server

  private InetSocketAddress resourceTrackerAddress;



  @Override

  protected void serviceStart() throws Exception {

    super.serviceStart();

    Configuration conf = getConfig();

    YarnRPC rpc = YarnRPC.create(conf);

    this.server = // 構建ResourceTracker協議的RPC_Server

      rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,

          conf, null,

          conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,

              YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));

    this.server.start();

  }

  ...

}

NodeManger啟動時第一件事就是像ResourceManager註冊,註冊時NodeManager發給ResourceTrackerService的RPC包主要包含NodeManager所在節點的可用資源總量、對外開放的htpp埠、節點的host和port等資訊,具體程式碼看ResourceTrackerService#registerNodeManager方法:  

@Override

  public RegisterNodeManagerResponse registerNodeManager(

      RegisterNodeManagerRequest request) throws YarnException,

      IOException {

    NodeId nodeId = request.getNodeId(); // NodeManager的NodeID

    String host = nodeId.getHost(); // NodeManager所在節點的host

    int cmPort = nodeId.getPort(); // NodeManager所在節點的port

    int httpPort = request.getHttpPort(); // 對外開放的http埠

    Resource capability = request.getResource(); // 獲得NodeManager所在節點的資源上限

    RegisterNodeManagerResponse response = recordFactory

        .newRecordInstance(RegisterNodeManagerResponse.class);

    response.setContainerTokenMasterKey(containerTokenSecretManager

        .getCurrentKey());

    response.setNMTokenMasterKey(nmTokenSecretManager

        .getCurrentKey());   

    RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,

        resolve(host), capability);

    RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);

    if (oldNode == null) {

      this.rmContext.getDispatcher().getEventHandler().handle(

          new RMNodeEvent(nodeId, RMNodeEventType.STARTED));

    } else {

      LOG.info("Reconnect from the node at: " + host);

      this.nmLivelinessMonitor.unregister(nodeId);

      this.rmContext.getDispatcher().getEventHandler().handle(

          new RMNodeReconnectEvent(nodeId, rmNode));

    }

    this.nmTokenSecretManager.removeNodeKey(nodeId);

    this.nmLivelinessMonitor.register(nodeId);

    response.setNodeAction(NodeAction.NORMAL);

    response.setRMIdentifier(ResourceManager.clusterTimeStamp);

    return response;

  }

ResourceTrackerService另外一種功能就是處理心跳資訊了,當NodeManager啟動後,它會週期性地呼叫RPC函式ResourceTracker#nodeHeartbeat彙報心跳,心跳資訊主要包含該節點的各個Container的執行狀態、正在執行的Application列表、節點的健康狀況等,隨後ResourceManager為該NodeManager返回需要釋放的Container列表、Application列表等資訊。其中心跳資訊處理的流程:首先,從NodeManager發來的心跳包中獲得節點的狀態狀態資訊,然後檢測該節點是否已經註冊過,然後檢測該節點的host名稱是否合法,例如是否在excluded列表中,然後再檢測該次心跳是不是第一次心跳資訊,這點非常重要,因為關係到心跳的重複傳送與應答的相關問題。其實ResourceTrackerService和NodeManager的心跳處理機制和之前Hadoop1.x中的JobTracker與TaskTacker之間的心跳處理很相像,再然後,為NodeManager返回心跳應答資訊,最後,向RMNode傳送該NodeManager的狀態資訊並且儲存最近一次心跳應答資訊。再具體看看ResourceTrackerService#nodeHeart方法: 

@Override

  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)

      throws YarnException, IOException {

    //從RPC Clinet中獲得nodeManager所在節點的健康狀況

    NodeStatus remoteNodeStatus = request.getNodeStatus();

    NodeId nodeId = remoteNodeStatus.getNodeId();

    // 1. Check if it's a registered node

    RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);

    if (rmNode == null) {

      /* node does not exist */

      String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();

      LOG.info(message);

      resync.setDiagnosticsMessage(message);

      return resync;

    }

    // Send ping

    this.nmLivelinessMonitor.receivedPing(nodeId);

    // 2. Check if it's a valid (i.e. not excluded) node

    if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {

      String message =

          "Disallowed NodeManager nodeId: " + nodeId + " hostname: "

              + rmNode.getNodeAddress();

      LOG.info(message);

      shutDown.setDiagnosticsMessage(message);

      this.rmContext.getDispatcher().getEventHandler().handle(

          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));

      return shutDown;

    }

    NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();

    if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse

        .getResponseId()) {

      LOG.info("Received duplicate heartbeat from node "

          + rmNode.getNodeAddress());

      return lastNodeHeartbeatResponse;

    } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse

        .getResponseId()) {

      String message =

          "Too far behind rm response id:"

              + lastNodeHeartbeatResponse.getResponseId() + " nm response id:"

              + remoteNodeStatus.getResponseId();

      LOG.info(message);

      resync.setDiagnosticsMessage(message);

      this.rmContext.getDispatcher().getEventHandler().handle(

          new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));

      return resync;

    }

    // Heartbeat response

    NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils

        .newNodeHeartbeatResponse(lastNodeHeartbeatResponse.

            getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,

            nextHeartBeatInterval);

    rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);

    populateKeys(request, nodeHeartBeatResponse);

    // 4. Send status to RMNode, saving the latest response.

    this.rmContext.getDispatcher().getEventHandler().handle(

        new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),

            remoteNodeStatus.getContainersStatuses(),

            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));

    return nodeHeartBeatResponse;

  }

(2)NodeListManager

    NodeListManager主要分管黑名單(include列表)和白名單(exlude列表)管理功能,分別有yarnresouecemanager.nodes.include-path和yarnresourcemanager.nodes.exclude-path指定。黑名單列表中的nodes不能夠和RM直接通訊(直接丟擲RPC異常),管理員可以對這兩個列表進行編輯,然後使用$HADOOP_HOME/bin/yarn rmadmin -refreshNodes動態載入修改後的列表,使之生效。

(3)NMLivelinessMonitor

    NMLivelinessMonitor主要是分管心跳異常請求。該服務會週期性地遍歷叢集中的所有NodeManager,如果某個NodeManager在一定時間內(預設10min,可以有引數yarn.nm.liveness-monitor.expiry-interval-ms配置)沒有進行心跳彙報,那麼則認為它已經死掉,同時在該節點上執行的Container也會被置為執行失敗釋放資源。那麼這些被置為失敗的Container是不會直接被RM分配執行的,RM只是負責將這些被置為失敗的Container資訊告訴它們所對應的ApplicationMaster,需不需要重新執行它說的算,如果需要重新執行的話,該ApplicationMaster要重新向RM申請資源,然後由ApplicationMaster與對應的NodeManager通訊以重新執行之前失敗的Container。

3、ApplicationMaster管理模組

ApplicationMaster的管理主要是用ResouceManager內部的3個服務元件來完成:ApplicationMasterLauncher、AMLivelinessMonitor、ApplicationMasterService,他們共同管理ApplicationMaster的生命週期。

(1)在介紹這三個服務元件之前,先說說ApplicationMaster和ResourceManager整個互動流程:

                                                     

                                                           圖10 ApplicationMaster與ResourceManager互動流程

步驟一:

    當ResourceManager接收到客戶端提交應用程式請求時就會立馬向資源管理器申請一個資源用於啟動該應用程式所對應的ApplicationMaster,申請到資源後由ApplicationMasterLaucher與對應的NodeManager進行通訊,要求該NodemManager在其所在節點啟動該ApplicationMaster。

步驟二:

    ApplicationMaster啟動完畢後,ApplicationMasterLuacher通過事件的形式將剛剛啟動的ApplicationMaster註冊到AMLivelinessMonitor,以啟動心跳監控。

步驟三:

    ApplicationMaster啟動後主動向ApplicationMasterService註冊,並將自己所在host、埠等資訊向其彙報。

步驟四:

    ApplicationMaster在執行的過程中不斷向ApplicationMasterService傳送心跳。

步驟五:

    ApplicationMasterService每次收到ApplicationMaster的心跳資訊後,會同時AMLivelinessMonitor更新其最近一次傳送心跳的時間。

步驟六:

    當應用程式執行完畢後,ApplicationMaster向ApplicationMasterService請求登出自己。

步驟七:

    ApplicationMasterService收到登出請求後,會將該應用程式的執行狀態標註為完成,並且同時AMLivelinessMonitor移除對該ApplicationMaster的心跳監控。

(2)接下來詳細介紹下這三個服務元件的一些執行機制

  • ApplicationMasterLauncher

    ApplicationMasterLaucher是以執行緒池方式實現的一個事件處理器,其主要處理AMLaucherEvent型別的事件,包括啟動(LAUNCH)和清除(CLEANUP)一個ApplicationMaster的事件。

    當接收到LAUNCH型別的事件,ApplicationMasterLaucher立馬會和對應的NodeManager進行通訊,並且帶上啟動該ApplicationMaster所需要的各種資訊,包括:啟動命令、JAR包、環境變數等資訊。NodeManager接收到來自ApplicationMasterLaucher的啟動命令就會啟動ApplicationMaster。

    當接收到CLEANUP型別事件,ApplicationMasterLaucher立馬會和對應的NodeManager進行通訊,要求NodeManager殺死該ApplicationMaster,並釋放掉資源。

    ApplicationMasterLauncher相關的核心原始碼如下:

public class ApplicationMasterLauncher extends AbstractService implements

    EventHandler<AMLauncherEvent> {

  private static final Log LOG = LogFactory.getLog(

      ApplicationMasterLauncher.class);

  private final ThreadPoolExecutor launcherPool;

  private LauncherThread launcherHandlingThread;

  private final BlockingQueue<Runnable> masterEvents

    = new LinkedBlockingQueue<Runnable>();

  protected final RMContext context;

  public ApplicationMasterLauncher(RMContext context) {

    super(ApplicationMasterLauncher.class.getName());

    this.context = context;

    this.launcherPool = new ThreadPoolExecutor(10, 10, 1,

        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());

    this.launcherHandlingThread = new LauncherThread();

  }

  private void launch(RMAppAttempt application) {

    Runnable launcher = createRunnableLauncher(application,

        AMLauncherEventType.LAUNCH);

    masterEvents.add(launcher);

  }

  protected Runnable createRunnableLauncher(RMAppAttempt application,

      AMLauncherEventType event) {

    Runnable launcher =

        new AMLauncher(context, application, event, getConfig());

    return launcher;

  }

  private void cleanup(RMAppAttempt application) {

    Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.CLEANUP);

    masterEvents.add(launcher);

  }

  private class LauncherThread extends Thread {

    public LauncherThread() {

      super("ApplicationMaster Launcher");

    }

    @Override

    public void run() {

      while (!this.isInterrupted()) {

        Runnable toLaunch;

        try {

          toLaunch = masterEvents.take();

          launcherPool.execute(toLaunch);

        } catch (InterruptedException e) {

          LOG.warn(this.getClass().getName() + " interrupted. Returning.");

          return;

        }

      }

    }

  }   

接下來再來看看AMLauncher,本質其實就是一個ContainerManagementProtocol協議的RPC_Client,讓NodeManager去啟動ApplicationMaster,相關的核心程式碼如下:

public class AMLauncher implements Runnable {

  private static final Log LOG = LogFactory.getLog(AMLauncher.class);

  private ContainerManagementProtocol containerMgrProxy;  // ContainerManagementProtocol協議的RPC_Client

  private final RMAppAttempt application;

  private final Configuration conf;

  private final AMLauncherEventType eventType;

  private final RMContext rmContext;

  private final Container masterContainer;

  @SuppressWarnings("rawtypes")

  private final EventHandler handler;

  public AMLauncher(RMContext rmContext, RMAppAttempt application,

      AMLauncherEventType eventType, Configuration conf) {

    this.application = application;

    this.conf = conf;

    this.eventType = eventType;

    this.rmContext = rmContext;

    this.handler = rmContext.getDispatcher().getEventHandler();

    this.masterContainer = application.getMasterContainer();

  }
 
  private void connect() throws IOException {

    ContainerId masterContainerID = masterContainer.getId();

    containerMgrProxy = getContainerMgrProxy(masterContainerID);

  }

  protected ContainerManagementProtocol getContainerMgrProxy(

      final ContainerId containerId) {

    final YarnRPC rpc = YarnRPC.create(conf);

    return currentUser

        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {

          @Override

          public ContainerManagementProtocol run() {

            return (ContainerManagementProtocol) rpc.getProxy( // 構建ContainerManagementProtocol協議的RPC_Client

                ContainerManagementProtocol.class,

                containerManagerBindAddress, conf);

          }

        });

  }
  • AMLivelinessMonitor

    AMLivelinessMonitor的功能和NMLivelinessMonitor的功能幾乎一樣,只不過AMLivelinessMonitor監控的是ApplicationMaster,而NMLivelinessMonitor監控的是NodeManager。

    AMLivelinessMonitor會週期性地遍歷叢集中的所有ApplicationMaster,如果某個ApplicationMaster在一定時間內(預設10min,可以有引數yarn.am.liveness-monitor.expiry-interval-ms配置)沒有進行心跳彙報,那麼則認為它已經死掉,同時該ApplicationMaster關聯執行的Container也會被置為執行失敗釋放資源。如果Application執行失敗,則有ResourceManager重新為它申請資源,並且在另外的節點上啟動它(AM啟動嘗試次數由引數yarn.resourcemanager.am.max-attempts控制,預設2)。那麼這些被置為失敗的Container是不會直接被RM分配執行的,RM只是負責將這些被置為失敗的Container資訊告訴它們所對應的ApplicationMaster,需不需要重新執行它說的算,如果需要從新執行的話,該ApplicationMaster要從新向RM申請資源,然後由ApplicationMaster與對應的NodeManager通訊以從新執行之前失敗的Container。

  • ApplicationMasterService

    ApplicationMasterService實現了RPC協議ApplicationMasterProtocol,負責處理來自ApplicationMaster的請求,請求主要包括註冊、心跳、清理三種,其中註冊是ApplicationMaster啟動時發生的行為,請求包中包含:AM所在的節點、RPC埠、tracking url等資訊;心跳是週期性行為,請求包中包含:請求資源的型別描述、待釋放的Container列表等;清理是應用程式執行結束時發生的行為,ApplicationMaster向RM傳送清理應用程式的請求,以回收資源和清理各種記憶體空間。

    ApplicationMasterLauncher啟動AM後,AM做的第一件事是向RM註冊,這是通過RPC函式ApplicationMasterProtocol#registerApplicationMaster實現的。

    AM執行過程中,需要週期性地通過RPC函式ApplicationMasterProtocol#allocate與RM通訊,進行資源請求及告訴RM自己還活著。

    AM執行結束後,需要通過RPC函式ApplicationMasterProtocol#finshApplicationMaster告訴RM自己執行結束,可以回收資源和清理各種資料結果。

    ApplicationMasterService的核心程式碼如下:

public class ApplicationMasterService extends AbstractService implements

    ApplicationMasterProtocol {

  private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);

  private final AMLivelinessMonitor amLivelinessMonitor;

  private YarnScheduler rScheduler;

  private InetSocketAddress bindAddress;

  private Server server; // ApplicationMasterProtocol協議的RPC_Server

  private final RecordFactory recordFactory =

      RecordFactoryProvider.getRecordFactory(null);

  private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =

      new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();

  private final AllocateResponse resync =

      recordFactory.newRecordInstance(AllocateResponse.class);

  private final RMContext rmContext;



  public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {

    super(ApplicationMasterService.class.getName());

    this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();

    this.rScheduler = scheduler;

    this.resync.setAMCommand(AMCommand.AM_RESYNC);

    this.rmContext = rmContext;

  }

  @Override

  protected void serviceStart() throws Exception {

    Configuration conf = getConfig();

    YarnRPC rpc = YarnRPC.create(conf);

    InetSocketAddress masterServiceAddress = conf.getSocketAddr(

        YarnConfiguration.RM_SCHEDULER_ADDRESS,

        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,

        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);

    Configuration serverConf = conf;

    serverConf = new Configuration(conf);

    serverConf.set(

        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,

        SaslRpcServer.AuthMethod.TOKEN.toString());

    this.server =

      rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress, // 構建ApplicationMasterProtocol協議的RPC_Server

          serverConf, this.rmContext.getAMRMTokenSecretManager(),

          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,

              YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));

    this.server.start();

    super.serviceStart();

  }

  @Override

  public RegisterApplicationMasterResponse registerApplicationMaster(

      RegisterApplicationMasterRequest request) throws YarnException,

      IOException {

    ApplicationAttemptId applicationAttemptId = authorizeRequest();

    ApplicationId appID = applicationAttemptId.getApplicationId();

    AllocateResponse lastResponse = responseMap.get(applicationAttemptId);

    // Allow only one thread in AM to do registerApp at a time.

    synchronized (lastResponse) {

      this.amLivelinessMonitor.receivedPing(applicationAttemptId);

      RMApp app = this.rmContext.getRMApps().get(appID);

      lastResponse.setResponseId(0);

      responseMap.put(applicationAttemptId, lastResponse);

      this.rmContext

        .getDispatcher()

        .getEventHandler()

        .handle(

          new RMAppAttemptRegistrationEvent(applicationAttemptId, request

            .getHost(), request.getRpcPort(), request.getTrackingUrl()));

      // Pick up min/max resource from scheduler...

      RegisterApplicationMasterResponse response = recordFactory

          .newRecordInstance(RegisterApplicationMasterResponse.class);

      response.setMaximumResourceCapability(rScheduler

          .getMaximumResourceCapability());

      response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)

          .getSubmissionContext().getAMContainerSpec().getApplicationACLs());

      return response;

    }

  }

  @Override

  public AllocateResponse allocate(AllocateRequest request)

      throws YarnException, IOException {

    ApplicationAttemptId appAttemptId = authorizeRequest();

    this.amLivelinessMonitor.receivedPing(appAttemptId);

    /* check if its in cache */

    AllocateResponse lastResponse = responseMap.get(appAttemptId);

    // Allow only one thread in AM to do heartbeat at a time.

    synchronized (lastResponse) {

      // Send the status update to the appAttempt.

      this.rmContext.getDispatcher().getEventHandler().handle(

          new RMAppAttemptStatusupdateEvent(appAttemptId, request

              .getProgress()));

      List<ResourceRequest> ask = request.getAskList();

      List<ContainerId> release = request.getReleaseList();  

      // Send new requests to appAttempt.

      Allocation allocation =

          this.rScheduler.allocate(appAttemptId, ask, release,

              blacklistAdditions, blacklistRemovals);

      RMApp app = this.rmContext.getRMApps().get(

          appAttemptId.getApplicationId());

      RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
     
      AllocateResponse allocateResponse =

          recordFactory.newRecordInstance(AllocateResponse.class);

      allocateResponse.setAllocatedContainers(allocation.getContainers());

      allocateResponse.setCompletedContainersStatuses(appAttempt

          .pullJustFinishedContainers());

      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);

      allocateResponse.setAvailableResources(allocation.getResourceLimit());

      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());

      return allocateResponse;

    }

  }

  @Override

  public FinishApplicationMasterResponse finishApplicationMaster(

      FinishApplicationMasterRequest request) throws YarnException,

      IOException {

    ApplicationAttemptId applicationAttemptId = authorizeRequest();

    AllocateResponse lastResponse = responseMap.get(applicationAttemptId);

    // Allow only one thread in AM to do finishApp at a time.

    synchronized (lastResponse) {

      this.amLivelinessMonitor.receivedPing(applicationAttemptId);

      rmContext.getDispatcher().getEventHandler().handle(

          new RMAppAttemptUnregistrationEvent(applicationAttemptId, request

              .getTrackingUrl(), request.getFinalApplicationStatus(), request

              .getDiagnostics()));

      if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())

          .isAppSafeToUnregister()) {

        return FinishApplicationMasterResponse.newInstance(true);

      } else {

        return FinishApplicationMasterResponse.newInstance(false);

      }

    }

  }

4、資源排程器(

相關推薦

Hadoop_YARN資源管理系統原始碼解析

目錄 一、YARN產生的背景(MRv1的侷限性) 二、YARN原始碼結構 三、YARN基本架構 四、YARN各模組詳細分析 五、MRAppMaster-MapReduce On YARN實現 六、YarnChild-MR引擎啟動入口 七、總結 一、YARN

java Struts+Hibernate企業人力資源管理系統原始碼下載

一個簡單的人才管理系統 人力資源管理在管理學中是一個非常重要的領域,通過人力資源管理能夠對企業員工進行有效的管理。它的作用已經大大超過了人事管理,正因為如此,這種新型的 管理模式越來越被人們所重視到。有效的人力資源管理是社會各組織、各企業都需要的。在本章將一個Web

【U1結業機試題】新聞內容管理系統解析XML文件讀取Html模版生成網頁文件

repl att not 一個 class 新的 create hashmap exception 一、作業要求: 1.在xml文件中創建新聞節點news,包含標題、作者、日期、正文等信息 2.創建HTML模板文件 3.讀取xml中所有新聞信息,並使用新聞信息替換模板文件中

資源管理系統

方便 對比 gem per 定時任務 splay base 固定 data- RMS(Resource Management System)是基於Tecs Director 和Tecs Openstack的資源管理系統。所處位置如下: 最底層是Tecs OpenStac

你的數據安全麽?Hadoop再曝安全漏洞| 黑客利用Hadoop Yarn資源管理系統未授權訪問漏洞

分布式摘要: 4月30日,阿裏雲發現,俄羅斯黑客利用Hadoop Yarn資源管理系統REST API未授權訪問漏洞進行攻擊。 Hadoop是一款由Apache基金會推出的分布式系統框架,它通過著名的 MapReduce 算法進行分布式處理,Yarn是Hadoop集群的資源管理系統。4月30日,阿裏雲發現,俄

Python_從零開始學習_(30) 名片管理系統原始碼

cards_main.py  (主流程) import cards_tools while True: # 顯示功能選單 cards_tools.show_menu() action_str = input("請選擇希望執行的操作: ") pri

YARN——Hadoop的叢集資源管理系統

     相對於昨天的HDFS,YARN明顯難一些。    聽過大資料的人最多,聽過Hadoop的次之,聽過YARN的再次之。    本文參考官方文件以及百度百科,去除了一些跟上一代的資源排程管理系統的對比,只求略懂一二,為後面的MapReduce任務鋪路。  

靜態資源的虛擬路徑和獨立靜態資源管理系統的實現

一、什麼是虛擬路徑?         舉個例子:上傳一張圖片放到:D://group/29015054169244_投影.png        &nbs

Mybatis 快取系統原始碼解析

本文從以下幾個方面介紹: 相關文章 前言 快取的相關介面 一級快取的實現過程 二級快取的實現過程 如何保證快取的執行緒安全 快取的裝飾器 相關文章 Mybatis 解析 SQL 原始碼分析二 Mybatis Mapper.xml 配置檔案中 res

Backbone繼承系統、事件管理系統原始碼分析

Backbone繼承系統 Backbone的繼承機制使用es3規範,可以相容到ie8。繼承介面十分簡潔。 var childClass = BaseClass.extend(protoProps /* 子類prototype屬性 */, classProps /* 子類靜態屬性

SpringBoot+mybatis+SpringSecurity+redis許可權管理系統原始碼

技術選型 後端 基礎框架:Spring Boot 2.0.4.RELEASE 持久層框架:Mybatis 3.4.5 安全框架: Spring Security 5.0.7 摸板引擎:Thymeleaf 3.0.9.RELEASE 資料庫連線池:阿里巴巴Druid 1.1

SpringBoot+mybatis+Shiro+redis許可權管理系統原始碼

演示地址: http://111.230.157.133/login 技術選型 後端 基礎框架:Spring Boot 2.0.4.RELEASE 持久層框架:Mybatis 3.4.5 安全框架:Apache Shiro 1.4.0 (店鋪裡面有 Spring Se

HRMS(人力資源管理系統)-從單機應用到SaaS應用-系統介紹

上週釋出的《2018,全新出發(全力推動實現住有所居)》文章,其中記錄了個人在這5年過程中的成長和收穫,有幸認識了不少部落格園的朋友,大家一起學習交流,在這個過程當中好多朋友提出SaaS系統如何設計,架構方面如何下手,在這5年的過程中我參與規劃設計了很多的SaaS系統其中有不少的坑和痛苦的經驗,特別

jsp+ssm+mysql實現的投票管理系統原始碼附帶視訊指導配置執行教程

今天給大家演示的是一款由jsp+ssm框架+mysql實現的投票管理系統,系統分為前端和後臺管理模組,前端使用者可以登入註冊、檢視投票資訊,登入後可以進行投票,也可以檢視自己的歷史投票記錄,後臺管理模組管理員登入後可以管理使用者資訊、管理投票主題和子項、檢視投票詳情、檢視投票圖表統計資訊等,目前

HRMS(人力資源管理系統)-從單機應用到SaaS應用-架構分析(功能性、非功能性、關鍵約束)-上篇

一、開篇       上一篇《HRMS(人力資源管理系統)-從單機應用到SaaS應用-系統介紹》我們已經詳細的分析了HRMS系統具備的功能,並且從HRMS系統的概念、系統功能、HR行業管理現狀及痛點、發展趨勢及行業前景、行業內的服務提供商情況、HRMS系統的建設意義及價值等方面進行了系統化的分析梳理。我想大家

HRMS(人力資源管理系統)-從單機應用到SaaS應用-架構分析(功能性、非功能性、關鍵約束)-下篇

一、開篇        本篇主將具體結合HRMS系統進行架構概要分析,按照上篇的理論指導,開展具體的架構分析過程實踐,通過分析找到關鍵功能、關鍵非功能性需求(關鍵質量及約束)等。       在闡述具體的架構工作方法之前,請大家先檢視以下三方面的內容:      1、HRMS系統的介紹?(涵蓋哪些功能

HRMS(人力資源管理系統)-SaaS架構設計-概要設計實踐

一、開篇        本篇主將詳細的闡述架構設計過程中概要架構設計要點來和大家共同交流,掌握後續如何強化概要架構設計在架構設計中作用,幫助我們快速確認架構的方向及核心大框架。       在闡述具體的概要架構工作方法之前,還請大家先參考我們限定的業務場景:      1、HRMS系統的介紹?(涵蓋哪些功能

點位盤系統開發_時間盤搭建_紅酒點位盤微交易系統原始碼解析

微盤系統是也是最近比較流行的一款系統,模擬交易所,根據行情資料API介面,把資料接到自己的程式上,在此大盤資料的基礎上進行玩法開發,比如點位盤系統,根據點位買跌漲,時間盤系統,比較熟悉的就是決勝60秒,

SSH實戰之員工部門資訊管理系統(原始碼+視訊)

  學完SSH後就立馬做了這個專案,對專案的分層有了更加深入的瞭解,這裡有總結的很好的百度知道連結,可以學習。  &emspdao層負責資料庫的增刪改查,service層完成業務邏輯,例如專案中的分頁查詢,在service中完成分頁資訊的獲取,而如何獲取這些資料?就是通過Dao層的基

人力資源管理系統的演化

初期設計目標: 緊密聚焦於人力資源戰略價值 打造面向角色的服務平臺 通過移動與網際網路改變使用習慣 聚焦使用者體驗提升,降低使用者使用負擔 改變產品與使用者互動模式,更貼近使用者 痛點: 模組圖overview: 企業HR管理需求發