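Source Code Analysis of the Flink on Yarn Startup Flow
This article is published on the NetEase Cloud community with the authorization of its author, Yue Meng.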
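For the Flink on yarn startup flow itself, see the earlier article "Flink on Yarn啟動流程" (Flink on Yarn Startup Flow). This article looks at the implementation from the source code. Some of my understanding may be wrong; corrections are welcome, thank you.
> 1. Start a yarn session from the command line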
bin/yarn-session.sh -n 3 -jm 1024 -nm 1024 -st
Let's look at the startup script:
$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
The script launches the main class **org.apache.flink.yarn.cli.FlinkYarnSessionCli** with java -cp; $@ holds the arguments we passed in, "-n 3 -jm 1024 -nm 1024 -st".
## 1. FlinkYarnSessionCli startup flow
First, look at the main function:
public static void main(String[] args) {
    FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
    System.exit(cli.run(args));
}
It constructs a FlinkYarnSessionCli and then calls its run method. Only the code of the main flow is covered here.
public int run(String[] args)
1. Parse the command-line arguments
cmd = parser.parse(options, args)
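2. Decide which mode to run based on the command-line arguments.
# Case 1: check whether the command contains -q
**For example:**
# Case 2: check whether the -id argument is present
Here is what the interactive mode looks like: it offers two commands, help and stop. Typing stop makes all processes belonging to the application exit.
# Case 3: normal mode
**This case constructs a YarnClusterDescriptor and calls its deploy method to start the cluster, then writes the JobManager and web UI addresses to the out file. In detached mode the yarn session exits on its own once the cluster has started; otherwise it enters interactive mode, where we can control the Application interactively.**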
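
The three-way decision above can be sketched in plain Java. This is only an illustration under stated assumptions: `SessionModeSketch`, its `Mode` enum, and `selectMode` are hypothetical names, the flags are matched as raw strings rather than through the real option parser, and the application id in the example is made up.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the mode decision in run(); not Flink's actual code. */
final class SessionModeSketch {

    /** The three cases described in the text. */
    enum Mode { QUERY, ATTACH, DEPLOY }

    /** Checks -q first, then -id, and otherwise falls back to a normal deployment. */
    static Mode selectMode(String[] args) {
        List<String> flags = Arrays.asList(args);
        if (flags.contains("-q")) {
            return Mode.QUERY;
        }
        if (flags.contains("-id")) {
            return Mode.ATTACH;
        }
        return Mode.DEPLOY;
    }

    public static void main(String[] args) {
        // "application_0001" is a made-up id used only for illustration.
        System.out.println(selectMode(new String[] {"-q"}));
        System.out.println(selectMode(new String[] {"-id", "application_0001"}));
        System.out.println(selectMode(new String[] {"-n", "3", "-jm", "1024"}));
    }
}
```

The order of the checks matters: a command line that carries both -q and -id is treated as the first case in this sketch.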
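Next, let's see how the YarnClusterDescriptor is constructed.
----------------- **1 create YarnClusterDescriptor** ----------------------
A YarnClusterDescriptor object is created directly with new. The dependency jar paths, the configuration file path, and settings such as the number of TaskManagers are then set on it, and the object is returned.
------------ **2 YarnClusterDescriptor deploy** -------------------------
Because YarnClusterDescriptor does not override the deploy method, the call goes to the deploy method of its parent class AbstractYarnClusterDescriptor, which ultimately calls its deployInternal method.
Next, look at the deployInternal method. The flow is described briefly here; the subsequent code analysis is at the GitHub address below.
- Check whether the preconditions for a deploy are met, for example that the configuration file and jar path are not empty
- Get the yarn client, which is used to communicate with the RM
- Add the dynamic configuration properties to the conf object, and parse the conf object into key-value pairs
- Get the HDFS FileSystem, which is used to upload the local jars and configuration files to HDFS
- Check whether the resources requested for the JobManager and TaskManager reach yarn's minimum allocation for a single container; if they are smaller, the container minimum is used to initialize the JobManager and TaskManager
- Create the Application through the yarn client, which returns a GetNewApplicationResponse object used for RPC communication with the RM
- Through the GetNewApplicationResponse object, get the maximum resources the RM can allocate to this application; if that maximum cannot satisfy jobManagerMemoryMb and taskManagerMemoryMb, report an error. Compute the total resources needed by the JobManager and all TaskManagers (jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount), compute the total free resources in the RM, and check whether the free resources cover that total. If they do not, the yarn session may start first, with the TaskManagers starting once resources become available. An NM is allocated for the JobManager first, and the TaskManagers are then started on other NMs
- Set up the launch context for starting the ApplicationMaster, mainly the java home, main class, JVM arguments, and log file configuration. The main class of the ApplicationMaster is **YarnApplicationMasterRunner**.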
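```java
protected Class<?> getApplicationMasterClass() {
    return YarnApplicationMasterRunner.class;
}
```
- Set up the ApplicationSubmissionContext and obtain the ApplicationId
- Set the HDFS paths the session needs, then upload the local jars and configuration files to HDFS
- Set the token information for starting the AM, specify which dependency jars and configuration files must be downloaded from HDFS while the AM starts, and set the classpath for the ApplicationMaster, Flink, and the other processes
- Register a hook function that, at deploy time, cleans up the files uploaded to HDFS and the locally downloaded dependency files
- ***Key step: submit the Application to the RM. The last known state is initialized to NEW and the application is then monitored. When the state differs from the previous one, the current state is printed; when it is RUNNING, the loop exits; when the application has ended instead (for example FAILED or KILLED), a YarnDeploymentException is thrown for the caller to catch and handle; otherwise the state is checked again every 250 ms***
- Once the deploy succeeds, the hook function deletes temporary files such as the dependency jars and configuration files, and a YarnClusterClient object is returned, which holds important properties such as the YarnClusterDescriptor and the ApplicationReport.

***After a successful deploy the session enters interactive mode. The most important step in runInteractiveCli is constructing the ApplicationClient actor, which communicates with the JobManager actor. Messages such as RegisterInfoMessageListener and UnRegisterInfoMessageListener, however, are routed by the JobManager actor to the Flink resource manager actor with the forward method, so the JobManager acts as a proxy for the Flink resource manager. Because the messages are forwarded, the sender is still the ApplicationClient actor, and the Flink resource manager actor can reply to the ApplicationClient directly.***

> ------------ **3 Main flow shown in code** ------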
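
Before the screenshots, the memory checks and the status polling of deployInternal can be condensed into a small, dependency-free sketch. Everything here is an assumption made for illustration: `DeploySketch`, `AppState`, and the method names are hypothetical, an `Iterator` stands in for repeated status queries against the RM, and `IllegalStateException` stands in for YarnDeploymentException.

```java
import java.util.Arrays;
import java.util.Iterator;

/** Hypothetical sketch of the checks and polling in deployInternal; not Flink's code. */
final class DeploySketch {

    /** Simplified stand-in for YARN's application states. */
    enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FAILED, KILLED }

    /** Raises a request that is below the scheduler minimum up to that minimum. */
    static int clampToMinAllocation(int requestedMb, int minAllocationMb) {
        return Math.max(requestedMb, minAllocationMb);
    }

    /** Memory needed for one JobManager plus all TaskManagers. */
    static int totalRequiredMb(int jobManagerMb, int taskManagerMb, int taskManagerCount) {
        return jobManagerMb + taskManagerMb * taskManagerCount;
    }

    /** Fails if a single container request exceeds the maximum the RM can grant. */
    static void checkAgainstMax(int jobManagerMb, int taskManagerMb, int maxAllocationMb) {
        if (jobManagerMb > maxAllocationMb || taskManagerMb > maxAllocationMb) {
            throw new IllegalStateException("Container request exceeds " + maxAllocationMb + " MB");
        }
    }

    /** Polls until RUNNING; throws on FAILED or KILLED; prints every state change. */
    static AppState waitUntilRunning(Iterator<AppState> reports, long pollIntervalMs) {
        AppState last = AppState.NEW;
        while (reports.hasNext()) {
            AppState current = reports.next();
            if (current == AppState.RUNNING) {
                return current;
            }
            if (current == AppState.FAILED || current == AppState.KILLED) {
                throw new IllegalStateException("Deployment ended in state " + current);
            }
            if (current != last) {
                System.out.println("Deploying cluster, current state " + current);
            }
            last = current;
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for RUNNING", e);
            }
        }
        throw new IllegalStateException("No more status reports before RUNNING");
    }

    public static void main(String[] args) {
        int jobManagerMb = clampToMinAllocation(512, 1024);
        int taskManagerMb = clampToMinAllocation(1024, 1024);
        checkAgainstMax(jobManagerMb, taskManagerMb, 8192);
        System.out.println("total MB needed: " + totalRequiredMb(jobManagerMb, taskManagerMb, 3));
        Iterator<AppState> reports = Arrays.asList(
                AppState.NEW, AppState.ACCEPTED, AppState.ACCEPTED, AppState.RUNNING).iterator();
        System.out.println("final state: " + waitUntilRunning(reports, 250));
    }
}
```

The numbers in main (512, 1024, 8192, three TaskManagers) are example values only; the 250 ms interval is the one mentioned in the list above.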
![](//upload-images.jianshu.io/upload_images/3249301-d22456f0939a8365.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-9c80aa18467d4e10.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-85a2f462ff96e5fd.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-a3c81e3dc9b23db0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-bf190e6a72366f0d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-57eb01f090d38dd3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-d548d544dbd1b713.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-2013feca33032c46.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-d0d8c8c1a56f28ff.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-be3a228edeffad9d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

***---- ApplicationClient and JobManager Actor communication code ----***
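
The forwarding behavior between the ApplicationClient, the JobManager, and the Flink resource manager can be illustrated without Akka. The sketch below is a synchronous, single-threaded model, not the real actor system: the `Actor` interface and the three classes are hypothetical stand-ins, messages are plain strings, and the "ack:" reply is invented for the example. It only shows why a forwarded message lets the final receiver answer the original sender.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Akka-free model of message forwarding; not Flink's code. */
final class ForwardSketch {

    /** Minimal stand-in for an actor: a message arrives together with its sender. */
    interface Actor {
        void receive(String message, Actor sender);
    }

    /** Stand-in for the ApplicationClient actor: it records every reply it gets. */
    static final class ApplicationClient implements Actor {
        final List<String> inbox = new ArrayList<>();

        @Override
        public void receive(String message, Actor sender) {
            inbox.add(message);
        }
    }

    /** Stand-in for the Flink resource manager actor: it replies to the recorded sender. */
    static final class ResourceManager implements Actor {
        @Override
        public void receive(String message, Actor sender) {
            sender.receive("ack:" + message, this);
        }
    }

    /** Stand-in for the JobManager actor acting as a proxy for the resource manager. */
    static final class JobManager implements Actor {
        private final Actor resourceManager;

        JobManager(Actor resourceManager) {
            this.resourceManager = resourceManager;
        }

        @Override
        public void receive(String message, Actor sender) {
            if (message.equals("RegisterInfoMessageListener")
                    || message.equals("UnRegisterInfoMessageListener")) {
                // Forward: keep the original sender instead of substituting ourselves.
                resourceManager.receive(message, sender);
            }
        }
    }

    public static void main(String[] args) {
        ApplicationClient client = new ApplicationClient();
        JobManager jobManager = new JobManager(new ResourceManager());
        jobManager.receive("RegisterInfoMessageListener", client);
        System.out.println(client.inbox);
    }
}
```

If the JobManager passed itself as the sender instead, the reply would stop at the JobManager and the client would never see it; that is the difference the forward call makes.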
![](//upload-images.jianshu.io/upload_images/3249301-56371ec18930ba4f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-ed28091d44dc3906.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-1a0df11e1a57941d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-913d82bf6d5825b8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6309a49886d0cc4e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-47f86bcfdee4f967.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6a483d4af26931cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-4de478f435cb1356.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-27986c3659bd96cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-a27f89acbe3406de.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-f04a3da4f97a08dc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

## 2. YarnApplicationMasterRunner startup flow

***The RM first allocates a container on an NM to start YarnApplicationMasterRunner. Let's see how this is done.***

Execution begins in the main function, which constructs a YarnApplicationMasterRunner object and directly calls its run method.

> Main steps of the run method

- Get the current user's UGI and the remote UGI
- Pass the tokens held in the current user's UGI to the remote UGI, for data and service access
- Run runApplicationMaster inside the remote UGI to start the ApplicationMaster

![](//upload-images.jianshu.io/upload_images/3249301-7d3ac2af1bf091f6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

> Main process of runApplicationMaster. The comments in the code are clear, so I only point out the important parts.

- 1) load and parse / validate all configurations
- 2) start the actor system, try to start the actor system, JobManager and JobManager actor system
- 3) Generate the configuration for the TaskManagers: mainly the JobManager address, the TaskManager registration timeout, and the number of slots. The most important step here is building the ContainerLaunchContext of the TaskManager, which contains the command that starts the TaskManager; ***the main class is YarnTaskManager***.
- start the actors and components in this order:
  1) JobManager & Archive (in non-HA case, the leader service takes this): start the JobManager actor; the main class here is ***YarnJobManager***
  2) Web Monitor (we need its port to register): start the web monitoring page and create the LeaderRetrievalService object, which is used when a TaskManager starts to tell it the JobManager's akka URL so that the TaskManager can register with the JobManager
  3) Resource Master for YARN: start the YarnFlinkResourceManager actor, which manages Flink's container resources, including requesting and releasing them
  4) Process reapers for the JobManager and Resource Master

***The focus here is how YarnApplicationMasterRunner uses YarnFlinkResourceManager to request containers and start TaskManagers. This part is relatively complicated; I only fully understood it after following the calls into the Yarn code.***

![YarnFlinkResourceManager inheritance hierarchy](//upload-images.jianshu.io/upload_images/3249301-cbc8215f8c356913.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

This shows that YarnFlinkResourceManager is actually an actor. In the runApplicationMaster method, the actor is started with the following code:
Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
getResourceManagerClass(),//YarnFlinkResourceManager
config,
yarnConfig,
leaderRetriever,
appMasterHostname,
webMonitorURL,
taskManagerParameters,
taskManagerContext,
numInitialTaskManagers,
LOG);
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps); // start the YarnFlinkResourceManager actor
Next, look at the constructor of YarnFlinkResourceManager. Three member variables matter most here:
// YARN invokes this object's callback functions when the RM allocates containers. resourceManagerCallbackHandler holds only the ActorRef of this actor, so the callbacks can communicate with the actor
/** Callback handler for the asynchronous resourceManagerClient */
private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
// Client for AM-to-RM communication; the resourceManagerClient object holds resourceManagerCallbackHandler
/** Client to communicate with the Resource Manager (YARN's master) */
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
// Client for AM-to-NM communication
/** Client to communicate with the Node manager and launch TaskManager processes */
private NMClient nodeManagerClient;
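
The comments above say that the callback handler holds nothing but a reference to the actor, so a callback can only hand its data to the actor as a message. A minimal, dependency-free sketch of that hand-off follows. All names are hypothetical stand-ins (a queue plays the role of the actor's mailbox, and container ids are plain strings), and the real callback runs on a separate thread, which this single-threaded example does not model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of the callback-to-actor hand-off; not Flink's or YARN's code. */
final class AllocationCallbackSketch {

    /** Stand-in for the message that reports newly allocated containers. */
    static final class ContainersAllocated {
        final List<String> containerIds;

        ContainersAllocated(List<String> containerIds) {
            this.containerIds = containerIds;
        }
    }

    /** Stand-in for the callback handler: it only knows the actor's mailbox. */
    static final class CallbackHandler {
        private final Queue<Object> actorMailbox;

        CallbackHandler(Queue<Object> actorMailbox) {
            this.actorMailbox = actorMailbox;
        }

        /** Invoked when containers are granted; it just posts a message to the actor. */
        void onContainersAllocated(List<String> containerIds) {
            actorMailbox.add(new ContainersAllocated(containerIds));
        }
    }

    /** Stand-in for the resource manager actor, which launches what was allocated. */
    static final class ResourceManagerActor {
        final Queue<Object> mailbox = new ConcurrentLinkedQueue<>();
        final List<String> launched = new ArrayList<>();

        /** Processes all queued messages; a launch is modeled by recording the container id. */
        void drainMailbox() {
            Object message;
            while ((message = mailbox.poll()) != null) {
                if (message instanceof ContainersAllocated) {
                    launched.addAll(((ContainersAllocated) message).containerIds);
                }
            }
        }
    }

    public static void main(String[] args) {
        ResourceManagerActor actor = new ResourceManagerActor();
        CallbackHandler handler = new CallbackHandler(actor.mailbox);
        // "container_01" and "container_02" are made-up ids.
        handler.onContainersAllocated(Arrays.asList("container_01", "container_02"));
        actor.drainMailbox();
        System.out.println("launched: " + actor.launched);
    }
}
```

Keeping the handler this thin means all allocation state lives in the actor and is touched only while it processes its own messages.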
When YarnFlinkResourceManager starts, the preStart method runs first. The class does not implement it, so the preStart method of its parent class FlinkResourceManager is executed, which then calls the initialize method of YarnFlinkResourceManager.

> ***In the initialize method***

***resourceManagerClient.start() ----> AMRMClientAsyncImpl.serviceStart() ---> CallbackHandlerThread.start() (daemon thread) ---> YarnResourceManagerCallbackHandler.onContainersAllocated(allocated) ---> yarnFrameworkMaster.tell(new ContainersAllocated(containers), ActorRef.noSender()) (yarnFrameworkMaster is the YarnFlinkResourceManager ActorRef) --> YarnFlinkResourceManager.containersAllocated --> NMClient.startContainer(container, taskManagerLaunchContext). At this point each NM has been told to start its container.***

![](//upload-images.jianshu.io/upload_images/3249301-3a5b6377e93742b9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-2c532578dc7a508d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6a2cd3067970fe39.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-e14cfee9289c57bf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-2e3b73f252030ea8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-a3239382a6bde777.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-0a52a463c263bbd7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-52cecffb2670cc67.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

***That covers the important flow of YarnApplicationMasterRunner. There are too many details to go through here; I will look at them when time permits. Next comes the YarnTaskManager part.***

## 3. YarnTaskManager startup flow

***Continuing from the nodeManagerClient.startContainer(container, taskManagerLaunchContext) call above, which tells the NM to start a container: based on the launch information in taskManagerLaunchContext, the NM downloads from HDFS the jars and configuration files that YarnTaskManager needs in order to start (container_tokens default_container_executor_session.sh default_container_executor.sh flink-conf.yaml flink.jar launch_container.sh lib log4j.properties logback.xml), then runs launch_container.sh in a shell, which finally starts the YarnTaskManager process with java -cp. When the process starts, the YarnTaskManager run method executes first. The TaskManager obtains the JobManager's akka address and sends a registration message. Once the JobManager has received the message and the registration succeeds, it sends an ack back to the TaskManager, and the TaskManager then uses its configuration and the information returned by the JobManager to build the member variables that do the actual work. The process:***

> YarnTaskManagerRunner.runYarnTaskManager(args, classOf[YarnTaskManager]) --> TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager) --> TaskManager.runTaskManager --> TaskManager.startTaskManagerComponentsAndActor --> actorSystem.actorOf(tmProps, actorName) --> TaskManager.preStart --> StandaloneLeaderRetrievalService.start(TaskManager) --> TaskManager.notifyLeaderAddress --> TaskManager.handleJobManagerLeaderAddress --> TaskManager.triggerTaskManagerRegistration() --> TaskManager.handleRegistrationMessage --> instanceManager.registerTaskManager --> the JobManager sends an AcknowledgeRegistration message to the TaskManager --> TaskManager.associateWithJobManager
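
The registration handshake described above can be reduced to a few lines of plain Java. This is a sketch under stated assumptions, not the real message protocol: both classes are hypothetical stand-ins, the exchange is a direct method call rather than actor messaging, and the TaskManager id is made up. Only the order of events (learn the leader, register, receive the acknowledgement, associate) follows the text.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the TaskManager registration handshake; not Flink's code. */
final class RegistrationSketch {

    /** Reply the JobManager stand-in returns once a TaskManager is registered. */
    static final String ACKNOWLEDGE = "AcknowledgeRegistration";

    /** Stand-in for the JobManager: it records registrations and acknowledges them. */
    static final class JobManager {
        final List<String> registered = new ArrayList<>();

        String handleRegistration(String taskManagerId) {
            registered.add(taskManagerId);
            return ACKNOWLEDGE;
        }
    }

    /** Stand-in for the TaskManager: it registers as soon as it learns the leader. */
    static final class TaskManager {
        final String id;
        boolean associated;

        TaskManager(String id) {
            this.id = id;
        }

        /** Called when the leader is known; registers and associates on acknowledgement. */
        void notifyLeaderAddress(JobManager leader) {
            String reply = leader.handleRegistration(id);
            if (ACKNOWLEDGE.equals(reply)) {
                // The real TaskManager builds its working components at this point.
                associated = true;
            }
        }
    }

    public static void main(String[] args) {
        JobManager jobManager = new JobManager();
        TaskManager taskManager = new TaskManager("tm-1"); // made-up id
        taskManager.notifyLeaderAddress(jobManager);
        System.out.println("registered: " + jobManager.registered);
        System.out.println("associated: " + taskManager.associated);
    }
}
```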
![](//upload-images.jianshu.io/upload_images/3249301-8797abe4fead540e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6ada492f0e25718b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-7d9e8a6893af056a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-e4116544f6c7a677.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-03e551a6ebf785c1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-155d80ccd8b5bed4.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-40bbe88a86f090c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-2226948e829a5add.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-b9c444d706737b6f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-3ff9f4e9243c1264.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-08823728261241ec.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![associateWithJobManager](//upload-images.jianshu.io/upload_images/3249301-f3aa8048869975c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-1bb10a80436aa199.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-442effae29613f2c.png)

### That is basically the Flink on yarn flow. The details deserve a deeper look; if anything is incorrect, corrections are welcome.
Link: https://www.jianshu.com/p/8a3177095072