
Spark Startup Message Communication: Basic Process Source Code Reading (Part 1)

Spark's basic startup message communication process

Spark startup mainly involves communication between the Master and Worker processes:

1. A Worker node sends a registration message to the Master node.

2. After processing the registration, the Master returns a success or failure message.

3. Once registered, the Worker sends heartbeats to the Master at a fixed interval.

The specific flow is shown in the flow chart below:

[flow chart: Worker → RegisterWorker → Master; Master → RegisteredWorker / RegisterWorkerFailed → Worker; Worker → periodic Heartbeat → Master]
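The three steps above can be sketched as a minimal message exchange. This is an illustrative model only: the `SimMaster` class and its members are invented for this sketch and are much simpler than Spark's actual Master, though the message names mirror Spark's real RegisterWorker / RegisteredWorker / RegisterWorkerFailed / Heartbeat messages.

```scala
// Toy model of the Worker/Master registration handshake (not Spark's code).
sealed trait Msg
case class RegisterWorker(id: String) extends Msg
case class RegisteredWorker(masterUrl: String) extends Msg
case class RegisterWorkerFailed(reason: String) extends Msg
case class Heartbeat(workerId: String) extends Msg

class SimMaster(url: String) {
  private val workers = scala.collection.mutable.Set[String]()
  val lastSeen = scala.collection.mutable.Map[String, Long]()

  // Steps 1 and 2: receive a registration request, reply with success or failure.
  def receive(msg: RegisterWorker): Msg =
    if (workers.contains(msg.id)) RegisterWorkerFailed("Duplicate worker ID")
    else { workers += msg.id; RegisteredWorker(url) }

  // Step 3: record heartbeats so the master knows the worker is still alive.
  def receive(hb: Heartbeat): Unit =
    lastSeen(hb.workerId) = System.currentTimeMillis()
}

val master = new SimMaster("spark://master:7077")
val reply = master.receive(RegisterWorker("worker-1"))   // steps 1 and 2
master.receive(Heartbeat("worker-1"))                    // step 3
```

A second registration with the same ID is rejected with a duplicate-ID failure, which is exactly the check the real Master code performs later in this post.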

1. Worker registration

a) When the Master starts, the Workers are started as well. On startup, each Worker creates its communication environment RpcEnv and endpoint Endpoint, and sends a RegisterWorker message to the Master.

Because a Worker may need to register with multiple Masters (in an HA setup), the Worker class's tryRegisterAllMasters method creates a registration thread pool, registerMasterThreadPool, submits one registration request per Master address to the pool, and lets the pool's threads carry out the registrations.

b) The registration call:

The Worker obtains a reference to the Master endpoint and calls registerWithMaster (this is the method in version 2.1.1; version 2.2.0 uses sendRegisterMessageToMaster instead).

Version 2.1.1, class Worker:

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Obtain a reference to the Master endpoint
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Call registerWithMaster to send the registration
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}

 

Version 2.2.0:

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Obtain a reference to the Master endpoint
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Call sendRegisterMessageToMaster to send the registration
          sendRegisterMessageToMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}

 

The difference between sendRegisterMessageToMaster and registerWithMaster is that the former uses the send method while the latter uses the ask method: ask returns a Future carrying the reply, whereas send is fire-and-forget and returns nothing. As the Master code later in this post shows, the Master answers via workerRef.send with a separate RegisterWorkerResponse message (RegisteredWorker or RegisterWorkerFailed), so the Worker does not need a synchronous reply; the precise motivation for the 2.2.0 change remains to be studied.
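The two RPC styles can be illustrated with plain Scala concurrency primitives. This is a simplified model of the semantics, not Spark's RpcEndpointRef implementation; the `ToyEndpoint` class and its reply format are invented for illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Toy endpoint modeling the two RPC styles:
//   send - fire-and-forget, no reply channel
//   ask  - returns a Future completed with the endpoint's reply
class ToyEndpoint {
  var received: List[String] = Nil

  def send(msg: String): Unit = {          // caller gets nothing back
    received ::= msg
  }

  def ask(msg: String): Future[String] = { // caller can await a reply
    val p = Promise[String]()
    received ::= msg
    p.success(s"reply-to-$msg")            // endpoint completes the promise
    p.future
  }
}

val ep = new ToyEndpoint
ep.send("RegisterWorker")                  // 2.2.0 style: no return value here
val reply = Await.result(ep.ask("RegisterWorker"), 1.second) // 2.1.1 style
```

With send, any answer must arrive as a separate inbound message, which matches how the 2.2.0 Worker handles registration results in handleRegisterResponse.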

c) When the Master receives the message, it validates and records the Worker's information. If registration succeeds, the Master sends a RegisteredWorker message to the Worker to confirm registration, and the flow proceeds to step 3: the Worker starts sending periodic heartbeats to the Master. If registration fails, the Master sends a RegisterWorkerFailed message; the Worker logs the error and aborts its startup.

d) On the Master side, when a Worker registration arrives, the Master first checks whether it is currently in STANDBY state; if so, it ignores the message. If the Worker's ID is already in the registration list, it sends a registration-failure message. Otherwise, it calls registerWorker to add the Worker to its list, which is used for scheduling when the cluster processes tasks. The Worker registration code in Master.receiveAndReply:

Class Master:

case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    workerRef.send(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    // registerWorker adds the Worker to the list kept by the Master,
    // which is used later when scheduling tasks
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
      schedule()
    } else {
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }

 

e) After the Worker receives the registration-success message, it periodically sends heartbeats to the Master so that the Master knows the Worker's live status. The interval is derived from the spark.worker.timeout setting; note that the heartbeat interval is one quarter of that timeout value.

 

private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
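With the default timeout of 60 seconds, the arithmetic works out to a 15-second heartbeat interval (the conf lookup is replaced by a plain value in this sketch):

```scala
// HEARTBEAT_MILLIS is derived from spark.worker.timeout (in seconds):
// the Worker heartbeats four times per timeout window, so the Master
// only declares a Worker lost after roughly four missed heartbeats.
val timeoutSeconds = 60L                          // default spark.worker.timeout
val heartbeatMillis = timeoutSeconds * 1000 / 4   // 15000 ms, i.e. every 15 s
```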

 

When the Worker receives the registration-success message, it first logs the event and updates its Master information, then starts a scheduled task that sends heartbeats at the HEARTBEAT_MILLIS interval defined above.

 

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
      if (preferConfiguredMasterAddress) {
        logInfo("Successfully registered with master " + masterAddress.toSparkURL)
      } else {
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      }
      registered = true
      changeMaster(masterRef, masterWebUiUrl, masterAddress)
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(SendHeartbeat)
        }
      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      // CLEANUP_ENABLED is true when spark.worker.cleanup.enabled is set,
      // i.e. when the directories of finished applications should be cleaned up
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
      // Report the latest state of this Worker's Executors to the Master
      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
    // ... handling of the other RegisterWorkerResponse cases omitted
  }
}