Reading the Source: The Basic Spark Startup Message Flow (Part 1)
The basic Spark startup message flow
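Spark startup consists mainly of communication between the Master and Worker processes:
1. The Worker node sends a registration message to the Master node.
2. After handling the registration, the Master replies with either a success message or a failure message.
3. The Worker sends heartbeats to the Master at a fixed interval.
The detailed flow is as follows: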
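1.
a) After the Master starts, the Workers are started. On startup, each Worker creates its communication environment (RpcEnv) and its endpoint (Endpoint),
and sends the Master a RegisterWorker message to register itself.
Because a Worker may need to register with several Masters (HA), the tryRegisterAllMasters method of the Worker class uses a registration thread pool, registerMasterThreadPool: each registration request is submitted to the pool, and the pool's threads carry out the registration.
b) Registration:
Obtain the Master endpoint reference and call registerWithMaster (this is the method in version 2.1.1; version 2.2.0 uses sendRegisterMessageToMaster instead).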
Version 2.1.1:
Class: Worker
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Obtain a reference to the Master endpoint
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Call registerWithMaster to send the registration
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
Version 2.2.0:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Obtain a reference to the Master endpoint
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Call sendRegisterMessageToMaster to send the registration
          sendRegisterMessageToMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
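The difference between sendRegisterMessageToMaster and registerWithMaster is that the former uses the send method while registerWithMaster uses the ask method. One difference between the two is that ask returns a reply to the caller, whereas send returns nothing. Why version 2.2 made this change is something I still need to study.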
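To make the send/ask distinction concrete, here is a minimal sketch. It does not use Spark's RpcEndpointRef: ToyEndpointRef, its handler function, and deliveredMessages are hypothetical names made up for illustration. The only point is the shape of the two calls: send hands the message over and returns Unit, while ask returns a Future that carries the reply.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object AskVsSendSketch {
  // Hypothetical stand-in for an endpoint reference (not Spark's RpcEndpointRef).
  // `handler` plays the role of the remote endpoint: it turns a message into a reply.
  final class ToyEndpointRef(handler: String => String) {
    private var delivered = Vector.empty[String]

    // Fire-and-forget: the message is delivered, the caller gets nothing back.
    def send(message: String): Unit = {
      delivered :+= message
      handler(message) // whatever the handler produces is dropped
      ()
    }

    // Request-reply: the caller receives a Future that holds the reply.
    def ask(message: String): Future[String] = {
      delivered :+= message
      val reply = Promise[String]()
      reply.success(handler(message))
      reply.future
    }

    // Lets the demo check what was delivered, in order.
    def deliveredMessages: Vector[String] = delivered
  }

  def demo(): String = {
    val master = new ToyEndpointRef(msg => s"ack:$msg")
    master.send("RegisterWorker")                        // nothing to wait on
    Await.result(master.ask("RegisterWorker"), 1.second) // blocks for the reply
  }
}
```

With send, any answer has to come back as a separate message sent to the caller, which is the pattern the Master listing in step d) follows when it calls workerRef.send.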
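c) After receiving the message, the Master validates and records the information sent by the Worker. If registration succeeds, it sends a RegisteredWorker message to that Worker to tell it that registration is complete, and the flow moves on to step 3, in which the Worker periodically sends heartbeats to the Master. If registration fails, the Master sends a RegisterWorkerFailed message; the Worker logs the error and aborts its startup.
d) On the Master side, when a Worker's registration message arrives, the Master first checks whether it is currently in the STANDBY state; if so, it replies with MasterInStandby and does not register the Worker. If the Worker's ID is already in the registered list, it sends a registration-failure message. Once these checks pass, it calls registerWorker to add the Worker to the list, so that the Worker can be scheduled when the cluster processes tasks. The Worker registration code in Master.receiveAndReply (note that this listing replies through workerRef.send rather than through a reply context):
Class: Master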
case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    workerRef.send(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    // registerWorker registers the Worker by adding it to the list,
    // which is used later when tasks are executed
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
      schedule()
    } else {
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
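The three outcomes in the listing above can be condensed into a small, self-contained sketch. This is a simplification under stated assumptions, not Spark's code: ToyMaster, handleRegister, and the string-based IDs and addresses are hypothetical, and the address check is only a stand-in for registerWorker returning false, which in Spark may involve more cases than shown here.

```scala
object MasterRegistrationSketch {
  // Replies modeled after the message names in the listing; the payloads are simplified.
  sealed trait Reply
  case object MasterInStandby extends Reply
  final case class RegisterWorkerFailed(reason: String) extends Reply
  case object RegisteredWorker extends Reply

  // Hypothetical, stripped-down master holding only the state the checks need.
  final class ToyMaster(val standby: Boolean) {
    private val idToWorker = scala.collection.mutable.Map.empty[String, String] // id -> address
    private val addresses = scala.collection.mutable.Set.empty[String]

    def handleRegister(id: String, address: String): Reply =
      if (standby) {
        MasterInStandby // a standby master does not register workers
      } else if (idToWorker.contains(id)) {
        RegisterWorkerFailed("Duplicate worker ID")
      } else if (addresses.contains(address)) {
        // Stand-in for registerWorker(worker) returning false
        RegisterWorkerFailed("Attempted to re-register worker at same address: " + address)
      } else {
        idToWorker(id) = address
        addresses += address
        RegisteredWorker
      }
  }
}
```

For example, registering "w1" at "host:1" on an active ToyMaster yields RegisteredWorker, a second attempt with the ID "w1" yields the duplicate-ID failure, and a new ID at "host:1" yields the same-address failure.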
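e) Once the Worker receives the registration-success message, it sends heartbeats to the Master at a fixed interval so that the Master knows the Worker's current status. The interval is derived from the spark.worker.timeout setting; note that the heartbeat interval is one quarter of that value.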
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
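As a worked check of that line: with the default of 60 (seconds), the interval is 60 * 1000 / 4 = 15000 ms, so a heartbeat goes out every 15 seconds. The sketch below repeats the arithmetic and shows fixed-rate scheduling with a plain java.util.concurrent scheduler. HeartbeatSketch, heartbeatMillis, and runHeartbeats are hypothetical names, and the beat callback stands in for sending a heartbeat message.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object HeartbeatSketch {
  // Same arithmetic as HEARTBEAT_MILLIS: timeout in seconds -> interval in milliseconds.
  def heartbeatMillis(timeoutSeconds: Long): Long = timeoutSeconds * 1000 / 4

  // Runs `beat` at a fixed rate until at least `count` beats have fired, then stops
  // the scheduler. Returns true if the beats fired within the wait limit.
  def runHeartbeats(intervalMillis: Long, count: Int)(beat: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val remaining = new CountDownLatch(count)
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        beat()
        remaining.countDown()
      }
    }, 0, intervalMillis, TimeUnit.MILLISECONDS) // initial delay 0: first beat fires at once
    try remaining.await(intervalMillis * (count + 1) + 1000, TimeUnit.MILLISECONDS)
    finally scheduler.shutdownNow()
  }
}
```

Calling HeartbeatSketch.heartbeatMillis(60) gives 15000. A short interval such as runHeartbeats(10, 3)(() => println("beat")) is enough to watch the fixed-rate behaviour without waiting 15 seconds per beat.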
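When the Worker gets the registration-success message, it first logs it and updates its Master information, then starts a scheduled task that sends heartbeats. The period of that task is the HEARTBEAT_MILLIS value defined above.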
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
      if (preferConfiguredMasterAddress) {
        logInfo("Successfully registered with master " + masterAddress.toSparkURL)
      } else {
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      }
      registered = true
      changeMaster(masterRef, masterWebUiUrl, masterAddress)
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(SendHeartbeat)
        }
      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      // To clean up directories used by old applications, set the
      // spark.worker.cleanup.enabled parameter so that CLEANUP_ENABLED is true.
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
      // Report the latest state of this Worker's Executors to the Master
      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))