1. 程式人生 > >Flink on Yarn模式啟動流程原始碼分析

Flink on Yarn模式啟動流程原始碼分析

此文已由作者嶽猛授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。


Flink on yarn的啟動流程可以參見前面的文章 Flink on Yarn啟動流程,下面主要是從原始碼角度看下這個實現,可能有的地方理解有誤,請給予指正,多謝。

--> 1.命令列啟動yarn session

bin/yarn-session.sh -n 3 -jm 1024 -nm 1024 -st
我們去看下啟動指令碼

  $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR" $log_setting  org.apache.flink.yarn.cli.FlinkYarnSessionCli  -j "$FLINK_LIB_DIR"/flink-dist*.jar "
[email protected]
"

主要是用java -cp的方式啟動主類** *org.apache.flink.yarn.cli.FlinkYarnSessionCli * , [email protected] 就是我們傳入的哪些引數 " -n 3 -jm 1024 -nm 1024 -st" **。

1. FlinkYarnSessionCli 的啟動流程分析

首先看下Main函式

public static void main(String[] args) {   
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
   System.exit(cli.run(args));
}

主要是構造FlinkYarnSessionCli,然後執行其run方法,這裡主要介紹主要流程的程式碼。

public int run(String[] args)

  • 1.解析命令列引數

    cmd = parser.parse(options, args)
  • 2.根據命令列引數決定執行那種模式。

     # 第一種,判斷命令是否包含 -q



** 如: **


# 第二種,判斷是否有-id引數


這裡我們看下互動模式是啥樣的,共有兩個可選項,help和stop,如果我們敲入stop,則應用對應的所有程序會退出。


# 第三種,為正常模式




** 這裡主要為構造YarnClusterDescriptor,然後呼叫其deploy方法啟動叢集 ,接著將Jobmanager和web ui地址寫入到out檔案中去,如果採用分離模式,則等待叢集啟動之後yarn session自動退出,如果不是則進入互動模式,我們可以通過互動控制這個Applitcation **


接著看下是如何構造YarnClusterDescriptor的

----------------- **1   creat  YarnClusterDescriptor  ** ----------------------

直接new YarnClusterDescriptor物件,然後將依賴jar地址,配置引數如taskmanager個數,jar地址,配置檔案地址,配置引數等設定到YarnClusterDescriptor物件中去,然後返回這個物件。

------------** 2 YarnClusterDescriptor deploy  ** -------------------------

由於YarnClusterDescriptor沒有重寫depoy方法則直接呼叫其父類AbstractYarnClusterDescriptor的deploy方法,但是最終呼叫的是其deployInternal方法.

接著看下deployInternal方法,簡單的描述下流程,後續程式碼分析下面的github地址

  • 檢查是否具備Deploy的條件,如配置檔案,jar路徑是否為空

  • 獲取yarn的client,使用者和RM進行通訊

  • 增加動態的配置屬性到配置conf物件中去,解析配置conf物件為kv對

  • 獲取HDFS FileSyetem,這裡用於將本地jar及配置檔案上傳到HDFS,

  • 判斷JobManager和TaskManager申請的資源是否滿足yarn分配單個container的最小分配,如果小於則將container最小分配用來初始化jobMananger和TaskMananer

  • 通過yarn client建立Application,返回GetNewApplicationResponse物件用於跟RM進行RPC通訊。

  • 通過GetNewApplicationResponse物件獲取RM能夠為這個應用分配的最大資源,如果最大資源不能夠滿足jobManagerMemoryMb和taskManagerMemoryMb則報錯,計算總的jobmanager和所有taskmanager總共需要的資源(jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount),計算RM中總共的空閒資源,判斷空閒資源是否滿足前面計算需要的需求,如果不滿足,則可能先啟動yarn session,task manager等到有資源再進行啟動;先為jobManager分配一個nm,然後再在其他的nm上啟動taskmanager

  • 設定啟動ApplicationMaster的 lanchcontext,這裡主要是設定java home,主類,jvm引數數,log檔案配置。ApplicationMaster的主類 YarnApplicationMasterRunner ** YarnApplicationMasterRunner **。

