
Source Code Analysis of the Flink on YARN Startup Flow


This article is published on the NetEase Cloud Community with the authorization of its author, Yue Meng (嶽猛).

Visit the NetEase Cloud Community to learn more about NetEase's experience with technology, products, and operations.


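For an overview of how Flink on YARN starts up, see my earlier article "Flink on YARN Startup Flow". Below, I look at the implementation from the source-code side. Some of my understanding may be wrong; corrections are welcome. Thanks.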

--> 1. Start a yarn session from the command line

```bash
bin/yarn-session.sh -n 3 -jm 1024 -tm 1024 -st
```
Let's look at the launch script:

```bash
$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
```

It simply starts the main class `org.apache.flink.yarn.cli.FlinkYarnSessionCli` with `java -cp`; `$@` holds the arguments we passed in, `-n 3 -jm 1024 -tm 1024 -st`.

## 1. FlinkYarnSessionCli startup flow

First, the main function:

```java
public static void main(String[] args) {
    FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
    System.exit(cli.run(args));
}
```

It constructs a FlinkYarnSessionCli and then calls its run method. Below I only cover the code along the main path.

`public int run(String[] args)`

1. Parse the command-line arguments:

   `cmd = parser.parse(options, args)`

2. Decide which mode to run based on the arguments.

**Case 1:** the command contains `-q` (query mode, which displays the available YARN resources).

[Image]


For example:

[Image]


**Case 2:** the command has an `-id` argument (attach to an existing YARN application).

[Image]


Here is what the interactive mode looks like. It offers two options, help and stop; if we type stop, all processes of the application exit.

[Image]


**Case 3:** the normal mode.

[Image]

[Image]


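**This mainly constructs a YarnClusterDescriptor and calls its deploy method to start the cluster, then writes the JobManager and web UI addresses to the out file. In detached mode, the yarn session exits automatically once the cluster has started; otherwise it enters interactive mode, where we can control this application interactively.**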
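
The three-way dispatch in run() can be sketched in plain Java. This is a minimal, hypothetical sketch, not Flink's code: the class and enum names are made up, flags are matched with a simple `contains` instead of a real command-line parser, and it assumes `-d` is the flag that selects detached mode.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: SessionModeSketch and Mode are illustrative names, not Flink classes.
public class SessionModeSketch {

    enum Mode { QUERY, ATTACH, DEPLOY_DETACHED, DEPLOY_INTERACTIVE }

    static Mode selectMode(String[] args) {
        List<String> argList = Arrays.asList(args);
        if (argList.contains("-q")) {
            // Case 1: only query and print cluster information, deploy nothing.
            return Mode.QUERY;
        }
        if (argList.contains("-id")) {
            // Case 2: attach to an existing YARN application, then go interactive.
            return Mode.ATTACH;
        }
        // Case 3: deploy a new session; detached exits after startup, otherwise interactive.
        return argList.contains("-d") ? Mode.DEPLOY_DETACHED : Mode.DEPLOY_INTERACTIVE;
    }

    public static void main(String[] args) {
        System.out.println(selectMode(new String[] {"-n", "3", "-jm", "1024", "-tm", "1024", "-st"})); // DEPLOY_INTERACTIVE
        System.out.println(selectMode(new String[] {"-id", "application_0000000000000_0001"}));     // ATTACH
    }
}
```

The order of the checks mirrors the order of the three cases above; in this sketch `-q` wins over `-id` only because it is tested first.
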


Next, let's look at how the YarnClusterDescriptor is constructed.

----------------- **1. Create the YarnClusterDescriptor** -----------------

[Image]

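It simply creates a new YarnClusterDescriptor object, sets on it the dependency jar paths and configuration such as the number of TaskManagers, the jar path, the configuration file path, and other configuration parameters, and returns the object.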

------------ **2. YarnClusterDescriptor deploy** ------------

Since YarnClusterDescriptor does not override the deploy method, calling deploy runs the deploy method of its parent class AbstractYarnClusterDescriptor, which ultimately calls deployInternal.

[Image]

Next, the deployInternal method. I'll briefly describe its flow here; for the follow-up code analysis, see the GitHub address below.

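- Check that the preconditions for deploying are met, e.g. that the configuration file and jar paths are not empty.
- Obtain the YARN client, which is used to communicate with the RM.
- Add the dynamic properties to the conf object, and parse the conf object into key-value pairs.
- Obtain the HDFS FileSystem, used here to upload the local jars and configuration files to HDFS.
- Check whether the memory requested for the JobManager and TaskManagers reaches YARN's minimum allocation for a single container; if a request is smaller, the minimum container allocation is used for the JobManager and TaskManagers instead.
- Create an application through the YARN client and obtain the GetNewApplicationResponse returned by the RM.
- From the GetNewApplicationResponse, read the maximum resources the RM can allocate to a container of this application; if that maximum cannot satisfy jobManagerMemoryMb or taskManagerMemoryMb, report an error. Then compute the total memory needed by the JobManager and all TaskManagers (jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount) and the total free resources in the RM, and check whether the free resources cover that demand. If they do not, the yarn session may still start first, and the TaskManagers start once resources become available. The check first assigns the JobManager to one NM and then places the TaskManagers on the other NMs.
- Set up the ApplicationMaster's launch context: mainly the Java home, main class, JVM options, and log configuration. The ApplicationMaster's main class is **YarnApplicationMasterRunner**:

  ```java
  protected Class<?> getApplicationMasterClass() {
      return YarnApplicationMasterRunner.class;
  }
  ```
- Set up the ApplicationSubmissionContext and obtain the ApplicationId.
- Set the HDFS paths the session needs, then upload the local jars and configuration files to HDFS.
- Set the tokens for launching the AM, specify which dependency jars and configuration files the AM must download from HDFS when it starts, and set the classpath of the ApplicationMaster, Flink, and the other processes. Nothing special here.
- Register a hook that, during deploy, cleans up the files uploaded to HDFS and the locally downloaded dependency files.
- **Key step: submit the application to the RM.** Start with the application state recorded as NEW, then monitor the application: whenever its state changes, print the current state; on RUNNING, leave the loop; on FAILED, FINISHED, or KILLED, throw a YarnDeploymentException for the caller to catch and handle; otherwise, check again every 250 ms.
- Once deploy succeeds, the hook deletes the temporary files, such as the dependency jars and configuration files, and a YarnClusterClient object is returned that holds important properties such as the YarnClusterDescriptor and the ApplicationReport.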
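
Two parts of deployInternal lend themselves to a small sketch: the memory arithmetic and the polling loop after submission. The code below is a hypothetical, dependency-free approximation, not Flink's code: `AppState` redeclares YARN's application states locally, an `Iterator` stands in for repeated application-report lookups, `IllegalStateException` replaces YarnDeploymentException, and first-fit placement is an assumed reading of "JobManager on one NM first, then the TaskManagers".

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical sketch of two checks described for deployInternal; names are illustrative.
public class DeploySketch {

    // Local mirror of YARN's application states, so the sketch has no YARN dependency.
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Requests below YARN's minimum container allocation are raised to that minimum.
    static int normalize(int requestedMb, int yarnMinAllocationMb) {
        return Math.max(requestedMb, yarnMinAllocationMb);
    }

    // jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount
    static int totalDemandMb(int jmMb, int tmMb, int tmCount) {
        return jmMb + tmMb * tmCount;
    }

    // First-fit placement on a copy of the free memory per NM: JobManager first, then each TaskManager.
    static boolean fitsNow(int[] nmFreeMb, int jmMb, int tmMb, int tmCount) {
        int[] free = Arrays.copyOf(nmFreeMb, nmFreeMb.length);
        if (!allocate(free, jmMb)) {
            return false;
        }
        for (int i = 0; i < tmCount; i++) {
            if (!allocate(free, tmMb)) {
                return false;
            }
        }
        return true;
    }

    private static boolean allocate(int[] free, int mb) {
        for (int i = 0; i < free.length; i++) {
            if (free[i] >= mb) {
                free[i] -= mb;
                return true;
            }
        }
        return false;
    }

    // Poll until RUNNING and return the number of polls; `reports` stands in for report lookups.
    static int waitUntilRunning(Iterator<AppState> reports, long pollMillis) throws InterruptedException {
        AppState last = AppState.NEW;
        int polls = 0;
        while (true) {
            AppState state = reports.next();
            polls++;
            switch (state) {
                case FAILED:
                case FINISHED:
                case KILLED:
                    // Per the walkthrough, Flink throws YarnDeploymentException here.
                    throw new IllegalStateException("Deployment failed, state " + state);
                case RUNNING:
                    return polls;
                default:
                    if (state != last) {
                        System.out.println("Deploying cluster, current state " + state);
                    }
            }
            last = state;
            Thread.sleep(pollMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int jm = normalize(512, 1024);  // raised to 1024
        int tm = normalize(1024, 1024); // stays 1024
        System.out.println(totalDemandMb(jm, tm, 3));                         // 4096
        System.out.println(fitsNow(new int[] {1536, 1536, 1536}, jm, tm, 3)); // false
        System.out.println(waitUntilRunning(Arrays.asList(AppState.SUBMITTED,
                AppState.ACCEPTED, AppState.ACCEPTED, AppState.RUNNING).iterator(), 250)); // 4
    }
}
```

With three NMs of 1536 MB free, the cluster has 4608 MB in total, more than the 4096 MB demand, yet first-fit placement still fails because the leftover memory is split into 512 MB pieces. That is why comparing total free memory alone cannot tell whether every container can start right away.
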
***
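***After a successful deploy, the CLI enters interactive mode. The most important step in runInteractiveCli is constructing the ApplicationClient actor, which communicates with the JobManager actor. Messages such as RegisterInfoMessageListener and UnRegisterInfoMessageListener, however, are routed by the JobManager actor to the Flink resource manager actor with forward; here the JobManager acts as a proxy for the Flink resource manager. Because forward is used, the sender of these two messages is still the ApplicationClient actor, so the Flink resource manager actor can reply to the ApplicationClient directly.***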
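
The point about forward is easiest to see in a toy model. The sketch below is plain Java with synchronous calls and no mailboxes, so it is neither Akka nor Flink; the class names only mirror the roles described above. In Akka's Java API, the JobManager calling `resourceManager.forward(msg, getContext())` rather than `resourceManager.tell(msg, getSelf())` is what keeps the ApplicationClient as the sender.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of tell vs forward; all classes are hypothetical stand-ins.
public class ForwardSketch {

    interface Actor {
        void receive(Object msg, Actor sender);
    }

    // Plays the ApplicationClient: records every reply it receives.
    static class Client implements Actor {
        final List<String> replies = new ArrayList<>();

        public void receive(Object msg, Actor sender) {
            replies.add(String.valueOf(msg));
        }
    }

    // Plays the Flink resource manager: replies to whoever is the sender of the message.
    static class ResourceManager implements Actor {
        public void receive(Object msg, Actor sender) {
            if ("RegisterInfoMessageListener".equals(msg) && sender != null) {
                sender.receive("listener registered", this);
            }
        }
    }

    // Plays the JobManager: proxies listener messages to the resource manager.
    static class JobManager implements Actor {
        final Actor resourceManager;

        JobManager(Actor resourceManager) {
            this.resourceManager = resourceManager;
        }

        public void receive(Object msg, Actor sender) {
            if ("RegisterInfoMessageListener".equals(msg) || "UnRegisterInfoMessageListener".equals(msg)) {
                // "forward": pass on the ORIGINAL sender, not this JobManager.
                resourceManager.receive(msg, sender);
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        JobManager jobManager = new JobManager(new ResourceManager());
        jobManager.receive("RegisterInfoMessageListener", client);
        System.out.println(client.replies); // [listener registered]
    }
}
```

If the JobManager passed `this` instead of `sender`, the reply would land at the JobManager and the client would see nothing.
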
***
> ------------ **3. Code for the main flow** ------------

![](//upload-images.jianshu.io/upload_images/3249301-d22456f0939a8365.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-9c80aa18467d4e10.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-85a2f462ff96e5fd.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-a3c81e3dc9b23db0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-bf190e6a72366f0d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-57eb01f090d38dd3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-d548d544dbd1b713.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-2013feca33032c46.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-d0d8c8c1a56f28ff.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-be3a228edeffad9d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

***---- Code for communication between the ApplicationClient and JobManager actors ----***

![](//upload-images.jianshu.io/upload_images/3249301-56371ec18930ba4f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-ed28091d44dc3906.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-1a0df11e1a57941d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-913d82bf6d5825b8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-6309a49886d0cc4e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-47f86bcfdee4f967.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-6a483d4af26931cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-4de478f435cb1356.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-27986c3659bd96cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-a27f89acbe3406de.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-f04a3da4f97a08dc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

## 2. YarnApplicationMasterRunner startup flow
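***The RM first allocates a container on an NM to launch YarnApplicationMasterRunner. Let's see how this is done.***

Execution starts in the main function, which constructs a YarnApplicationMasterRunner object and calls its run method directly.

> Main steps of the run method: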
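- Get the UGI of the current user and the remote UGI.
- Pass the tokens in the current user's UGI to the remote UGI, for access to data and services.
- Run runApplicationMaster inside the remote UGI to start the ApplicationMaster.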
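
These three steps follow Hadoop's usual pattern; in real code they would use `UserGroupInformation.getCurrentUser()`, `UserGroupInformation.createRemoteUser(...)`, `addToken(...)` for each token, and `doAs(...)`. To keep the example runnable without Hadoop, the sketch below replaces UserGroupInformation with a hypothetical `FakeUgi` whose `doAs` simply runs the action, and the token names are illustrative strings.

```java
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;

// Toy model: FakeUgi is hypothetical; Hadoop's real class is UserGroupInformation.
public class UgiSketch {

    static class FakeUgi {
        final String user;
        final List<String> tokens = new ArrayList<>();

        FakeUgi(String user) {
            this.user = user;
        }

        // Only runs the action; a real UGI would run it inside this user's security context.
        <T> T doAs(PrivilegedAction<T> action) {
            return action.run();
        }
    }

    // The three run() steps: create the remote user, copy the tokens, run the AM as that user.
    static int run(FakeUgi currentUser, String remoteUserName) {
        FakeUgi remoteUser = new FakeUgi(remoteUserName);
        remoteUser.tokens.addAll(currentUser.tokens);
        return remoteUser.<Integer>doAs(() -> runApplicationMaster(remoteUser));
    }

    // Stand-in for runApplicationMaster: returns 0 if credentials are visible, 1 otherwise.
    static int runApplicationMaster(FakeUgi ugi) {
        return ugi.tokens.isEmpty() ? 1 : 0;
    }

    public static void main(String[] args) {
        FakeUgi current = new FakeUgi("flink");
        current.tokens.add("HDFS_DELEGATION_TOKEN");
        System.out.println(run(current, "yarn-client-user")); // 0
    }
}
```
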
![](//upload-images.jianshu.io/upload_images/3249301-7d3ac2af1bf091f6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

> Main steps of runApplicationMaster. The code comments are quite clear here, so I'll only point out the important parts:
- 1) Load and parse/validate all configurations.
- 2) Start the actor system: try to start the actor system for the JobManager.
- 3) Generate the configuration for the TaskManagers: mainly the JobManager address, the TaskManager registration timeout, and the number of slots. The most important step here is building the TaskManager's ContainerLaunchContext, which contains the command that launches the TaskManager; ***its main class is YarnTaskManager***.
- Start the actors and components in this order:
  1) JobManager & Archive (in non-HA case, the leader service takes this): starts the JobManager actor, whose main class is ***YarnJobManager***.
  2) Web Monitor (we need its port to register): starts the web monitoring UI and creates a LeaderRetrievalService object, which is mainly used when TaskManagers start, to tell them the JobManager's Akka URL so that they can register with the JobManager.
  3) Resource Master for YARN: starts the YarnFlinkResourceManager actor, which manages Flink's container resources, including requesting and releasing them.
  4) Process reapers for the JobManager and Resource Master.
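
The TaskManager's ContainerLaunchContext essentially carries a shell command that starts a JVM with YarnTaskManager as its main class. Below is a minimal sketch of assembling such a command from a template; the template, placeholder names, memory values, and `--configDir` argument are assumptions for illustration, not Flink's exact command. `<LOG_DIR>` is the placeholder YARN expands to the container's log directory.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: fill a start-command template for a TaskManager container.
public class TmCommandSketch {

    static String buildCommand(String template, Map<String, String> values) {
        String cmd = template;
        for (Map.Entry<String, String> e : values.entrySet()) {
            // Literal replacement, so "$JAVA_HOME" stays for the container shell to expand.
            cmd = cmd.replace("%" + e.getKey() + "%", e.getValue());
        }
        // Collapse the extra spaces left behind by empty placeholders.
        return cmd.trim().replaceAll("\\s+", " ");
    }

    public static void main(String[] args) {
        Map<String, String> v = new LinkedHashMap<>();
        v.put("java", "$JAVA_HOME/bin/java");
        v.put("jvmmem", "-Xms768m -Xmx768m");
        v.put("jvmopts", "");
        v.put("class", "org.apache.flink.yarn.YarnTaskManager");
        v.put("args", "--configDir .");
        v.put("redirects", "1> <LOG_DIR>/taskmanager.out 2> <LOG_DIR>/taskmanager.err");
        System.out.println(buildCommand("%java% %jvmmem% %jvmopts% %class% %args% %redirects%", v));
        // $JAVA_HOME/bin/java -Xms768m -Xmx768m org.apache.flink.yarn.YarnTaskManager --configDir . 1> <LOG_DIR>/taskmanager.out 2> <LOG_DIR>/taskmanager.err
    }
}
```

Keeping the command as a template makes it easy to change JVM options without touching the code that builds the launch context.
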
***Here I focus on how YarnApplicationMasterRunner uses YarnFlinkResourceManager to request containers and launch TaskManagers. This part is relatively complex; I only figured it out after tracing into the YARN code.***

![YarnFlinkResourceManager class hierarchy](//upload-images.jianshu.io/upload_images/3249301-cbc8215f8c356913.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

This shows that YarnFlinkResourceManager is actually an actor. In the runApplicationMaster method, the actor is started with the following code:

```java
Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
        getResourceManagerClass(), // YarnFlinkResourceManager
        config,
        yarnConfig,
        leaderRetriever,
        appMasterHostname,
        webMonitorURL,
        taskManagerParameters,
        taskManagerContext,
        numInitialTaskManagers,
        LOG);
// start the YarnFlinkResourceManager actor
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
```

Next, the YarnFlinkResourceManager constructor. Three member variables matter most here:

```java
// When containers are allocated through the YARN RM, the callback methods of this object are invoked.
// resourceManagerCallbackHandler holds only this actor's ActorRef, which lets the callbacks talk to the actor.
/** Callback handler for the asynchronous resourceManagerClient */
private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;

// Client for AM-RM communication; resourceManagerClient holds resourceManagerCallbackHandler.
/** Client to communicate with the Resource Manager (YARN's master) */
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;

// Client for AM-NM communication.
/** Client to communicate with the Node manager and launch TaskManager processes */
private NMClient nodeManagerClient;
```

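When YarnFlinkResourceManager starts, preStart runs first; since YarnFlinkResourceManager does not implement it, the preStart method of its parent class FlinkResourceManager runs. Then YarnFlinkResourceManager's initialize method is called.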
> ***Inside the initialize method:***
resourceManagerClient.start() -->
AMRMClientAsyncImpl.serviceStart() -->
CallbackHandlerThread.start() (daemon thread) -->
YarnResourceManagerCallbackHandler.onContainersAllocated(allocated) -->
yarnFrameworkMaster.tell(new ContainersAllocated(containers), ActorRef.noSender()) (yarnFrameworkMaster is the ActorRef of YarnFlinkResourceManager) -->
YarnFlinkResourceManager.containersAllocated -->
NMClient.startContainer(container, taskManagerLaunchContext)

***At this point, each NM is told to start its container.***
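
The chain above has one important shape: the callback thread does not launch containers itself; it only sends a ContainersAllocated message to the resource manager actor, which then calls startContainer. The plain-Java toy model below shows that hand-off, with a `BlockingQueue` standing in for the actor mailbox. It uses no YARN or Akka classes, and its `startContainer` only returns a label.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model, not YARN/Akka code: all names are hypothetical stand-ins.
public class AllocationSketch {

    // Message the callback thread sends to the actor.
    static final class ContainersAllocated {
        final List<String> containers;

        ContainersAllocated(List<String> containers) {
            this.containers = containers;
        }
    }

    static List<String> simulate(List<String> allocatedByRm) throws InterruptedException {
        BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>(); // the actor's mailbox
        List<String> started = new ArrayList<>();

        // Like onContainersAllocated on the daemon callback thread: only hand the
        // containers over to the actor; nothing is launched on this thread.
        Thread callbackThread = new Thread(() -> mailbox.add(new ContainersAllocated(allocatedByRm)));
        callbackThread.setDaemon(true);
        callbackThread.start();

        // Like the actor handling ContainersAllocated: launch every container.
        Object msg = mailbox.take();
        if (msg instanceof ContainersAllocated) {
            for (String container : ((ContainersAllocated) msg).containers) {
                started.add(startContainer(container));
            }
        }
        callbackThread.join();
        return started;
    }

    // Stand-in for nodeManagerClient.startContainer(container, taskManagerLaunchContext).
    static String startContainer(String container) {
        return container + ":YarnTaskManager";
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(simulate(Arrays.asList("container_01", "container_02")));
        // [container_01:YarnTaskManager, container_02:YarnTaskManager]
    }
}
```

Keeping the callback thread short and moving the work into the actor means all container bookkeeping happens on one thread, which is the usual reason for this design.
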
![](//upload-images.jianshu.io/upload_images/3249301-3a5b6377e93742b9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-2c532578dc7a508d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-6a2cd3067970fe39.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-e14cfee9289c57bf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-2e3b73f252030ea8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-a3239382a6bde777.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-0a52a463c263bbd7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-52cecffb2670cc67.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

***That covers the important flow of YarnApplicationMasterRunner. There are too many details to go through here; I may return to them when I have time. Next comes YarnTaskManager.***

## 3. YarnTaskManager startup flow

***Continuing from above: nodeManagerClient.startContainer(container, taskManagerLaunchContext) tells the NM to start the container. Following the launch information in taskManagerLaunchContext, the NM downloads from HDFS the jars and configuration files YarnTaskManager needs at startup; the container's working directory then contains container_tokens, default_container_executor_session.sh, default_container_executor.sh, flink-conf.yaml, flink.jar, launch_container.sh, lib, log4j.properties, and logback.xml. The NM then executes launch_container.sh in a shell, which finally starts the YarnTaskManager process with java -cp. When the process starts, YarnTaskManager's run method executes first: the TaskManager obtains the JobManager's Akka address and sends a registration message; once the JobManager has registered the TaskManager, it sends an acknowledgement back, and the TaskManager then builds the member components that do the actual work, based on its configuration and the information returned by the JobManager. The call chain:***
> 
YarnTaskManagerRunner.runYarnTaskManager(args, classOf[YarnTaskManager])-->
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager)-->
TaskManager.runTaskManager -->
TaskManager.startTaskManagerComponentsAndActor-->
actorSystem.actorOf(tmProps, actorName)-->
TaskManager.preStart-->
StandaloneLeaderRetrievalService.start(TaskManager)-->
TaskManager.notifyLeaderAddress-->
TaskManager.handleJobManagerLeaderAddress-->
TaskManager.triggerTaskManagerRegistration()-->
TaskManager.handleRegistrationMessage-->
instanceManager.registerTaskManager (on the JobManager side)-->
JobManager sends an AcknowledgeRegistration message to the TaskManager-->
TaskManager.associateWithJobManager
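
The registration handshake can be reduced to a toy model: the TaskManager learns the JobManager through leader retrieval, sends its registration, the JobManager records it (the role instanceManager.registerTaskManager plays) and replies with an acknowledgement carrying an instance ID, and the TaskManager then associates with the JobManager. All classes below are hypothetical, calls are synchronous, and the retries, timeouts, and rejection messages of the real protocol are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model of the registration handshake; all classes here are hypothetical.
public class RegistrationSketch {

    static final class AcknowledgeRegistration {
        final String instanceId;

        AcknowledgeRegistration(String instanceId) {
            this.instanceId = instanceId;
        }
    }

    static final class JobManager {
        // resourceId -> instanceId, playing the role of instanceManager.registerTaskManager
        final Map<String, String> registered = new HashMap<>();

        AcknowledgeRegistration handleRegister(String resourceId) {
            // In this toy, a repeated registration gets the same id back.
            String id = registered.computeIfAbsent(resourceId, r -> UUID.randomUUID().toString());
            return new AcknowledgeRegistration(id);
        }
    }

    static final class TaskManager {
        final String resourceId;
        String instanceId; // null until associated with a JobManager

        TaskManager(String resourceId) {
            this.resourceId = resourceId;
        }

        // Leader retrieval hands over the JobManager; register with it and associate on the ack.
        void notifyLeaderAddress(JobManager leader) {
            AcknowledgeRegistration ack = leader.handleRegister(resourceId);
            associateWithJobManager(ack);
        }

        void associateWithJobManager(AcknowledgeRegistration ack) {
            // Per the walkthrough, a real TaskManager builds its working components at this point.
            instanceId = ack.instanceId;
        }
    }

    public static void main(String[] args) {
        JobManager jm = new JobManager();
        TaskManager tm = new TaskManager("container_01");
        tm.notifyLeaderAddress(jm);
        System.out.println(tm.instanceId != null); // true
        System.out.println(jm.registered.size());  // 1
    }
}
```
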



![](//upload-images.jianshu.io/upload_images/3249301-8797abe4fead540e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-6ada492f0e25718b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-7d9e8a6893af056a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-e4116544f6c7a677.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-03e551a6ebf785c1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-155d80ccd8b5bed4.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-40bbe88a86f090c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-2226948e829a5add.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-b9c444d706737b6f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-3ff9f4e9243c1264.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-08823728261241ec.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![associateWithJobManager](//upload-images.jianshu.io/upload_images/3249301-f3aa8048869975c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-1bb10a80436aa199.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](//upload-images.jianshu.io/upload_images/3249301-442effae29613f2c.png)

That is basically the Flink on YARN startup flow. The details need deeper study; if anything here is incorrect, corrections are welcome.




Link: https://www.jianshu.com/p/8a3177095072








