
Spark on Yarn Resource Scheduling: A Source Code Walkthrough

When SparkSubmit is invoked from the command line to run a user jar, the class whose main method is executed on the current node (childMainClass) depends on the deploy mode:
-----------------
standalone (cluster): org.apache.spark.deploy.rest.RestSubmissionClient
client: the main class inside the user jar (the default)
yarn-cluster: org.apache.spark.deploy.yarn.Client
mesos (cluster): org.apache.spark.deploy.rest.RestSubmissionClient
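The selection above can be sketched as a simple pattern match. This is a hypothetical helper for illustration only (the returned strings are the real Spark class names from the list, but `forMode` itself is not a Spark method):

```scala
// Hypothetical helper mirroring how SparkSubmit chooses childMainClass
// from the master URL and deploy mode. NOT the real SparkSubmit code;
// only the returned class names come from Spark itself.
object ChildMainClass {
  def forMode(master: String, deployMode: String): String =
    (master, deployMode) match {
      // standalone cluster deploy goes through the REST submission gateway
      case (m, "cluster") if m.startsWith("spark://") =>
        "org.apache.spark.deploy.rest.RestSubmissionClient"
      // yarn-cluster deploy goes through the YARN Client
      case (m, "cluster") if m.startsWith("yarn") =>
        "org.apache.spark.deploy.yarn.Client"
      // mesos cluster deploy also uses the REST submission gateway
      case (m, "cluster") if m.startsWith("mesos://") =>
        "org.apache.spark.deploy.rest.RestSubmissionClient"
      // client mode: the user's own main class is run directly
      case (_, "client") => "user.jar.MainClass" // placeholder, not a real class
    }
}
```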

Scheduler-related classes created in SparkContext for each deploy mode:
-----------------
local: LocalSparkCluster + SparkDeploySchedulerBackend + TaskSchedulerImpl
yarn-cluster: YarnClusterSchedulerBackend + YarnClusterScheduler
yarn-client: YarnClientSchedulerBackend + YarnScheduler
standalone: SparkDeploySchedulerBackend + TaskSchedulerImpl
mesos: MesosSchedulerBackend + TaskSchedulerImpl
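As a rough sketch, the table above is just a mapping from mode to a (SchedulerBackend, TaskScheduler) pair. `forMode` here is a hypothetical helper; only the class names in the pairs come from Spark:

```scala
// Hypothetical mapping from deploy mode to the (SchedulerBackend,
// TaskScheduler) class pair that SparkContext instantiates, per the
// table above. NOT the real SparkContext.createTaskScheduler code.
object SchedulerClasses {
  def forMode(mode: String): (String, String) = mode match {
    case "local"        => ("SparkDeploySchedulerBackend", "TaskSchedulerImpl")
    case "yarn-cluster" => ("YarnClusterSchedulerBackend", "YarnClusterScheduler")
    case "yarn-client"  => ("YarnClientSchedulerBackend", "YarnScheduler")
    case "standalone"   => ("SparkDeploySchedulerBackend", "TaskSchedulerImpl")
    case "mesos"        => ("MesosSchedulerBackend", "TaskSchedulerImpl")
  }
}
```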

Code entry points executed on each node in yarn-cluster mode
-----------------
On the client node:
SparkSubmit//submits the user program and sets up the run environment locally on the client machine
         submit.doRunMain.runMain
                  loader//sets the class loader
                  Client.main
                           YarnClient.submitApplication
On the node in the YARN cluster chosen to run the Driver:
ApplicationMaster.main//node where the Driver runs in yarn-cluster mode
         run.runDriver
On the other nodes in the YARN cluster:
ExecutorLauncher.main//AM entry used in yarn-client mode (ExecutorLauncher is an alias of ApplicationMaster); executors are started from here
         ApplicationMaster.main
                  run.runExecutorLauncher//starts the executor processes

yarn-cluster mode: the flow from command-line submission -> initialization -> scheduling -> stage division -> Task/TaskSet generation, submission, and execution

----------------

SparkSubmit//submits the user program and sets up the run environment locally on the client machine
         submit.doRunMain.runMain
                   loader//sets the class loader
                   mainMethod.invoke//invokes the class's main method; which class this is depends on the deploy mode, see the childMainClass section above
                            SparkContext//constructor and member-variable initialization
                                     DAGScheduler//creates the DAGScheduler object
                                               new YarnClusterScheduler//a TaskSchedulerImpl
                                               new YarnClusterSchedulerBackend//RPC communication
                                     TaskScheduler.start
                                               YarnClusterSchedulerBackend.start
                                     textFile//creates the RDD, its Partitions, and the underlying storage Blocks
                                     RDD.map//transforms one RDD into another; only builds the lineage, no actual computation yet
                                     RDD.reduce//an action: triggers the actual computation based on the dependencies built above
                                               SparkContext.runJob
                                                        DAGScheduler.runJob.submitJob
                                                                 DAGSchedulerEventProcessLoop.post//fires the job-submission event; task scheduling begins
                                                                           eventQueue.put
                                                                           eventQueue.take
                                                                 DAGSchedulerEventProcessLoop.onReceive
                                                                           DAGScheduler.handleJobSubmitted
                                                                                    newResultStage//builds the final stage
                                                                                             updateJobIdStageIdMaps//builds the stage dependency graph from the dependencies between RDDs
                                                                                    submitStage
                                                                                             submitMissingTasks
                                                                                                       Serializer.serialize//serializes the task information
                                                                                                       partitionsToCompute//creates a Task for each Partition according to the stage type
                                                                                                                new ShuffleMapTask/ResultTask
                                                                                                       TaskScheduler.submitTasks//enters task-level scheduling; this is where behavior diverges per deploy mode
                                                                                                                new TaskSet
                                                                                                                createTaskSetManager
                                                                                                                         new TaskSetManager
                                                                                                                         SchedulableBuilder.addTaskSetManager//enters per-node multi-task scheduling
                                                                                                                                   FIFOSchedulableBuilder/FairSchedulableBuilder
                                                                                                                SchedulerBackend.reviveOffers//the actual scheduling
                                                                                                                         SparkDeploySchedulerBackend/YarnClusterSchedulerBackend/MesosSchedulerBackend
                                                                                                                         RpcEndpointRef.send
                                                                                                                                   AkkaRpcEndpointRef.send
                                                                                                                                            ActorRef ! AkkaMessage
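The DAGSchedulerEventProcessLoop.post / eventQueue.put / eventQueue.take / onReceive steps in the tree above form a classic single-threaded event loop: post enqueues an event, and a dedicated thread dequeues events and dispatches them to onReceive. A minimal self-contained sketch of that pattern (a simplification for illustration, not the actual Spark EventLoop class):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Minimal sketch of the event-loop pattern behind
// DAGSchedulerEventProcessLoop: post() enqueues an event; a daemon
// thread blocks on the queue and dispatches each event to onReceive().
// NOT the real org.apache.spark.util.EventLoop.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped) {
          val event = eventQueue.take() // blocks until an event arrives
          onReceive(event)
        }
      } catch {
        case _: InterruptedException => () // exit cleanly on stop()
      }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  // Subclasses decide how each event is handled, just as
  // DAGSchedulerEventProcessLoop routes events to DAGScheduler handlers.
  protected def onReceive(event: E): Unit
}
```

Because all events are handled on one thread, handlers such as handleJobSubmitted never need to synchronize with each other; that is the main reason Spark funnels scheduling through this loop.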

Flow of Driver and Executor node registration, resource allocation, and startup

----------------

YarnClusterSchedulerBackend.start
         Client
                   submitApplication
                            createContainerLaunchContext
                                     ApplicationMaster.main//node where the Driver and the ApplicationMaster run in yarn-cluster mode
                                               run.runDriver
                                                        startUserApplication//starts the user's driver program
                                                                 mainMethod.invoke
                                                        runAMEndpoint//the ApplicationMaster's communication endpoint
                                                        registerAM//registers the ApplicationMaster with the ResourceManager
                                                                 YarnRMClient.register
                                                                           AMRMClient.registerApplicationMaster//enters the native YARN API
                                                                           new YarnAllocator//allocates containers
                                                                 YarnAllocator.allocateResources
                                                                           AMRMClient.allocate//enters the native YARN API
                                                                           handleAllocatedContainers
                                                                                    runAllocatedContainers
                                                                                             new ExecutorRunnable
                                                                                                       run
                                                                                                                NMClient.start
                                                                                                                startContainer
                                                                                                                         NMClient.startContainer//enters the native YARN API

                                     ExecutorLauncher.main//AM entry used in yarn-client mode (ExecutorLauncher is an alias of ApplicationMaster); executors are started from here
                                               ApplicationMaster.main
                                                        run.runExecutorLauncher//starts the executor processes
                                                                 waitForSparkDriver//waits for the client-side driver program to become ready
                                                                          RpcEnv.create//creates the RPC endpoint connecting the ApplicationMaster and the Driver
                                                                          runAMEndpoint
                                                                 registerAM
                            createApplicationSubmissionContext
                            YarnClient.submitApplication//enters the native YARN API
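The registerAM / YarnAllocator.allocateResources portion of the tree above boils down to a simple control loop: the AM registers with the ResourceManager, repeatedly asks for the containers it is still missing, and "launches" an executor in each container it is granted. A toy simulation of that control flow (all types here are stand-ins invented for this sketch, NOT the real org.apache.hadoop.yarn API):

```scala
// Toy model of the AM-side allocation loop shown above. These classes
// stand in for the YARN protocol objects; they are NOT the real
// AMRMClient/YarnAllocator, only the shape of the interaction.
case class Container(id: Int)

class FakeResourceManager(capacity: Int) {
  private var granted = 0
  // Grant up to `ask` containers, bounded by remaining cluster capacity
  // (models one AMRMClient.allocate round trip).
  def allocate(ask: Int): Seq[Container] = {
    val n = math.min(ask, capacity - granted)
    (0 until n).map { _ => granted += 1; Container(granted) }
  }
}

class FakeAllocator(rm: FakeResourceManager, targetExecutors: Int) {
  private var launched = List.empty[Container]
  // One round of YarnAllocator.allocateResources: request the missing
  // executors, then "launch" one ExecutorRunnable per granted container
  // (models handleAllocatedContainers / runAllocatedContainers).
  def allocateResources(): Unit = {
    val missing = targetExecutors - launched.size
    val containers = rm.allocate(missing)
    launched = launched ++ containers
  }
  def runningExecutors: Int = launched.size
}
```

The real YarnAllocator runs this round repeatedly on a heartbeat, which is why an application can come up with fewer executors than requested and grow later as the cluster frees capacity.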

JobScheduler//used only in the Streaming module, together with JobGenerator.