protected Class<?> getApplicationMasterClass() {   
return YarnApplicationMasterRunner.class;
}```
- 設定ApplicationSubmissionContext,獲取ApplicationId
- 設定session需要的hdfs路徑,然後將本地jar包及配置檔案,配置檔案上傳到HDFS
- 設定AM啟動的token資訊,設定AM啟動的過程中需要從hdfs下載那些依賴的jar和配置檔案,設定ApplicationMaster及Flink及其他程序的classpath,不多說
- 設定鉤子函式在deploy的時候清理上傳到hdfs的檔案及本地下載的依賴檔案
- *** 重點,提交Applicaiton到RM;設定這個Application的狀態為NEW,然後監控這個應用,如果不是之前的NEW狀態,則列印當前狀態,如果Running狀態則跳出這個迴圈,如果是其他狀態,則丟擲YarnDeploymentException異常,上層呼叫捕獲處理吧,不然250ms判斷一次 ***
- depoly成功,鉤子函式刪除臨時檔案,如依賴的jar包和配置檔案等,返回YarnClusterClient物件,包含了這YarnClusterDescriptor,ApplicationReport等重要的屬性。
***
***deploy 成功以後進入互動模式,在runInteractiveCli裡面最重要的一步是構造ApplicationClient Actor用於和JobManager Actor進行通訊,但是如果傳送 RegisterInfoMessageListener、UnRegisterInfoMessageListener等訊息,將會由jobmanager actor將forward方法路由到flink resource manager actor去處理,此時jobmanager作為flink resource manager的代理,此時收到這兩個訊息的時候,由於是forward的方法,sender仍然是application client actor,所以flink manager resource actor可以直接給application client返回訊息***
***
> ------------ ** 3 程式碼展示主要流程**------
![](//upload-images.jianshu.io/upload_images/3249301-d22456f0939a8365.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-9c80aa18467d4e10.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-85a2f462ff96e5fd.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-a3c81e3dc9b23db0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-bf190e6a72366f0d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-57eb01f090d38dd3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-d548d544dbd1b713.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-2013feca33032c46.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-d0d8c8c1a56f28ff.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-be3a228edeffad9d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)*** ---- ApplicationClient 和JobManager Actor通訊程式碼 --***

![](//upload-images.jianshu.io/upload_images/3249301-56371ec18930ba4f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-ed28091d44dc3906.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-1a0df11e1a57941d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-913d82bf6d5825b8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6309a49886d0cc4e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-47f86bcfdee4f967.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6a483d4af26931cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-4de478f435cb1356.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-27986c3659bd96cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-a27f89acbe3406de.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-f04a3da4f97a08dc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)## 2. YarnApplicationMasterRunner 啟動流程分析
*** RM首先分配一個NM的container去啟動YarnApplicationMasterRunner ,接著下來我們看下是怎麼做的***
首先是進入main函式裡面,構造一個YarnApplicationMasterRunner物件,直接呼叫其Run方法。
> run方法主要步驟
- 獲取當前使用者的UGI及遠端UGI
- 將當前使用者ugi裡面的token傳遞到遠端的UGI中,用於資料和服務訪問
- 在遠端的UGI裡面執行runApplicationMaster啟動ApplicationMaster
![](//upload-images.jianshu.io/upload_images/3249301-7d3ac2af1bf091f6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)> runApplicationMaster主要過程,這裡註釋很清楚,我只撿重要的提示下
- 1) load and parse / validate all configurations
- 2) start the actor system,try to start the actor system, JobManager and JobManager actor system
- 3) Generate the configuration for the TaskManagers,這裡主要是JobManager的地址,taskManager註冊的超時時間,slot個數,這裡還有最重要的一步是構造TaskManager的ContainerLaunchContext,這個context裡面包含了啟動TaskManager的啟動命令,***主類是YarnTaskManager***。
-  start the actors and components in this order:  1) JobManager & Archive (in non-HA case, the leader service takes this),啟動JobManagerActor,這裡主類是***YarnJobManager***  2) Web Monitor (we need its port to register) 啟動WEB監控頁面,建立LeaderRetrievalService物件,這個主要用於啟動TaskManager的時候,告訴TaskManager JobManager得akka url,用於TaskManager向JobManager進行註冊。  3) Resource Master for YARN   啟動YarnFlinkResourceManager Actor,這裡主要用於Flink container資源的管理包括申請與釋放等。  4) Process reapers for the JobManager and Resource Master
***這裡主要介紹YarnApplicationMasterRunner 是如何通過YarnFlinkResourceManager去完成container的申請與啟動TaskManager的,這裡相對來說,比較複雜,我跟到Yarn的程式碼裡才算整明白***

![YarnFlinkResourceManager的繼承關係](//upload-images.jianshu.io/upload_images/3249301-cbc8215f8c356913.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)說明YarnFlinkResourceManager其實是一個actor,在runApplicationMaster方法中,通過下面的程式碼啟動這個Actor

Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
getResourceManagerClass(),//YarnFlinkResourceManager
config,
yarnConfig,
leaderRetriever,
appMasterHostname,
webMonitorURL,
taskManagerParameters,
taskManagerContext,
numInitialTaskManagers,
LOG);
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);//啟動YarnFlinkResourceManager actor

接著看下YarnFlinkResourceManager 的構造方法,這裡主要有三個成員變數比較重要

//在yarn 的rm端會呼叫該回物件的回撥函式進行container申請,resourceManagerCallbackHandler裡面只有該actor的actor ref,所以回撥的過程中能夠與該actor進行通訊
/** Callback handler for the asynchronous resourceManagerClient /
private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
//AM與RM通訊的client,resourceManagerClient物件持有resourceManagerCallbackHandler
/* Client to communicate with the Resource Manager (YARN's master) /
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
//AM與NM的通訊client
/* Client to communicate with the Node manager and launch TaskManager processes */
private NMClient nodeManagerClient;

YarnFlinkResourceManager 啟動的過程先執行preStart方法,自己沒有實現則執行其父類FlinkResourceManager的preStart方法。接著呼叫YarnFlinkResourceManager 的initialize方法。
> ***在initialize方法裡面***
*** resourceManagerClient.start() ----> 
  AMRMClientAsyncImpl.serviceStart()--->
  CallbackHandlerThread.start()(守護執行緒)--->
YarnResourceManagerCallbackHandler.onContainersAllocated(allocated)---> yarnFrameworkMaster.tell(new ContainersAllocated(containers),ActorRef.noSender())(yarnFrameworkMaster為YarnFlinkResourceManager ActorRef) -->
YarnFlinkResourceManager .containersAllocated -->
NMClient.startContainer(container, taskManagerLaunchContext)
至此通知各個NM啟動container。***
![](//upload-images.jianshu.io/upload_images/3249301-3a5b6377e93742b9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-2c532578dc7a508d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6a2cd3067970fe39.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-e14cfee9289c57bf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-2e3b73f252030ea8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-a3239382a6bde777.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-0a52a463c263bbd7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-52cecffb2670cc67.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)***至此,YarnApplicationMasterRunner 重要的流程已經說完,細節東西太多,就不再說了,有時間再看,接下來看YarnTaskManager的部分***## 3. YarnTaskManager啟動流程分析***接上面nodeManagerClient.startContainer(container, taskManagerLaunchContext)將通知NM去啟動container,NM根據taskManagerLaunchContext的啟動資訊,從HDFS下載YarnTaskManager啟動過程依賴的jar和配置檔案
(container_tokens  default_container_executor_session.sh  default_container_executor.sh  flink-conf.yaml  flink.jar  launch_container.sh  lib  log4j.properties  logback.xml),然後shell執行launch_container.sh,最終用java -cp啟動YarnTaskManager程序,啟動程序的時候首先執行YarnTaskManager run方法,TaskManager會拿到JobManager的akka地址,然後傳送註冊訊息,JobManager收到註冊訊息以後,註冊成功之後就傳送ack確認註冊資訊給TaskManager,然後TaskManger根據配置以及JobManager返回過來的資訊構建一些真正幹活的成員變數。過程:***
> 
YarnTaskManagerRunner.runYarnTaskManager(args, classOf[YarnTaskManager])-->
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager)-->
TaskManager.runTaskManager -->
TaskManager.startTaskManagerComponentsAndActor-->
actorSystem.actorOf(tmProps, actorName)-->
TaskManager.preStart-->
StandaloneLeaderRetrievalService.start(TaskManager)-->
TaskManger.notifyLeaderAddress-->
TaskManager.handleJobManagerLeaderAddress-->
TaskManager.triggerTaskManagerRegistration()
TaskManager.handleRegistrationMessage-->
instanceManager.registerTaskManager-->
jobManager 傳送訊息AcknowledgeRegistration給TaskManager
TaskManager.associateWithJobManager-->



![](//upload-images.jianshu.io/upload_images/3249301-8797abe4fead540e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6ada492f0e25718b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-7d9e8a6893af056a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-e4116544f6c7a677.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-03e551a6ebf785c1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-155d80ccd8b5bed4.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-40bbe88a86f090c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-2226948e829a5add.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-b9c444d706737b6f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-3ff9f4e9243c1264.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-08823728261241ec.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![associateWithJobManager](//upload-images.jianshu.io/upload_images/3249301-f3aa8048869975c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-1bb10a80436aa199.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-442effae29613f2c.png)###基本上Flink on yarn的流程就是這樣,細節需要深入,有不正確的地方,希望給予指正。




連結:https://www.jianshu.com/p/8a3177095072


免費體驗雲安全(易盾)內容安全、驗證碼等服務


更多網易技術、產品、運營經驗分享請點選




相關文章:
【推薦】 virtualenv簡介以及一個比較折騰的scrapy安裝方法