Spark Master registration mechanism and master failover source code
The Master startup code is as follows:
override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  // Start the web UI
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())

  // Start the metrics systems
  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
  // started.
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  val serializer = new JavaSerializer(conf)
  // Create the persistence engine and the leader election agent
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory = new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    case "FILESYSTEM" =>
      val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}
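The RECOVERY_MODE branch above is driven by configuration. Below is a minimal sketch of selecting ZooKeeper-based recovery (the property names are Spark's standalone-HA settings; the ZooKeeper hosts are placeholders, and on a real Master these properties are usually passed via SPARK_DAEMON_JAVA_OPTS in spark-env.sh):

import org.apache.spark.SparkConf

// Select the ZOOKEEPER recovery mode; zk1/zk2/zk3 are placeholder hosts.
val conf = new SparkConf()
  .set("spark.deploy.recoveryMode", "ZOOKEEPER")                    // read as RECOVERY_MODE
  .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181")  // ZK ensemble
  .set("spark.deploy.zookeeper.dir", "/spark")                      // where state is persisted

With FILESYSTEM mode, spark.deploy.recoveryDirectory selects where the state files are written; with no mode configured, the default branch uses BlackHolePersistenceEngine (persists nothing) and MonarchyLeaderAgent (the single Master is immediately elected leader).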
In Spark, applications, drivers, and workers can all register with the Master. The process for each is shown below.
1. Application registration
case RegisterApplication(description, driver) => {
  // TODO Prevent repeated registrations from some driver
  // If this Master is in STANDBY state, do nothing
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // Build an ApplicationInfo from the ApplicationDescription and the driver ref
    val app = createApplication(description, driver)
    // Register the ApplicationInfo in the Master's memory
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the ApplicationInfo to the persistence engine
    persistenceEngine.addApplication(app)
    // Tell the driver that the application has been registered
    driver.send(RegisteredApplication(app.id, self))
    // Run the scheduling method
    schedule()
  }
}
As the code shows, if the Master is in STANDBY state it does nothing.
The registerApplication method simply stores the app's information in the Master's in-memory indexes:

private def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}
The line driver.send(RegisteredApplication(app.id, self)) sends a message to the AppClient, whose RegisteredApplication handler then knows that registration is complete. For more on the AppClient side, see this post: http://blog.csdn.net/zlx510tsde/article/details/78814717
case RegisteredApplication(appId_, masterRef) =>
  // FIXME How to handle the following cases?
  // 1. A master receives multiple registrations and sends back multiple
  // RegisteredApplications due to an unstable network.
  // 2. Receive multiple RegisteredApplication from different masters because the master is
  // changing.
  appId.set(appId_)
  registered.set(true)
  master = Some(masterRef)
  listener.connected(appId.get)
Its job is simply to store the master reference and the appId in memory.
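One detail worth noting: appId and registered on the AppClient side are atomic holders (an AtomicReference and an AtomicBoolean in the Spark source), because the RPC thread writes them while other threads poll the registration status. A stripped-down sketch of that pattern, with simplified names:

import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Written by the RPC thread when RegisteredApplication arrives,
// read by whichever thread waits for registration to finish.
val appId = new AtomicReference[String]()
val registered = new AtomicBoolean(false)

def onRegistered(id: String): Unit = {
  appId.set(id)        // record the application ID assigned by the Master
  registered.set(true) // flip the flag that registration polls check
}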
The schedule method launches waiting drivers on the workers, then starts executors:
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  // Randomly shuffle all workers
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  // Double loop: for every waiting driver, if a worker's free memory and free
  // cores satisfy the driver's requirements, call launchDriver on that worker
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
  // Start executors on the workers
  startExecutorsOnWorkers()
}
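To make the placement rule concrete, here is a self-contained sketch of the same double loop, using simplified, hypothetical Worker and Driver types (the real code works on WorkerInfo/DriverInfo and calls launchDriver, which reserves the resources on the chosen worker):

import scala.collection.mutable
import scala.util.Random

// Hypothetical, simplified stand-ins for WorkerInfo and DriverInfo.
case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
case class Driver(id: String, mem: Int, cores: Int)

// Same rule as Master.schedule: shuffle the workers, then hand each waiting
// driver to the first worker that has enough free memory and cores.
def placeDrivers(workers: Seq[Worker], waiting: Seq[Driver]): Map[String, String] = {
  val pending = mutable.ListBuffer(waiting: _*)
  val placements = mutable.Map.empty[String, String]
  for (worker <- Random.shuffle(workers); driver <- pending.toList) {
    if (pending.contains(driver) &&
        worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
      worker.memoryFree -= driver.mem // launchDriver would reserve these
      worker.coresFree -= driver.cores
      placements(driver.id) = worker.id
      pending -= driver
    }
  }
  placements.toMap
}

// Example: both drivers may land on w1, or be split between w1 and w2,
// depending on the shuffle order -- which is why the shuffle helps balance.
placeDrivers(
  Seq(Worker("w1", memoryFree = 4096, coresFree = 4), Worker("w2", 2048, 2)),
  Seq(Driver("d1", mem = 2048, cores = 2), Driver("d2", 2048, 2)))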
2. Driver registration
case RequestSubmitDriver(description) => {
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    // Create the DriverInfo
    val driver = createDriver(description)
    // Persist the driver
    persistenceEngine.addDriver(driver)
    // Add the driver to the waiting list
    waitingDrivers += driver
    drivers.add(driver)
    // Run schedule to launch the driver
    schedule()
    // TODO: It might be good to instead have the submission client poll the master to determine
    // the current status of the driver. For now it's simply "fire and forget".
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }
}
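A RequestSubmitDriver message is produced when an application is submitted in cluster deploy mode (spark-submit --deploy-mode cluster; adding --supervise sets desc.supervise, which matters during recovery below). As a minimal sketch of the client side of this exchange, assuming masterRef is an already-resolved RpcEndpointRef to the Master (the exact name of the ask method varies across Spark versions):

// Sketch only: `description` is a DriverDescription built from the
// spark-submit arguments; the reply carries the assigned driver ID.
val response = masterRef.askSync[SubmitDriverResponse](RequestSubmitDriver(description))
if (response.success) {
  logInfo(s"Driver successfully submitted as ${response.driverId.get}")
} else {
  logError(s"Driver submission failed: ${response.message}")
}

As the TODO in the handler notes, the reply is fire-and-forget: the client receives a driver ID but must poll the Master separately for the driver's subsequent status.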
3. Worker registration
case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    context.reply(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    context.reply(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerUiPort, publicAddress)
    // Register the worker in the Master's memory
    if (registerWorker(worker)) {
      // Persist the worker
      persistenceEngine.addWorker(worker)
      context.reply(RegisteredWorker(self, masterWebUiUrl))
      // Run the scheduling method
      schedule()
    } else {
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
}
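The registerWorker helper called above is not shown in the post. Paraphrased from the Spark source (details vary by version), it roughly does the following; note the UNKNOWN branch, which lets a worker that restarts during a master failover replace its stale entry:

private def registerWorker(worker: WorkerInfo): Boolean = {
  // Drop any DEAD workers previously recorded on the same host and port
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w => workers -= w }
  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN means it was restarted during recovery:
      // remove the old entry and accept the new worker
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }
  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  true
}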
When a master failover happens, the newly elected Master runs the following handler:
case ElectedLeader => {
  // Read the persisted app, driver, and worker information from the persistence engine
  val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
  state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
    RecoveryState.ALIVE
  } else {
    RecoveryState.RECOVERING
  }
  logInfo("I have been elected leader! New state: " + state)
  if (state == RecoveryState.RECOVERING) {
    // Start the recovery
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        // Tell the Master to finish the recovery after the worker timeout elapses
        self.send(CompleteRecovery)
      }
    }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  }
}
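The ElectedLeader message is sent to the Master by its leader election agent. The contract between the two is the small LeaderElectable trait (paraphrased from the Spark source), which the Master implements and which the ZooKeeper agent, built on Curator's LeaderLatch, invokes when leadership changes:

// Implemented by Master; invoked by the LeaderElectionAgent.
trait LeaderElectable {
  def electedLeader(): Unit      // Master reacts by sending itself ElectedLeader
  def revokedLeadership(): Unit  // Master reacts by sending itself RevokedLeadership
}

So on failover, ZooKeeper grants the latch to a standby Master, the agent calls electedLeader(), and the handler above takes over.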
In the beginRecovery method:
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      // Put the app's information back into the Master's memory
      registerApplication(app)
      // Set the app's state to UNKNOWN
      app.state = ApplicationState.UNKNOWN
      // Send MasterChanged to the AppClient to tell it the master has changed;
      // the AppClient updates its master address and replies with MasterChangeAcknowledged
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }

  for (driver <- storedDrivers) {
    // Here we just read in the list of drivers. Any drivers associated with now-lost workers
    // will be re-launched when we detect that the worker is missing.
    drivers += driver
  }

  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      // Put the worker's information back into the Master's memory
      registerWorker(worker)
      // Set the worker's state to UNKNOWN
      worker.state = WorkerState.UNKNOWN
      // Send MasterChanged to the worker; the worker updates its master address
      // and replies with WorkerSchedulerStateResponse
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
When the MasterChangeAcknowledged and WorkerSchedulerStateResponse handlers fire, the app and the worker have received the master-changed message and responded, so these handlers move them out of the UNKNOWN state.
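Neither handler is shown in the post; paraphrased from the Spark source, the app-side acknowledgment looks roughly like the following (the worker-side WorkerSchedulerStateResponse handler is analogous, additionally restoring which executors and drivers the worker is running):

case MasterChangeAcknowledged(appId) =>
  idToApp.get(appId) match {
    case Some(app) =>
      logInfo("Application has been re-registered: " + appId)
      app.state = ApplicationState.WAITING // no longer UNKNOWN
    case None =>
      logWarning("Master change ack from unknown app: " + appId)
  }
  // Once every known app and worker has responded, finish the recovery
  if (canCompleteRecovery) { completeRecovery() }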
Both handlers end by attempting completeRecovery, which removes any worker or app whose state is still UNKNOWN (if the state is still UNKNOWN at this point, the worker has died, or the app has died or finished):
private def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off any workers and apps that didn't respond to us.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Reschedule drivers which were not claimed by any workers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}