1. 程式人生 > >3--Master註冊機制源碼分析和狀態改變機制源碼分析

3--Master註冊機制源碼分析和狀態改變機制源碼分析

地址 mis 清空 finish accep same properly values pad

這部分直接看源碼會比較直觀!!!

[註]本篇是對第二篇中間的Master狀態改變以及註冊機制進行剖析 master註冊機制原理圖如下 , 其實就是將Application信息 , Driver信息和所有的Worker信息加入緩存隊列中. 技術分享圖片 1. Application的註冊 case RegisterApplication(description, driver) => { // 如果當前master為standByMaster , 不是ActiveMaster , 那麽Application來註冊則什麽都不會做 if (state == RecoveryState.STANDBY) { // ignore, don't send response } else { logInfo("Registering app " + description.name) // 通過接收到的application desc信息創建Application對象 val app = createApplication(description, driver) // 註冊Application對象 registerApplication(app) logInfo("Registered app " + description.name + " with ID " + app.id) // 持久化Application信息 persistenceEngine.addApplication(app) // 向driver發送Application已經註冊的信息 , // 也就是反向向SparkDeploySchedulerBackend的AppClient的ClientActor發送已經註冊的RegisteredApplication消息 driver.send(RegisteredApplication(app.id, self)) // 開始資源調度 schedule() } } 1.1 createApplication()方法通過接收到的application desc信息創建Application對象 private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef): ApplicationInfo = { val now = System.currentTimeMillis() val date = new Date(now) val appId = newApplicationId(date) new ApplicationInfo(now, appId, desc, date, driver, defaultCores) } 1.2 registerApplication()註冊Application對象 /** * 將讀取到的持久化的app信息.重新註冊到standby master的內存緩存結構中 * 處理Application註冊請求信息 * @param app 參數是ApplicationInfo-->ApplicationDescription * 其中的private val addressToApp = new HashMap[RpcAddress, ApplicationInfo] */ private def registerApplication(app: ApplicationInfo): Unit = { val appAddress = app.driver.address // 如果Master中已經包含了這個地址對應的Application的話,只記錄日誌並直接返回,不會重複註冊 if (addressToApp.contains(appAddress)) { logInfo("Attempted to re-register application at same address: " + appAddress) return } // 將app的appSource註冊到applicationMetricsSystem中 // 這個app的appSource()方法是對appDesc的基本信息進行相關的計算 // 這裏就不向下跟蹤了 applicationMetricsSystem.registerSource(app.appSource) // 加入到內存緩存中去--->HashSet(去重) // val apps = new HashSet[ApplicationInfo] apps += app // val idToApp = new HashMap[String, ApplicationInfo] idToApp(app.id) = app 
endpointToApp(app.driver) = app addressToApp(appAddress) = app // 將app加入到等待調度隊列中 -->waitingApps其實就是一個ArrayBuffer waitingApps += app } 2.Worker的註冊原理和Application相似 case RegisterWorker( id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) // 檢查Master的狀態 if (state == RecoveryState.STANDBY) { context.reply(MasterInStandby) } else if (idToWorker.contains(id)) { context.reply(RegisterWorkerFailed("Duplicate worker ID")) } else { // 創建WorkerInfo信息對象 , 封裝相關的worker信息 val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, workerRef, workerWebUiUrl) // 註冊worker , 調用registerWorker()方法進行註冊 , // 若註冊成功進行worker信息持久化並向worker回覆註冊成功的消息 if (registerWorker(worker)) { // worker信息持久化 persistenceEngine.addWorker(worker) // 向worker回覆註冊成功的消息 context.reply(RegisteredWorker(self, masterWebUiUrl)) // 開始調度 schedule() } else { // 註冊失敗的話向worker回覆註冊失敗的消息 val workerAddress = worker.endpoint.address logWarning("Worker registration failed. Attempted to re-register worker at same " + "address: " + workerAddress) context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)) } } } 2.1 registerWorker()註冊worker的相關信息 /** * 註冊worker的相關信息 * @param worker * @return */ private def registerWorker(worker: WorkerInfo): Boolean = { // There may be one or more refs to dead workers on this same node (w/ different ID's), // remove them. 
// 這裏過濾掉已經死掉的worker , 將他們從緩存隊列中移除 workers.filter { w => (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD) }.foreach { w => workers -= w } // 獲取worker的url地址 val workerAddress = worker.endpoint.address // 檢查worker的地址緩存隊列中是否已經有了該worker的地址信息 if (addressToWorker.contains(workerAddress)) { // 從worker的地址緩存隊列(HashMap)中獲取已經存在的worker的地址信息 , 稱之為oldworker val oldWorker = addressToWorker(workerAddress) // 若是oldworker為UNKNOWN狀態的話,需要將其從緩存隊列中移除 if (oldWorker.state == WorkerState.UNKNOWN) { // A worker registering from UNKNOWN implies that the worker was restarted during recovery. // The old worker must thus be dead, so we will remove it and accept the new worker. // 這個方法,上一篇已經說了 removeWorker(oldWorker) } else { logInfo("Attempted to re-register worker at same address: " + workerAddress) return false } } // 將新增加的worker加入緩存隊列HashSet中 workers += worker // worker的id信息加入id緩存隊列 idToWorker(worker.id) = worker // 將worker的地址加入address緩存隊列中 addressToWorker(workerAddress) = worker true } 2.2 removeWorker private def removeWorker(worker: WorkerInfo) { logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) // 首先修改worker的狀態為DEAD worker.setState(WorkerState.DEAD) // 1、從內存緩存結構中移除 idToWorker -= worker.id addressToWorker -= worker.endpoint.address // 2、從相關的組件的內存中移除-->executor和driver // 這個地方也都涉及到了supervise機制問題 for (exec <- worker.executors.values) { logInfo("Telling app of lost executor: " + exec.id) exec.application.driver.send(ExecutorUpdated( exec.id, ExecutorState.LOST, Some("worker lost"), None)) exec.application.removeExecutor(exec) } for (driver <- worker.drivers.values) { if (driver.desc.supervise) { logInfo(s"Re-launching ${driver.id}") relaunchDriver(driver) } else { logInfo(s"Not re-launching ${driver.id} because it was not supervised") removeDriver(driver.id, DriverState.ERROR, None) } } 3. 
Driver的註冊源碼也相似 /** * 註冊Driver */ case RequestSubmitDriver(description) => { // 檢查master狀態 if (state != RecoveryState.ALIVE) { val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + "Can only accept driver submissions in ALIVE state." context.reply(SubmitDriverResponse(self, false, None, msg)) } else { logInfo("Driver submitted " + description.command.mainClass) // 根據DriverDescription創建Driver val driver = createDriver(description) // 持久化Driver信息 persistenceEngine.addDriver(driver) // 將Driver加入等待調度的緩存隊列中 waitingDrivers += driver // 將Driver加入緩存隊列 drivers.add(driver) // 開始調度 schedule() // TODO: It might be good to instead have the submission client poll the master to determine // the current status of the driver. For now it's simply "fire and forget". context.reply(SubmitDriverResponse(self, true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")) } } 3.1 參數DriverDescription private[deploy] case class DriverDescription( jarUrl: String, // jar包的地址(URL) mem: Int, // Driver所需要的內存 cores: Int, // Driver所需要的cpu core數量 supervise: Boolean, // supervise機制 command: Command) { // 相關命令 override def toString: String = s"DriverDescription (${command.mainClass})" } 3.2 Driver狀態改變 /** * Driver的狀態改變 */ case DriverStateChanged(driverId, state, exception) => { state match { // 如果Driver的狀態為ERROR,FINISHED,KILLED,FAILED 那麽都會將Driver移除 case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => removeDriver(driverId, state, exception) case _ => throw new Exception(s"Received unexpected state update for driver $driverId: $state") } } 3.3 Driver移除 /** * 移除Driver */ private def removeDriver( driverId: String, finalState: DriverState, exception: Option[Exception]) { // 用scala的高階函數find() 來匹配當前的driverId drivers.find(d => d.id == driverId) match { // 使用樣例類Some,來判斷是否有值 case Some(driver) => logInfo(s"Removing driver: $driverId") // 如果有值,將Driver從內存緩存<HashSet>中移除 drivers -= driver if (completedDrivers.size >= RETAINED_DRIVERS) { val toRemove = 
math.max(RETAINED_DRIVERS / 10, 1) completedDrivers.trimStart(toRemove) } // Driver是一種資源,向已經完成的completedDrivers中加入Driver completedDrivers += driver // 從持久化引擎中移除Driver信息 persistenceEngine.removeDriver(driver) // 設置Driver相應的state、exception driver.state = finalState driver.exception = exception // 將Driver從所在的Worker中移除 driver.worker.foreach(w => w.removeDriver(driver)) // 其實在我看就相當於清空刷新,等待調度 schedule() case None => logWarning(s"Asked to remove unknown driver: $driverId") } } } 4. Executor的狀態的改變 /** * Executor狀態改變所需要的操作 */ case ExecutorStateChanged(appId, execId, state, message, exitStatus) => { // 找到executor對應app,然後反過來通過app對應的executor緩存獲取executor信息 val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) execOption match { // 成功找到 case Some(exec) => { // 設置executor的當前狀態 val appInfo = idToApp(appId) val oldState = exec.state exec.state = state if (state == ExecutorState.RUNNING) { assert(oldState == ExecutorState.LAUNCHING, s"executor $execId state transfer from $oldState to RUNNING is illegal") appInfo.resetRetryCount() } // 向driver同步發送ExecutorUpdated的信息 exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus)) // 判斷 , 如果executor已經完成了 if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and app logInfo(s"Removing executor ${exec.fullId} because it is $state") // If an application has already finished, preserve its // state to display its information properly on the UI // 從Application中移除掉executor if (!appInfo.isFinished) { appInfo.removeExecutor(exec) } // 從worker中移除executor exec.worker.removeExecutor(exec) // 判斷executor是否正常退出(退出碼為0) val normalExit = exitStatus == Some(0) // Only retry certain number of times so we don't go into an infinite loop. 
if (!normalExit) { // 判斷Application當前的重試次數是否達到了最大值 , 最大值默認為10 if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) { // 沒有達到最大值 則繼續調度 schedule() } else { // 已經達到最大值那就認為executor調度失敗 , // 如果此時沒有任何executor處於RUNNING狀態 , 則同時認為Application也是失敗了 , 將Application也從緩存隊列移除掉 val execs = appInfo.executors.values if (!execs.exists(_.state == ExecutorState.RUNNING)) { logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + s"${appInfo.retryCount} times; removing it") // 移除掉executor所在的Application removeApplication(appInfo, ApplicationState.FAILED) } } } } } case None => logWarning(s"Got status update for unknown executor $appId/$execId") } }

3--Master註冊機制源碼分析和狀態改變機制源碼分析