Spark Source Code Analysis: the Master's Active/Standby Switchover Mechanism
Master Source Code Analysis: the Active/Standby Switchover Mechanism
1. Actions taken after being elected leader
// ElectedLeader: this node has been elected leader
case ElectedLeader => {
  // Read the persisted driver, worker, and application data from the persistence engine
  val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
  state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
    // If apps, drivers, and workers are all empty, set the recovery state to ALIVE
    RecoveryState.ALIVE
  } else {
    // If any of them is non-empty, set the state to RECOVERING
    RecoveryState.RECOVERING
  }
  logInfo("I have been elected leader! New state: " + state)
  if (state == RecoveryState.RECOVERING) {
    // While RECOVERING, re-register storedApps, storedDrivers, and storedWorkers
    // into the master's internal caches
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        // Send CompleteRecovery to ourselves
        self.send(CompleteRecovery)
      }
    }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  }
}
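The state decision in the handler above can be isolated as a tiny sketch (simplified stand-in types, not Spark's actual `RecoveryState` or persistence API): only a completely empty persisted snapshot lets the newly elected master go straight to ALIVE.

```scala
// Minimal sketch of the post-election state decision; RecoveryState is
// simplified here and the element type of the stored sequences is irrelevant.
object RecoveryDecision {
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Recovering extends RecoveryState

  // Mirrors the check in the ElectedLeader handler: any surviving persisted
  // app, driver, or worker forces a recovery pass.
  def decide(storedApps: Seq[Any], storedDrivers: Seq[Any],
             storedWorkers: Seq[Any]): RecoveryState =
    if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) Alive
    else Recovering
}
```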
2. Calling the beginRecovery() method
// Begin recovery
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      // Re-register the application
      registerApplication(app)
      // Set the application's state to UNKNOWN
      app.state = ApplicationState.UNKNOWN
      // Send a MasterChanged message to the application's driver
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }
  // Put storedDrivers back into the in-memory cache
  for (driver <- storedDrivers) {
    // Here we just read in the list of drivers. Any drivers associated with now-lost workers
    // will be re-launched when we detect that the worker is missing.
    drivers += driver
  }
  // Put storedWorkers back into the in-memory cache
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      // Re-register the worker
      registerWorker(worker)
      // Set the worker's state to UNKNOWN
      worker.state = WorkerState.UNKNOWN
      // Send a MasterChanged message to the worker
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
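The common shape of the two recovery loops can be sketched with a simplified entity type (hypothetical, standing in for ApplicationInfo/WorkerInfo): each recovered entity is marked UNKNOWN and a MasterChanged notice is produced for it; entities that respond are later flipped back to a live state, while the rest are cleaned up in completeRecovery.

```scala
object BeginRecoverySketch {
  final case class Entity(id: String, var state: String)
  final case class Notice(recipient: String, message: String)

  // Mark every recovered entity UNKNOWN and emit one MasterChanged notice
  // per entity, mirroring the app and worker loops in beginRecovery.
  def recover(stored: Seq[Entity]): Seq[Notice] =
    stored.map { e =>
      e.state = "UNKNOWN"
      Notice(e.id, "MasterChanged")
    }
}
```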
3. Calling the registerApplication() method to re-register the application
/**
 * Register an application
 * @param app
 */
private def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // Register the app source with Spark's metrics system
  applicationMetricsSystem.registerSource(app.appSource)
  // Put the application into the in-memory caches
  apps += app
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  // Queue of applications waiting to be scheduled, in FIFO order
  waitingApps += app
}
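The duplicate-address guard at the top of registerApplication can be sketched on its own (a hypothetical immutable map standing in for addressToApp): a second registration from the same driver address is silently ignored.

```scala
object AppRegistry {
  // Returns the updated address -> appId map; a repeat registration from the
  // same address leaves the map unchanged, like the early return above.
  def register(addressToApp: Map[String, String],
               address: String, appId: String): Map[String, String] =
    if (addressToApp.contains(address)) addressToApp
    else addressToApp + (address -> appId)
}
```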
4. Calling the registerWorker() method to re-register the worker
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }
  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }
  // Store the WorkerInfo in workers (a HashSet)
  workers += worker
  // Map the worker's id to it in idToWorker (a HashMap)
  idToWorker(worker.id) = worker
  // Map the worker endpoint's address to it in addressToWorker (a HashMap)
  addressToWorker(workerAddress) = worker
  true
}
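The dead-worker pruning at the top of registerWorker can be sketched with a simplified WorkerInfo (hypothetical fields): any DEAD entry on the same host and port as the incoming worker is an earlier incarnation of that node and is dropped before the new registration is accepted.

```scala
object WorkerPruning {
  final case class WorkerInfo(id: String, host: String, port: Int, state: String)

  // Remove stale DEAD entries that share the incoming worker's host:port
  // (previous incarnations of the same node with different IDs).
  def pruneDead(workers: Set[WorkerInfo], incoming: WorkerInfo): Set[WorkerInfo] =
    workers.filterNot { w =>
      w.host == incoming.host && w.port == incoming.port && w.state == "DEAD"
    }
}
```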
5. Removing the stale worker: the removeWorker() method
private def removeWorker(worker: WorkerInfo) {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  // Set the worker's state to DEAD
  worker.setState(WorkerState.DEAD)
  // Remove the worker's id from idToWorker (a HashMap)
  idToWorker -= worker.id
  // Remove worker.endpoint.address from addressToWorker (a HashMap)
  addressToWorker -= worker.endpoint.address
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    // Notify the driver that the executor's state has changed
    exec.application.driver.send(ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None))
    // Remove these executors from the application
    exec.application.removeExecutor(exec)
  }
  for (driver <- worker.drivers.values) {
    if (driver.desc.supervise) {
      logInfo(s"Re-launching ${driver.id}")
      // Relaunch the driver
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      // Remove the driver
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }
  // Remove the worker from the persistence engine
  persistenceEngine.removeWorker(worker)
}
6. Calling the completeRecovery() method
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
  override def run(): Unit = Utils.tryLogNonFatalError {
    // Send CompleteRecovery to ourselves
    self.send(CompleteRecovery)
  }
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
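The delayed self-send can be reproduced with a plain ScheduledExecutorService (a sketch of the pattern only, not Spark's forwardMessageThread): the completion action fires once after the worker-timeout delay.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object DelayedCompletion {
  // Schedule onComplete once after delayMs, mirroring how the master
  // schedules a CompleteRecovery message to itself after WORKER_TIMEOUT_MS.
  def schedule(delayMs: Long)(onComplete: () => Unit): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    executor.schedule(new Runnable {
      override def run(): Unit = {
        onComplete()
        executor.shutdown()
      }
    }, delayMs, TimeUnit.MILLISECONDS)
  }
}
```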
7. The concrete implementation of completeRecovery()
/**
 * completeRecovery: finish the recovery phase of the active/standby switchover
 */
private def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  // Cleanup: 1. remove from the in-memory caches; 2. remove from the memory of
  // related components; 3. remove from persistent storage
  if (state != RecoveryState.RECOVERING) { return }
  // Set the state to COMPLETING_RECOVERY
  state = RecoveryState.COMPLETING_RECOVERY
  // Kill off any workers and apps that didn't respond to us.
  // Filter out the workers and apps that never responded, i.e. those whose
  // WorkerState or ApplicationState is still UNKNOWN, then run removeWorker
  // and finishApplication respectively to remove them.
  // Remove unresponsive workers
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  // Remove unresponsive applications
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
  // Reschedule drivers which were not claimed by any workers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      // Relaunch the driver
      relaunchDriver(d)
    } else {
      // Remove the driver
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }
  // Set the state to ALIVE: recovery is complete
  state = RecoveryState.ALIVE
  // Call schedule() again now that recovery is complete
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
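The cleanup pass can be simulated end to end with simplified entities (hypothetical types): anything still UNKNOWN when CompleteRecovery fires never answered MasterChanged and is dropped; the rest survive the failover.

```scala
object RecoveryCleanup {
  final case class Entity(id: String, state: String)

  // Partition entities into survivors and casualties, mirroring the
  // workers.filter(_.state == UNKNOWN) / apps.filter(...) cleanup above.
  def cleanup(entities: Seq[Entity]): (Seq[Entity], Seq[Entity]) =
    entities.partition(_.state != "UNKNOWN")
}
```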
8. Calling the removeWorker() method; see step 5 for its implementation.
9. The master's finishApplication() method calls its own removeApplication(app, ApplicationState.FINISHED) method.
// Remove an application
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  if (apps.contains(app)) {
    logInfo("Removing app " + app.id)
    // Remove the application from the in-memory caches
    apps -= app
    idToApp -= app.id
    endpointToApp -= app.driver
    addressToApp -= app.driver.address
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
      completedApps.take(toRemove).foreach( a => {
        Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
        applicationMetricsSystem.removeSource(a.appSource)
      })
      completedApps.trimStart(toRemove)
    }
    // Add the application to the completed-applications queue
    completedApps += app // Remember it in our history
    // Remove the application from the queue of applications waiting to run
    waitingApps -= app
    // If application events are logged, use them to rebuild the UI
    asyncRebuildSparkUI(app)
    for (exec <- app.executors.values) {
      // Kill the executor
      killExecutor(exec)
    }
    app.markFinished(state)
    if (state != ApplicationState.FINISHED) {
      // Tell the driver that the application has been removed
      app.driver.send(ApplicationRemoved(state.toString))
    }
    // Remove the application from the persistence engine
    persistenceEngine.removeApplication(app)
    // Reschedule
    schedule()
    // Tell all workers that the application has finished, so they can clean up any app state.
    workers.foreach { w =>
      w.endpoint.send(ApplicationFinished(app.id))
    }
  }
}
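The bounded history in removeApplication follows a simple eviction rule, sketched here with plain strings (RETAINED_APPLICATIONS becomes a parameter): when the buffer is full, the oldest 10% (at least one entry) are dropped before the newly finished app is appended.

```scala
object CompletedHistory {
  // Append a finished app to the history, first evicting the oldest ~10%
  // (at least one entry) when the buffer has reached the retention limit.
  def append(completed: Vector[String], finished: String,
             retained: Int): Vector[String] = {
    val trimmed =
      if (completed.size >= retained) {
        val toRemove = math.max(retained / 10, 1)
        completed.drop(toRemove)
      } else completed
    trimmed :+ finished
  }
}
```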
10. The relaunchDriver() method, called from removeWorker() and completeRecovery()
/**
 * Relaunch a driver
 * @param driver
 */
private def relaunchDriver(driver: DriverInfo) {
  // Clear the driver's worker
  driver.worker = None
  // Set the driver's state to RELAUNCHING
  driver.state = DriverState.RELAUNCHING
  // Put the driver back into the waitingDrivers queue
  waitingDrivers += driver
  // Kick off scheduling again
  schedule()
}
11. The removeDriver() method
// Remove a driver
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  // Use the higher-order function find() to look up the driver by driverId
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the in-memory cache
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add the driver to completedDrivers
      completedDrivers += driver
      // Remove the driver from the persistence engine
      persistenceEngine.removeDriver(driver)
      // Set the driver's state to its final state
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from its worker, if it has one
      driver.worker.foreach(w => w.removeDriver(driver))
      // Call schedule() again
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
12. At this point, the master's active/standby switchover is essentially complete.
Overall flow (diagram omitted)
13. If you spot any mistakes, please point them out so we can learn from each other!