1. 程式人生 > >spark啟動原理總結

spark啟動原理總結

lead term 技術 forward deploy ddp rbo 其他 sso

一般情況下,我們啟動spark集群都是start-all.sh或者是先啟動master(start-master.sh),然後在啟動slave節點(start-slaves.sh),其實翻看start-all.sh文件裏面的代碼,可以發現它裏面其實調用的執行的也是start-master.sh和start-slaves.sh文件的內容:

技術分享圖片

在start-master.sh中定義了CLASS="org.apache.spark.deploy.master.Master" ,最終調用其main方法啟動master服務,在start-slaves.sh文件中有調用了start-slave.sh內容,只是定義了

CLASS="org.apache.spark.deploy.worker.Worker"來啟動worker。

接下來先看master中的main方法,在main方法中調用了startRpcEnvAndEndpoint()方法,來定義並啟動消息通信。

技術分享圖片

在啟動服務端master通信的時候,會在inbox中調用master的onStart方法(關於spark RPC可以查閱其他博客);下面就分析master的onStart方法:

 1  override def onStart(): Unit = {
 2     logInfo("Starting Spark master at " + masterUrl)
3 logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") 4 webUi = new MasterWebUI(this, webUiPort) 5 webUi.bind() 6 masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort 7 if (reverseProxy) { 8 masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
9 webUi.addProxy() 10 logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " + 11 s"Applications UIs are available at $masterWebUiUrl") 12 } 13 checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable { 14 override def run(): Unit = Utils.tryLogNonFatalError { 15 self.send(CheckForWorkerTimeOut) 16 } 17 }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) 18 19 if (restServerEnabled) { 20 val port = conf.getInt("spark.master.rest.port", 6066) 21 restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl)) 22 } 23 restServerBoundPort = restServer.map(_.start()) 24 25 masterMetricsSystem.registerSource(masterSource) 26 masterMetricsSystem.start() 27 applicationMetricsSystem.start() 28 // Attach the master and app metrics servlet handler to the web ui after the metrics systems are 29 // started. 30 masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) 31 applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) 32 33 val serializer = new JavaSerializer(conf) 34 val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { 35 case "ZOOKEEPER" => 36 logInfo("Persisting recovery state to ZooKeeper") 37 val zkFactory = 38 new ZooKeeperRecoveryModeFactory(conf, serializer) 39 (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this)) 40 case "FILESYSTEM" => 41 val fsFactory = 42 new FileSystemRecoveryModeFactory(conf, serializer) 43 (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this)) 44 case "CUSTOM" => 45 val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory")) 46 val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer]) 47 .newInstance(conf, serializer) 48 .asInstanceOf[StandaloneRecoveryModeFactory] 49 (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this)) 50 case _ => 51 (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this)) 52 } 53 persistenceEngine = persistenceEngine_ 54 leaderElectionAgent = leaderElectionAgent_ 55 }

在這個方法裏面首先針對webui進行了一系列的處理,然後啟動一個線程來檢查任何超時的worker,並且清除它;其次處理了一些關於Metrics的內容和在多個master下面的的關於元數據和master選舉的一些機制;

根據spark.deploy.recoveryMode設置的參數,可以是ZOOKEEPER,FILESYSTEM,CUSTOM,默認為NONE,當是zookeeper時,基於ZooKeeper選舉,元數據信息會持久化到ZooKeeper中。當是fileSystem時集群的元數據會保存到本地的文件系統中,而master啟動會立即成為集群的master。當是custom時,是用戶自定義,需要實現StandaloneRecoveryModeFactory,並將類的名字配置到spark.deploy.recoveryMode.factory;當是NONE的時候不會持久化元數據信息,master啟動會即是集群的master。

接下來看看worker中的處理,在worker的mian方法中調用的是startRpcEnvAndEndpoint()方法

技術分享圖片

上面的方法也是註冊啟動了worker的消息通信,同理也會調用worker的onStart方法。在onstart方法裏面會調用registerWithMaster()方法來註冊到master上。

技術分享圖片

在這個方法裏面會調用tryRegisterAllMasters來註冊到master,在其後面是關於重試的處理,主要是判斷registered的值來進行相應的處理。接下來是tryRegisterAllMasters方法:

技術分享圖片

這裏會創建一個註冊master的線程池來管理,發送的消息在sendRegisterMessageToMaster方法中,就是發送一個RegisterWorker的消息給master;接下倆看master對這個消息的處理:

技術分享圖片

master在接收到這個消息的時候會先判斷state狀態以及現有註冊的worker是否存在新的註冊的worker的id,若狀態和id沒有匹配到則新建一個workerInfo來保存worker的信息,最後調用registerWorker方法添加worker,早真正添加完成之後,會給worker發送RegisteredWorker消息,其後會調用schedule方法;下面先看worker接收到消息的處理:

技術分享圖片

worker在接收到來master的消息之後,先更新registered的值,然後更新master的信息,啟動一個線程定時給master發送心跳信息,如果配置了spark.worker.cleanup.enabled為true,則進行清理工作,最後會向master發送worker的exector的信息。

到這個時候master和worker已經完全啟動,接下來就是啟動worker中的exectors。這個改天再說!

spark啟動原理總結