
Spark source code analysis: the Master's master/standby switchover mechanism

Master source code analysis: the master/standby switchover mechanism
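Some context before the walkthrough: a standby Master is only useful if a recovery mode is enabled, since the default mode (NONE) persists nothing for a new leader to recover. As a hedged reference example (property names as documented for Spark standalone HA; the ZooKeeper quorum address is a placeholder), ZooKeeper-based recovery is typically switched on in spark-env.sh:

# spark-env.sh (example; zk1:2181,zk2:2181 is a placeholder quorum)
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181 -Dspark.deploy.zookeeper.dir=/spark"

A single-node alternative is -Dspark.deploy.recoveryMode=FILESYSTEM with -Dspark.deploy.recoveryDirectory=<path>. In ZOOKEEPER mode the persistenceEngine seen below is a ZooKeeperPersistenceEngine, and a ZooKeeperLeaderElectionAgent performs leader election, which is what ultimately delivers the ElectedLeader message in step 1.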

1. What happens after being elected leader

// ElectedLeader: this Master has been elected leader
    case ElectedLeader => {
      // Read the persisted application, driver and worker data from the persistence engine
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        // If apps, drivers and workers are all empty, set the recovery state to ALIVE
        RecoveryState.ALIVE
      } else {
        // If any of them is non-empty, set the state to RECOVERING
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        // While RECOVERING, re-register storedApps, storedDrivers and storedWorkers
        // into the Master's internal in-memory cache structures
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        // After the worker timeout, send the CompleteRecovery message to ourselves
        recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(CompleteRecovery)
          }
        }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
      }
    }
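For reference, the state field above ranges over a small enumeration. A minimal sketch, consistent with org.apache.spark.deploy.master.RecoveryState in the Spark source of this era:

private[deploy] object RecoveryState extends Enumeration {
  type MasterState = Value

  // STANDBY: not the leader; ALIVE: leading with nothing left to recover;
  // RECOVERING / COMPLETING_RECOVERY: the two phases seen in this walkthrough
  val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
}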

2. Calling the beginRecovery() method

 // Begin recovery
  private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
      storedWorkers: Seq[WorkerInfo]) {
    for (app <- storedApps) {
      logInfo("Trying to recover app: " + app.id)
      try {
        // Re-register the application
        registerApplication(app)
        // Mark the application's state as UNKNOWN until it responds
        app.state = ApplicationState.UNKNOWN
        // Send a MasterChanged message to the application's driver
        app.driver.send(MasterChanged(self, masterWebUiUrl))
      } catch {
        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
      }
    }

    // Re-add storedDrivers to the in-memory cache
    for (driver <- storedDrivers) {
      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
      // will be re-launched when we detect that the worker is missing.
      drivers += driver
    }

    // Re-add storedWorkers to the in-memory cache
    for (worker <- storedWorkers) {
      logInfo("Trying to recover worker: " + worker.id)
      try {
        // Re-register the worker
        registerWorker(worker)
        // Mark the worker's state as UNKNOWN until it responds
        worker.state = WorkerState.UNKNOWN
        // Send a MasterChanged message to the worker
        worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
      } catch {
        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
      }
    }
  }
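The MasterChanged messages sent above are what eventually move workers and applications out of UNKNOWN. A condensed, hedged sketch of the Worker-side handler (paraphrased from Worker.scala of the same vintage; exact fields may differ by version): the worker switches over to the new master and reports back everything it is running.

// Worker-side handling of MasterChanged (condensed sketch)
case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  // Point this worker at the newly elected master
  changeMaster(masterRef, masterWebUiUrl)
  // Describe all executors currently running on this worker
  val execs = executors.values.map { e =>
    new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
  }
  // The master uses this response to flip the worker back from UNKNOWN to ALIVE
  masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

On the Master side, WorkerSchedulerStateResponse (and the driver's MasterChangeAcknowledged) mark the worker or app as no longer UNKNOWN; once nothing is left UNKNOWN the Master can call completeRecovery() without waiting for the timeout shown in step 6.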

3. Calling registerApplication() to re-register the application

  /**
    * Register an application.
    * @param app the application to register
    */
  private def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.address
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }
    // Register the app's metrics source with Spark's metrics system
    applicationMetricsSystem.registerSource(app.appSource)
    // Add the app to the in-memory caches
    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(appAddress) = app
    // Queue of apps waiting to be scheduled (FIFO)
    waitingApps += app
  }
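The four additions above populate the Master's in-memory bookkeeping. An abridged sketch of those structures as declared in Master.scala (field list trimmed to the ones used in this walkthrough; visibility modifiers may differ slightly by version):

// In-memory caches that the recovery path re-populates (abridged sketch)
val apps = new HashSet[ApplicationInfo]                    // all known apps
val idToApp = new HashMap[String, ApplicationInfo]         // appId -> app
private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val waitingApps = new ArrayBuffer[ApplicationInfo] // FIFO scheduling queue

val workers = new HashSet[WorkerInfo]
private val idToWorker = new HashMap[String, WorkerInfo]
private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]

private val drivers = new HashSet[DriverInfo]
private val waitingDrivers = new ArrayBuffer[DriverInfo]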

4. Re-registering a worker: registerWorker()

private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      workers -= w
    }

    val workerAddress = worker.endpoint.address
    if (addressToWorker.contains(workerAddress)) {
      val oldWorker = addressToWorker(workerAddress)
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
        // The old worker must thus be dead, so we will remove it and accept the new worker.
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }

    // Store the WorkerInfo in the workers HashSet
    workers += worker
    // Map the worker's id to its WorkerInfo in idToWorker (HashMap)
    idToWorker(worker.id) = worker
    // Map the worker endpoint's address to its WorkerInfo
    addressToWorker(workerAddress) = worker
    true
  }
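The DEAD and UNKNOWN checks above come from two more small enumerations. Minimal sketches consistent with WorkerState and ApplicationState in the Spark source, listed here for reference:

private[master] object WorkerState extends Enumeration {
  val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value  // UNKNOWN only appears during recovery
}

private[master] object ApplicationState extends Enumeration {
  val WAITING, RUNNING, FINISHED, FAILED, KILLED, UNKNOWN = Value
}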

5. Removing the old worker: the removeWorker() method

private def removeWorker(worker: WorkerInfo) {
    logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
    // Mark the worker as DEAD
    worker.setState(WorkerState.DEAD)
    // Remove the worker's id from idToWorker (HashMap)
    idToWorker -= worker.id
    // Remove worker.endpoint.address from addressToWorker (HashMap)
    addressToWorker -= worker.endpoint.address
    for (exec <- worker.executors.values) {
      logInfo("Telling app of lost executor: " + exec.id)
      // Notify the driver that the executor's state has changed to LOST
      exec.application.driver.send(ExecutorUpdated(
        exec.id, ExecutorState.LOST, Some("worker lost"), None))
      // Remove these executors from the application
      exec.application.removeExecutor(exec)
    }
    for (driver <- worker.drivers.values) {
      if (driver.desc.supervise) {
        logInfo(s"Re-launching ${driver.id}")
        // Supervised drivers are relaunched
        relaunchDriver(driver)
      } else {
        logInfo(s"Not re-launching ${driver.id} because it was not supervised")
        // Unsupervised drivers are removed
        removeDriver(driver.id, DriverState.ERROR, None)
      }
    }
    // Remove the worker from the persistence engine
    persistenceEngine.removeWorker(worker)
  }
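The driver.desc.supervise flag checked above is set by the user at submit time. A hedged usage example (the jar name, class name and master hosts are placeholders; --supervise requires cluster deploy mode on standalone, and listing both masters in the URL lets the client fail over):

spark-submit \
  --master spark://master1:7077,master2:7077 \
  --deploy-mode cluster \
  --supervise \
  --class com.example.MyApp myapp.jar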

6. Triggering the completeRecovery() call

recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            // Send the CompleteRecovery message to ourselves
            self.send(CompleteRecovery)
          }
        }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
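WORKER_TIMEOUT_MS here is the same timeout used to detect dead workers in normal operation. It comes from spark.worker.timeout; a one-line sketch matching the Master's declaration (default 60 seconds):

private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000

So recovery is given one full worker-timeout window for workers and drivers to re-register before CompleteRecovery forcibly finishes it.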

7. The concrete implementation of completeRecovery()

/**
    * completeRecovery: finish the master/standby recovery
    */
  private def completeRecovery() {
    // Ensure "only-once" recovery semantics using a short synchronization period.
    // Cleanup happens at three levels: 1. remove from the in-memory caches;
    // 2. remove from the related components' memory; 3. remove from persistent storage
    if (state != RecoveryState.RECOVERING) { return }
    // Move into the COMPLETING_RECOVERY state
    state = RecoveryState.COMPLETING_RECOVERY

    // Kill off any workers and apps that didn't respond to us.
    // Filter out the workers and apps that never responded, i.e. whose WorkerState /
    // ApplicationState is still UNKNOWN, and drop them via removeWorker and
    // finishApplication respectively
    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
    apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

    // Reschedule drivers which were not claimed by any workers
    drivers.filter(_.worker.isEmpty).foreach { d =>
      logWarning(s"Driver ${d.id} was not found after master recovery")
      if (d.desc.supervise) {
        logWarning(s"Re-launching ${d.id}")
        // Relaunch the supervised driver
        relaunchDriver(d)
      } else {
        // Remove the unsupervised driver
        removeDriver(d.id, DriverState.ERROR, None)
        logWarning(s"Did not re-launch ${d.id} because it was not supervised")
      }
    }

    // Set the state to ALIVE: recovery is complete
    state = RecoveryState.ALIVE
    // Call schedule() again now that recovery has finished
    schedule()
    logInfo("Recovery complete - resuming operations!")
  }
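Note the ordering at the end: state is flipped to ALIVE before schedule() is called, because schedule() refuses to act on a non-ALIVE master. A condensed, hedged sketch of the driver-scheduling part of schedule() (heavily simplified; the real method round-robins across workers one driver at a time and then starts executors):

// Condensed sketch of Master.schedule() (simplified paraphrase, not the full method)
private def schedule(): Unit = {
  // No scheduling while recovering: this is why completeRecovery()
  // sets RecoveryState.ALIVE before calling schedule()
  if (state != RecoveryState.ALIVE) { return }
  val aliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  for (driver <- waitingDrivers.toList) {
    // Launch each waiting (e.g. relaunched) driver on a worker with enough resources
    aliveWorkers.find { w =>
      w.memoryFree >= driver.desc.mem && w.coresFree >= driver.desc.cores
    }.foreach { worker =>
      launchDriver(worker, driver)
      waitingDrivers -= driver
    }
  }
  startExecutorsOnWorkers()
}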

8. removeWorker() is invoked here; see step 5 for its implementation.

9. The Master's finishApplication() method simply delegates to its own removeApplication(app, ApplicationState.FINISHED) method

// Remove an application
  def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
    if (apps.contains(app)) {
      logInfo("Removing app " + app.id)
      // Remove the application from the apps HashSet
      apps -= app

      idToApp -= app.id
      endpointToApp -= app.driver
      addressToApp -= app.driver.address
      if (completedApps.size >= RETAINED_APPLICATIONS) {
        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
        completedApps.take(toRemove).foreach( a => {
          Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
          applicationMetricsSystem.removeSource(a.appSource)
        })
        completedApps.trimStart(toRemove)
      }
      // Add the application to the completed-applications buffer
      completedApps += app // Remember it in our history
      // Remove the app from the queue of applications waiting to run
      waitingApps -= app

      // If application events are logged, use them to rebuild the UI
      asyncRebuildSparkUI(app)

      for (exec <- app.executors.values) {
        // Kill the app's executors
        killExecutor(exec)
      }
      app.markFinished(state)
      if (state != ApplicationState.FINISHED) {
        // Tell the driver that the application was removed
        app.driver.send(ApplicationRemoved(state.toString))
      }
      // Remove the application from the persistence engine
      persistenceEngine.removeApplication(app)
      // Reschedule
      schedule()

      // Tell all workers that the application has finished, so they can clean up any app state.
      workers.foreach { w =>
        w.endpoint.send(ApplicationFinished(app.id))
      }
    }
  }
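RETAINED_APPLICATIONS above (and RETAINED_DRIVERS in step 11) bound how much completed-app and completed-driver history the Master keeps for its web UI. A sketch of the declarations, matching the documented defaults:

val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)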

10. The relaunchDriver() method, called from removeWorker() (step 5) and completeRecovery() (step 7) for supervised drivers

 /**
    * Relaunch a driver
    * @param driver the driver to relaunch
    */
  private def relaunchDriver(driver: DriverInfo) {
    // Clear the driver's worker
    driver.worker = None
    // Set the driver's state to RELAUNCHING
    driver.state = DriverState.RELAUNCHING
    // Put the driver back on the waitingDrivers queue
    waitingDrivers += driver
    // Kick off scheduling again
    schedule()
  }
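The RELAUNCHING value set above is one of the driver lifecycle states. A minimal sketch consistent with org.apache.spark.deploy.master.DriverState:

private[deploy] object DriverState extends Enumeration {
  // RELAUNCHING: exited non-zero or lost its worker, not yet running again
  // ERROR: could not run or restart (the final state removeDriver assigns
  //        to unsupervised drivers of a lost worker)
  val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value
}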

11. The removeDriver() method

 // Remove a driver
  private def removeDriver(
      driverId: String,
      finalState: DriverState,
      exception: Option[Exception]) {
    // Use Scala's find() to look the driver up by driverId
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        // Remove the driver from the in-memory cache
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // Add the driver to completedDrivers
        completedDrivers += driver
        // Remove the driver from the persistence engine
        persistenceEngine.removeDriver(driver)
        // Set the driver's state to its final state
        driver.state = finalState
        driver.exception = exception
        // Remove the driver from its worker, if it had one
        driver.worker.foreach(w => w.removeDriver(driver))
        // Call schedule() again
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }

12. At this point, the Master's master/standby switchover is essentially complete.
(Figure from the original post: overall switchover flow; image not preserved.)
13. If you spot any mistakes, please point them out so we can all learn together!
