
Spark: Master Internals and Source Code Analysis

Master active/standby failover

Spark's native standalone mode supports active/standby failover: two masters can be configured, and when the Active Master goes down because of a failure, the system automatically switches the Standby Master over to become the Active Master.

Master failover comes in two flavors:

  1. Filesystem-based: Spark provides a directory in which the registration info of applications and workers is stored, and their recovery state is written to that directory. When the master node goes down, restarting the master lets it read the application and worker registration info back. Failover has to be triggered manually.
  2. ZooKeeper-based, intended for production. The basic idea is to elect one master through ZooKeeper while the other masters stay in standby. Connect the standalone cluster to the same ZooKeeper instance and start multiple masters; using the election and state-storage facilities ZooKeeper provides, one master is elected and the others remain standby. If the current master dies, another master is elected, restored to the old master's state, and scheduling resumes. The whole recovery can take 1 to 2 minutes. (A configuration sketch for both modes follows this list.)
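
As a concrete reference, the recovery mode is selected through master-side properties. A minimal sketch, assuming the standard standalone deployment settings; the hosts and paths are placeholders:

# ZooKeeper-based HA (production): e.g. in spark-env.sh on every master node
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"

# Filesystem-based recovery (single master, manual failover)
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM \
  -Dspark.deploy.recoveryDirectory=/path/to/recovery/dir"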


Recovery flow:

  1. After the active master dies, the newly elected master's persistence engine (FileSystemPersistenceEngine or ZookeeperPersistenceEngine) first reads back the persisted storedApps, storedDrivers and storedWorkers
  2. If any of storedApps, storedDrivers or storedWorkers is non-empty,
  3. the persisted Application, Driver and Worker information is re-registered into the master's internal in-memory caches
  4. The states of the applications and workers are set to UNKNOWN, and the new master's address is sent to each application's driver and to the workers
  5. Drivers and workers that are still running normally receive the new master's address and send a response back to it
  6. As these responses arrive, the master eventually calls completeRecovery() to deal with the drivers and workers that did not respond, filtering their information out
  7. The master's schedule() method is called to schedule the waiting drivers and applications: drivers are launched on workers, and executors are launched on workers for the applications

Source code analysis:
Source: org/apache/spark/deploy/master/Master.scala

override def receive: PartialFunction[Any, Unit] = {
    case ElectedLeader => {
      //Read the persisted storedApps, storedDrivers and storedWorkers
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
      //If any of storedApps, storedDrivers or storedWorkers is non-empty, enter recovery
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE
      } else {
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        //Begin recovering the master
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(CompleteRecovery)
          }
        }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
      }
    }
    //Complete the master failover
    case CompleteRecovery => completeRecovery()

......

}
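
The persistenceEngine used above is the pluggable storage abstraction behind both recovery modes. The following is a simplified sketch of its contract, reconstructed only from the calls that appear in this walkthrough (addWorker/removeWorker, addApplication/removeApplication, addDriver/removeDriver, readPersistedData); the real trait in org.apache.spark.deploy.master has a slightly richer API:

// Simplified sketch, not the actual Spark source: the storage contract the master relies on
// for recovery. FileSystemPersistenceEngine and ZooKeeperPersistenceEngine are the two
// concrete implementations mentioned above.
abstract class PersistenceEngineSketch {
  // Durably store / delete one named object (application, driver or worker metadata)
  def persist(name: String, obj: AnyRef): Unit
  def unpersist(name: String): Unit

  def addApplication(app: ApplicationInfo): Unit    = persist("app_" + app.id, app)
  def removeApplication(app: ApplicationInfo): Unit = unpersist("app_" + app.id)
  def addWorker(worker: WorkerInfo): Unit           = persist("worker_" + worker.id, worker)
  def removeWorker(worker: WorkerInfo): Unit        = unpersist("worker_" + worker.id)
  def addDriver(driver: DriverInfo): Unit           = persist("driver_" + driver.id, driver)
  def removeDriver(driver: DriverInfo): Unit        = unpersist("driver_" + driver.id)

  // What the ElectedLeader handler reads back: everything persisted before the old master died
  def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
}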

Step 1: begin master recovery

/**
   *  Begin master recovery
   */
  private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
    for (app <- storedApps) {
      logInfo("Trying to recover app: " + app.id)
      try {
        // Register the application
        registerApplication(app)
        app.state = ApplicationState.UNKNOWN
        app.driver.send(MasterChanged(self, masterWebUiUrl))
      } catch {
        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
      }
    }

    for (driver <- storedDrivers) {
      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
      // will be re-launched when we detect that the worker is missing.
      drivers += driver
    }

    for (worker <- storedWorkers) {
      logInfo("Trying to recover worker: " + worker.id)
      try {
        // Register the worker
        registerWorker(worker)
        worker.state = WorkerState.UNKNOWN
        worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
      } catch {
        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
      }
    }
  }

Step 2: the registerApplication method called in Step 1

  private def registerApplication(app: ApplicationInfo): Unit = {
    //Get the driver's address
    val appAddress = app.driver.address
    //If the driver's address is already registered, return immediately; this would be a duplicate registration
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }

    applicationMetricsSystem.registerSource(app.appSource)
    
    //Add the application's info to the in-memory caches
    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(appAddress) = app
    
    //Add the application to the queue of apps waiting to be scheduled; scheduling is FIFO
    waitingApps += app
  }

Step 3: the registerWorker method called in Step 1

  /**
   * Register a worker.
   *
   * Iterate over all managed workers; any worker with the same host and port as the newly
   * registering one that is in DEAD (timed-out) state is removed from workers directly.
   * If addressToWorker already contains a worker with the same address, look at the old worker:
   * if its state is UNKNOWN, the master is in recovery and the worker is re-registering, so the
   * old worker is removed, the new one added and success returned; any other state means a
   * duplicate registration, and failure is returned.
   *
   */
  private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    //Filter out workers in DEAD state
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      //  Remove the dead worker from the workers HashSet
      workers -= w
    }

    //The worker's address
    val workerAddress = worker.endpoint.address
    //If the address is already registered, treat the existing entry as oldWorker
    if (addressToWorker.contains(workerAddress)) {
      val oldWorker = addressToWorker(workerAddress)
      //  If oldWorker's state is UNKNOWN, remove it
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
        // The old worker must thus be dead, so we will remove it and accept the new worker.
        //A worker re-registering from UNKNOWN means it was restarted during recovery,
        // so the old worker must be dead: remove it and accept the new one
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }
    //  Add the new worker to the HashSet
    workers += worker
    //  Record the worker's id and address in the corresponding HashMaps
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    //  Return true: the worker registered successfully
    true
  }

Step 4: complete the master failover

/**
   * Complete the master failover (recovery)
   */
  private def completeRecovery() {
    // Ensure "only-once" recovery semantics using a short synchronization period.
    if (state != RecoveryState.RECOVERING) { return }
    state = RecoveryState.COMPLETING_RECOVERY

    // Kill off any workers and apps that didn't respond to us.
    //Filter out the applications and workers whose state is still UNKNOWN,
    //then iterate over them, calling removeWorker and finishApplication to clean up
    //applications and workers that may have failed or already died
    /**
     * The cleanup covers three places:
     *   1. remove from the in-memory cache structures
     *   2. remove from the caches of related components (e.g. the driver running on that worker)
     *   3. remove from persistent storage
     */
    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
    apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

    // Reschedule drivers which were not claimed by any workers
    drivers.filter(_.worker.isEmpty).foreach { d =>
      logWarning(s"Driver ${d.id} was not found after master recovery")
      // Relaunch the driver (e.g. for supervised Spark Streaming programs)
      if (d.desc.supervise) {
        logWarning(s"Re-launching ${d.id}")
        relaunchDriver(d)
      } else {
        removeDriver(d.id, DriverState.ERROR, None)
        logWarning(s"Did not re-launch ${d.id} because it was not supervised")
      }
    }

    state = RecoveryState.ALIVE
    //Resource scheduling
    schedule()
    logInfo("Recovery complete - resuming operations!")
  }

Step 5: the removeWorker method called in Step 4

/**
   * Remove a worker
   */
  private def removeWorker(worker: WorkerInfo) {
    logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
    worker.setState(WorkerState.DEAD)
    //Remove from the in-memory cache structures (HashMaps)
    idToWorker -= worker.id
    addressToWorker -= worker.endpoint.address
    for (exec <- worker.executors.values) {
      logInfo("Telling app of lost executor: " + exec.id)
      // Tell the driver that the executor has been lost
      exec.application.driver.send(ExecutorUpdated(
        exec.id, ExecutorState.LOST, Some("worker lost"), None))
      // Remove all of the worker's executors from their applications
      exec.application.removeExecutor(exec)
    }
    for (driver <- worker.drivers.values) {
      //When the worker hosting a driver dies, the driver is removed as well; if the driver was submitted with supervise, the master relaunches it
      if (driver.desc.supervise) {
        logInfo(s"Re-launching ${driver.id}")
        relaunchDriver(driver)
      } else {
        logInfo(s"Not re-launching ${driver.id} because it was not supervised")
        removeDriver(driver.id, DriverState.ERROR, None)
      }
    }
    // Remove the worker from the persistence engine
    persistenceEngine.removeWorker(worker)
  }

Step 6: the finishApplication method

private def finishApplication(app: ApplicationInfo) {
    removeApplication(app, ApplicationState.FINISHED)
  }

  def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
    // Remove the application from the in-memory caches
    if (apps.contains(app)) {
      logInfo("Removing app " + app.id)
      apps -= app
      idToApp -= app.id
      endpointToApp -= app.driver
      addressToApp -= app.driver.address
      if (completedApps.size >= RETAINED_APPLICATIONS) {
        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
        completedApps.take(toRemove).foreach(a => {
          appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
          applicationMetricsSystem.removeSource(a.appSource)
        })
        completedApps.trimStart(toRemove)
      }
      completedApps += app // Remember it in our history
      waitingApps -= app

      // If application events are logged, use them to rebuild the UI
      rebuildSparkUI(app)

      for (exec <- app.executors.values) {
        // Kill the executors belonging to the app
        killExecutor(exec)
      }
      app.markFinished(state)
      if (state != ApplicationState.FINISHED) {
        app.driver.send(ApplicationRemoved(state.toString))
      }
      // Remove the application from the persistence engine
      persistenceEngine.removeApplication(app)
      schedule()

      // Tell all workers that the application has finished, so they can clean up any app state.
      workers.foreach { w =>
        w.endpoint.send(ApplicationFinished(app.id))
      }
    }
  }

Step 7: the relaunchDriver method

  /**
   * Relaunch a driver
   */
  private def relaunchDriver(driver: DriverInfo) {
    driver.worker = None
    // Set the driver's state to RELAUNCHING
    driver.state = DriverState.RELAUNCHING
    // Put the driver back into the waiting queue
    waitingDrivers += driver
    schedule()
  }

Step 8: the removeDriver method

  /**
   * Remove a driver
   */
  private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        //Remove from the in-memory cache (HashSet)
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // Add the driver to the completed drivers
        completedDrivers += driver
        // Remove the driver from the persistence engine
        persistenceEngine.removeDriver(driver)
        // Set the driver's state to its final state
        driver.state = finalState
        driver.exception = exception
        // Remove the driver from the worker it ran on
        driver.worker.foreach(w => w.removeDriver(driver))
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }

Master registration mechanism


Worker registration with the master

  1. A worker registers with the master as soon as it starts
  2. Workers in DEAD state are filtered out. For example, if the master has completed recovery within the time limit but finds a worker still in UNKNOWN state, it removes that worker and marks it DEAD; if that worker tries to register again much later, the registration is simply ignored. For a worker in UNKNOWN state, the master clears the old worker info and replaces it with the new worker info. For example, a freshly started master sends its new address to the workers and marks them UNKNOWN; when a worker replies with its registration, the master replaces the stale worker info with the fresh one
  3. The worker is added to the in-memory cache (HashMap)
  4. The worker info is persisted with the persistence engine, which may be filesystem- or ZooKeeper-based
  5. The schedule() method is called to schedule resources

Source code analysis:
Step 1: handle the worker's registration request
Source location: org.apache.spark.deploy.master.Master

/**
  * Handle a worker's registration request.
  * Note:
  * Designing worker registration this way has a big benefit: in production, a new worker can be
  * added to a running Spark cluster and used to increase processing capacity without having to
  * restart the cluster.
  *
*/
case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      //Check whether this master is in STANDBY state
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else if (idToWorker.contains(id)) {
        //  If the worker id already exists, reply that registration failed with "Duplicate worker ID"
        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        //  Wrap the worker's id, host, port, cores, memory, etc. into a WorkerInfo
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerUiPort, publicAddress)
        //  Check whether the worker registered successfully
        if (registerWorker(worker)) {
          // Persist the worker info with the persistence engine
          persistenceEngine.addWorker(worker)
          workerRef.send(RegisteredWorker(self, masterWebUiUrl))
          // Resource scheduling
          schedule()
        } else {
          //Worker registration failed
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }
}

Step 2: the registerWorker method called in Step 1

  /**
   * Register a worker
   */
  private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    //Filter out workers in DEAD state
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      //  Remove the dead worker from the workers HashSet
      workers -= w
    }
 
    //The worker's address
    val workerAddress = worker.endpoint.address
    //If the address is already registered, treat the existing entry as oldWorker
    if (addressToWorker.contains(workerAddress)) {
      val oldWorker = addressToWorker(workerAddress)
      //  If oldWorker's state is UNKNOWN, remove it
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
        // The old worker must thus be dead, so we will remove it and accept the new worker.
        //A worker re-registering from UNKNOWN means it was restarted during recovery,
        // so the old worker must be dead: remove it and accept the new one
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }
    //  Add the new worker to the HashSet
    workers += worker
    //  Record the worker's id and address in the corresponding HashMaps
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    //  Return true: the worker registered successfully
    true
  }

Step 3: the removeWorker method called in Step 2

/**
   * Remove a worker
   */
  private def removeWorker(worker: WorkerInfo) {
    logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
    worker.setState(WorkerState.DEAD)
    //Remove from the in-memory cache structures (HashMaps)
    idToWorker -= worker.id
    addressToWorker -= worker.endpoint.address
    for (exec <- worker.executors.values) {
      logInfo("Telling app of lost executor: " + exec.id)
      // Tell the driver that the executor has been lost
      exec.application.driver.send(ExecutorUpdated(
        exec.id, ExecutorState.LOST, Some("worker lost"), None))
      // Remove all of the worker's executors from their applications
      exec.application.removeExecutor(exec)
    }
    for (driver <- worker.drivers.values) {
      //When the worker hosting a driver dies, the driver is removed as well; if the driver was submitted with supervise, the master relaunches it
      if (driver.desc.supervise) {
        logInfo(s"Re-launching ${driver.id}")
        relaunchDriver(driver)
      } else {
        logInfo(s"Not re-launching ${driver.id} because it was not supervised")
        removeDriver(driver.id, DriverState.ERROR, None)
      }
    }
    // Remove the worker from the persistence engine
    persistenceEngine.removeWorker(worker)
  }

Step 4: the removeExecutor method called in Step 3
Source location: org.apache.spark.deploy.master.ApplicationInfo

 /**
   * Remove the executor from the internal in-memory caches
   */
 private[master] def removeExecutor(exec: ExecutorDesc) {
    if (executors.contains(exec.id)) {
      removedExecutors += executors(exec.id)
      executors -= exec.id
      coresGranted -= exec.cores
    }
  }

Step 5: the relaunchDriver method called in Step 3

 /**
   * Relaunch a driver
   */
  private def relaunchDriver(driver: DriverInfo) {
    driver.worker = None
    // Set the driver's state to RELAUNCHING
    driver.state = DriverState.RELAUNCHING
    // Put the driver back into the waiting queue
    waitingDrivers += driver
    schedule()
  }

Step 6: the removeDriver method called in Step 3

  /**
   * Remove a driver
   */
  private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        //Remove from the in-memory cache (HashSet)
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // Add the driver to the completed drivers
        completedDrivers += driver
        // Remove the driver from the persistence engine
        persistenceEngine.removeDriver(driver)
        // Set the driver's state to its final state
        driver.state = finalState
        driver.exception = exception
        // Remove the driver from the worker it ran on
        driver.worker.foreach(w => w.removeDriver(driver))
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }

Driver registration with the master

  1. When a Spark application is submitted with spark-submit in cluster deploy mode, the driver first registers with the master, and the driver info is put into the in-memory cache, i.e. a HashMap (a client-side sketch follows this list)
  2. The driver is added to the waiting queue, an ArrayBuffer
  3. The driver info is persisted with the persistence engine, which may be filesystem- or ZooKeeper-based
  4. The schedule() method is called to schedule resources
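
Before the master-side handler below, it helps to see the shape of the other end of this exchange. RequestSubmitDriver is answered through receiveAndReply, so the submitting side has to use the ask (request/response) pattern rather than a fire-and-forget send. The following is a hypothetical sketch based only on the message types shown below (RequestSubmitDriver / SubmitDriverResponse); masterRef and description are assumed to be the master's RpcEndpointRef and a DriverDescription built from the spark-submit arguments, and the real submission client does considerably more:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Hypothetical sketch of a submission client, not the actual Spark implementation.
masterRef.ask[SubmitDriverResponse](RequestSubmitDriver(description)).onComplete {
  case Success(SubmitDriverResponse(_, true, driverId, message)) =>
    println(s"Driver submitted as ${driverId.getOrElse("<unknown>")}: $message")
  case Success(SubmitDriverResponse(_, false, _, message)) =>
    // e.g. the master was not in ALIVE state
    println(s"Driver submission rejected: $message")
  case Failure(e) =>
    println(s"Could not reach the master: $e")
}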

Source code analysis:
Step 1: handle the driver submission request

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    /**
     * Handle a driver submission request; the parameter is a DriverDescription
     */
    case RequestSubmitDriver(description) => {
      // Check whether this master is ALIVE
      if (state != RecoveryState.ALIVE) {
       //  If not ALIVE, reply that the submission failed and must be sent to the ALIVE master
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
         //Create the driver with createDriver
        val driver = createDriver(description)
         // Persist the driver info with the persistence engine
        persistenceEngine.addDriver(driver)
        // Add the driver to the queue of drivers waiting to be scheduled (ArrayBuffer)
        waitingDrivers += driver
        // Add the driver to the internal in-memory cache (HashSet)
        drivers.add(driver)
        //Scheduling
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".
        //On success, reply through the RpcCallContext that the driver was submitted, including its id
        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }
    }
    
......

}

Step 2: the createDriver method called in Step 1

  /**
   * Create a driver and return it wrapped in a DriverInfo
   */
  private def createDriver(desc: DriverDescription): DriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    //Wrap the timestamp, a newly generated driver id and the description into a DriverInfo
    new DriverInfo(now, newDriverId(date), desc, date)
  }

Application registration with the master (the registerApplication() method)

  1. Once the driver is up, it runs the application code and initializes the SparkContext; underneath, SparkDeploySchedulerBackend, through a thread inside AppClient (the client endpoint, often referred to as ClientActor), sends a RegisterApplication message to the master to register the application
  2. The application info is put into the in-memory cache (HashMap)
  3. The application is added to the waiting queue (ArrayBuffer)
  4. The application info is persisted with the persistence engine, which may be filesystem- or ZooKeeper-based
  5. The schedule() method is called to schedule resources

Source code analysis:
Step 1: the underlying SparkDeploySchedulerBackend, through a thread inside AppClient, sends RegisterApplication to the master to register the application
Source location: org.apache.spark.deploy.client.AppClient

/**
 *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
 *  The master supports two failover mechanisms: one filesystem-based, and one based on ZooKeeper
 *  (dynamic HA with hot switchover).
*/
 private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            //Send the RegisterApplication case class, carrying appDescription, to register with the master
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
 }

Step 2: handle the application registration request
Source location: org.apache.spark.deploy.master.Master

 /**
  * Handle an application registration request
 */
case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      // If this master is in STANDBY state (i.e. it is not the active master),
      // an incoming application registration request is simply ignored
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        // Build an ApplicationInfo from the ApplicationDescription
        val app = createApplication(description, driver)
        // Register the application: add it to the caches and to the waiting queue
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // Persist the ApplicationInfo with the persistence engine
        persistenceEngine.addApplication(app)
        // Reply back to SparkDeploySchedulerBackend's AppClient endpoint with RegisteredApplication (note: not RegisterApplication)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }
 }

Step 3: the createApplication method called in Step 2

private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef): ApplicationInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    // Use the current timestamp to generate a unique id for the application
    new ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores)
 }

Step 4: the registerApplication method

private def registerApplication(app: ApplicationInfo): Unit = {
    //Get the driver's address
    val appAddress = app.driver.address
    //If the driver's address is already registered, return immediately; this would be a duplicate registration
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }

    applicationMetricsSystem.registerSource(app.appSource)
    
    //Add the application's info to the in-memory caches
    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(appAddress) = app
    
    //Add the application to the queue of apps waiting to be scheduled; scheduling is FIFO
    waitingApps += app
 }

Step 5: the reply sent back to SparkDeploySchedulerBackend's AppClient endpoint; the RegisteredApplication handler
Source location: org.apache.spark.deploy.client.AppClient

override def receive: PartialFunction[Any, Unit] = {
      case RegisteredApplication(appId_, masterRef) =>
        // FIXME How to handle the following cases?
        // 1. A master receives multiple registrations and sends back multiple
        // RegisteredApplications due to an unstable network.
        // 2. Receive multiple RegisteredApplication from different masters because the master is
        // changing.
        
        /**
         * How would these cases be handled?
         * 1. Due to an unstable network, one master receives multiple registrations and sends
         *    back multiple RegisteredApplication messages.
         * 2. RegisteredApplication messages arrive from different masters because the master is
         *    changing.
         */
        appId = appId_
        registered = true
        master = Some(masterRef)
        listener.connected(appId)

      case ApplicationRemoved(message) =>
        markDead("Master removed our application: %s".format(message))
        stop()

      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
        val fullId = appId + "/" + id
        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
          cores))
        // FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
        // guaranteed), `ExecutorStateChanged` may be sent to a dead master.
        sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None))
        listener.executorAdded(fullId, workerId, hostPort, cores, memory)

      case ExecutorUpdated(id, state, message, exitStatus) =>
        val fullId = appId + "/" + id
        val messageText = message.map(s => " (" + s + ")").getOrElse("")
        logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
        if (ExecutorState.isFinished(state)) {
          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
        }

      case MasterChanged(masterRef, masterWebUiUrl) =>
        logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
        master = Some(masterRef)
        alreadyDisconnected = false
        masterRef.send(MasterChangeAcknowledged(appId))
}

How the master handles state changes

Driver state changes (DriverStateChanged)

  1. If the driver's state is ERROR, FINISHED, KILLED or FAILED, the driver is removed
  2. The driver is first added to the completed drivers, its state is set to the final state, and finally it is removed from the worker it ran on, freeing the resources

Step 1: driver state change
Source code analysis:

    /**
     * Driver state changed
     */
    case DriverStateChanged(driverId, state, exception) => {
      state match {
        //If the driver's state is ERROR, FINISHED, KILLED or FAILED,
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          //remove the driver
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
    }

Step 2: the removeDriver method called in Step 1

/**
 * Remove a driver
*/
 private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
    //Use Scala's find higher-order function to locate the driver with this driverId
    drivers.find(d => d.id == driverId) match {
      //If it was found: Some (the Option case)
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        //Remove from the in-memory cache (HashSet)
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // Add the driver to the completed drivers
        completedDrivers += driver
        // Remove the driver from the persistence engine
        persistenceEngine.removeDriver(driver)
        // Set the driver's state to its final state
        driver.state = finalState
        driver.exception = exception
        // Remove the driver from the worker it ran on
        driver.worker.foreach(w => w.removeDriver(driver))
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }

Executor state changes (ExecutorStateChanged)

  1. Find the application the executor belongs to, and then look the executor up in that application's internal executors cache
  2. Update the executor's state to the newly reported state and send an ExecutorUpdated message to the driver
  3. If the executor has reached a terminal state, remove it from the application and from the worker that was running it
  4. If the executor exited abnormally, retry the application; if it has already been retried 10 times and keeps failing, scheduling is considered to have failed and the application is removed. If it recovers within those 10 retries, it is simply rescheduled

Source code analysis:

/**
  * Executor state changed
  */
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      // Find the app the executor belongs to, then look the executor up in the app's executors cache
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        // If the executor was found
        case Some(exec) => {
          // Update the executor's state to the newly reported state
          val appInfo = idToApp(appId)
          exec.state = state
          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }

          // Send an ExecutorUpdated message to the driver
          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
          // If the executor has reached a terminal state
          if (ExecutorState.isFinished(state)) {
            // Remove this executor from the worker and app
            logInfo(s"Removing executor ${exec.fullId} because it is $state")
            // If an application has already finished, preserve its
            // state to display its information properly on the UI
            if (!appInfo.isFinished) {
              // Remove the executor from the app's cache
              appInfo.removeExecutor(exec)
            }
            // Remove the executor from the cache of the worker that ran it
            exec.worker.removeExecutor(exec)
            // Check whether the executor exited normally (exit status 0)
            val normalExit = exitStatus == Some(0)
            // Only retry certain number of times so we don't go into an infinite loop.
            if (!normalExit) {
              // Check whether the application's retry count has reached the maximum (MAX_NUM_RETRY, 10)
              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
                //Reschedule
                schedule()
              } else {
                // Otherwise perform removeApplication:
                // if executor scheduling keeps failing, the application itself is considered failed
                val execs = appInfo.executors.values
                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                    s"${appInfo.retryCount} times; removing it")
                  removeApplication(appInfo, ApplicationState.FAILED)
                }
              }
            }
          }
        }
        case None =>
          logWarning(s"Got status update for unknown executor $appId/$execId")
      }
}

Step 2: the removeExecutor method called in Step 1

  /**
   * Remove the executor from the internal in-memory caches
   */
  private[master] def removeExecutor(exec: ExecutorDesc) {
    if (executors.contains(exec.id)) {
      removedExecutors += executors(exec.id)
      executors -= exec.id
      coresGranted -= exec.cores
    }
  }

Step 3: the removeApplication method called in Step 1

def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
    // Remove the application from the in-memory caches
    if (apps.contains(app)) {
      logInfo("Removing app " + app.id)
      apps -= app
      idToApp -= app.id
      endpointToApp -= app.driver
      addressToApp -= app.driver.address
      if (completedApps.size >= RETAINED_APPLICATIONS) {
        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
        completedApps.take(toRemove).foreach(a => {
          appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
          applicationMetricsSystem.removeSource(a.appSource)
        })
        completedApps.trimStart(toRemove)
      }
      completedApps += app // Remember it in our history
      waitingApps -= app

      // If application events are logged, use them to rebuild the UI
      rebuildSparkUI(app)

      for (exec <- app.executors.values) {
        // Kill the executors belonging to the app
        killExecutor(exec)
      }
      app.markFinished(state)
      if (state != ApplicationState.FINISHED) {
        //Notify the driver
        app.driver.send(ApplicationRemoved(state.toString))
      }
      // Remove the application from the persistence engine
      persistenceEngine.removeApplication(app)
      schedule()

      // Tell all workers that the application has finished, so they can clean up any app state.
      workers.foreach { w =>
        w.endpoint.send(ApplicationFinished(app.id))
      }
    }
 }

Master resource scheduling algorithm

How it works:

  1. If the master's state is not ALIVE, return immediately; in other words, a standby master never schedules applications or other resources
  2. Take all registered workers, keep only those whose state is ALIVE, and shuffle them with Random.shuffle (the shuffle walks the buffer from the last element down to the third)
  3. Iterate over the live workers and launch the waiting drivers on them: each launched driver is added to the in-memory structures and removed from the waiting-driver queue
  4. Launch executors on the workers, using the default spreadOutApps algorithm

Source code analysis:
Step 1: the schedule() method
Source location: org.apache.spark.deploy.master.Master

  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   * 
   * Resource scheduling algorithm
   * 
   */
  private def schedule(): Unit = {
    //If the master's state is not ALIVE, return immediately:
    // a standby master never schedules applications or other resources
    if (state != RecoveryState.ALIVE) { return }
    // Drivers take strict precedence over executors
    
    /**
     * Random.shuffle randomly permutes the elements of the collection passed in.
     * It walks the ArrayBuffer from the last element down to the third; for each position it draws
     * a random index within the remaining range, e.g. for buf.length == 10, nextInt picks a number
     * in [0, 10) and the element at that position is swapped with the current one.
     */
    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
    
     /**
     * Filter the registered workers, keeping only those whose state is ALIVE
     */
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
      
      /**
       * Iterate over the waiting drivers. A driver is only registered with the master when the app
       * is submitted in cluster mode (standalone cluster / yarn-cluster); in other modes the driver
       * is launched locally instead of being registered, so the master never schedules it.
       */
      for (driver <- waitingDrivers) {
        //Check that the worker's free memory and free cores are both at least what the driver needs
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // Launch the driver
          launchDriver(worker, driver)
          //Remove the driver from the waiting-driver queue
          waitingDrivers -= driver
        }
      }
    }
    //Schedule and launch executors on the workers
    startExecutorsOnWorkers()
  }

Step 2: the shuffle method called in Step 1
Source location: scala.util.Random

/** Returns a new collection of the same type in a randomly chosen order.
   *  Randomly permutes the elements of the collection passed in
   *  @return         the shuffled collection
   */
  def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
    //Copy the input collection into an ArrayBuffer
    val buf = new ArrayBuffer[T] ++= xs
    //Swap two positions
    def swap(i1: Int, i2: Int) {
      val tmp = buf(i1)
      buf(i1) = buf(i2)
      buf(i2) = tmp
    }
   //Walk the ArrayBuffer from the last element down to the third; for each position draw a random
   //index within the remaining range (e.g. for buf.length == 10, nextInt(10) picks an index in
   //[0, 10)) and swap that element with the current one
    for (n <- buf.length to 2 by -1) {
      val k = nextInt(n)
      swap(n - 1, k)
    }

    (bf(xs) ++= buf).result
  }

Step 3: the launchDriver method called in Step 1

/**
  * Launch a driver on a particular worker
  */
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    // Add the driver to the worker's internal bookkeeping:
    // the worker's used memory and cores are increased by what the driver needs
    worker.addDriver(driver)
    // Record the worker in the driver's in-memory state
    driver.worker = Some(worker)
    // Send a LaunchDriver message to the worker's endpoint so the worker starts the driver
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    //Mark the driver as RUNNING
    driver.state = DriverState.RUNNING
 }

Step 4: the startExecutorsOnWorkers method called in Step 1

  /**
   * Schedule and launch executors on workers
   *
   * The application scheduling mechanism (the core of the core):
   * 1. spreadOutApps: spread the app's resources evenly across the workers
   * 2. non-spreadOutApps: pack the app onto as few workers as possible (preferring the workers
   *    with the most resources), so the remaining workers are not touched at all
   */
  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    // Iterate over the ApplicationInfos in waitingApps, considering only apps whose remaining core
    // demand (requested cores minus cores already used by its executors) is greater than 0
    for (app <- waitingApps if app.coresLeft > 0) {
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor
      // From workers, keep the ALIVE ones with enough free memory and cores, sorted by free cores in descending order
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
      
      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      // Now that each worker's share of the app's cores has been decided, iterate over the workers
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        // that were assigned cores and launch executors on them
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }

In this method executors are launched for the waiting applications. First, the workers that are able to launch an executor are selected; a worker qualifies only if:

  1. its remaining free memory is at least the memory required by one executor of the application
  2. its remaining free cores are at least the cores required by one executor (if coresPerExecutor was not specified at submission, this defaults to 1)

Step 5: the scheduleExecutorsOnWorkers method called in Step 4

/**
   * Schedule executors to be launched on the workers.
   * Returns an array containing number of cores assigned to each worker.
   *
   * There are two modes of launching executors. The first attempts to spread out an application's
   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
   * on as few workers as possible). The former is usually better for data locality purposes and is
   * the default.
   *
   * The number of cores assigned to each executor is configurable. When this is explicitly set,
   * multiple executors from the same application may be launched on the same worker if the worker
   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
   * worker by default, in which case only one executor may be launched on each worker.
   *
   * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
   * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
   * allocated at a time, 12 cores from each worker would be assigned to each executor.
   * Since 12 < 16, no executors would launch [SPARK-8881].
   *  
   *  The bug described above: the cluster has 4 workers of 16 cores each and we request 3
   *  executors of 16 cores each (48 cores in total). With one-core-at-a-time allocation each
   *  worker would end up with 12 cores, and since 12 < 16 no executor could launch [SPARK-8881].
   *
   *  Returns an array with the number of cores assigned to each worker for this application.
   * 
   */
  private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
    //Number of cores per executor, if specified
    val coresPerExecutor = app.desc.coresPerExecutor
    //Minimum number of cores per executor for this app; defaults to 1 if not set
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    //If coresPerExecutor is not set, at most one executor per worker is launched for this app
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    //  Memory per executor
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    // Number of usable workers
    val numUsable = usableWorkers.length
    //Array of length numUsable recording how many cores are assigned on each worker (e.g. 16,16,16,16)
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    //Array of length numUsable recording how many new executors are assigned on each worker (e.g. 1,2,1,0)
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    //Number of cores that will actually be assigned: the minimum of the app's remaining unassigned
    //cores and the total free cores of the usable workers, so we never exceed what is available
    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

    /** Return whether the specified worker can launch an executor for this app. */

    /**
     * Decide whether the given worker can launch an executor for this app:
     * 1. there must still be cores left to assign
     * 2. the worker must have enough free cores
     * 3. if a new executor would be launched, also check that the worker has enough memory and
     *    that the app's executor limit has not been reached
     * 4. if no new executor is needed, only checks 1 and 2 apply
     */
    def canLaunchExecutor(pos: Int): Boolean = {

      /**
      * Check that the number of cores still to be assigned is at least the number each executor
      * needs; e.g. if only 8 cores are left but each executor needs 12, no more executors can be
      * launched because there are not enough resources.
      */
      val keepScheduling = coresToAssign >= minCoresPerExecutor

      /**
       * Check that the worker still has enough cores: its free cores minus the cores already
       * assigned to it in this round.
       *
       * For example, if the worker has 16 free cores and 12 of them have already been assigned,
       * only 4 remain; if an executor needs 12 cores, 4 < 12 and the worker is out of cores.
       */
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

      // If we allow multiple executors per worker, then we can always launch new executors.
      // Otherwise, if there is already an executor on this worker, just give it more cores.

     /**
       * If multiple executors per worker are allowed, we can always launch a new executor;
       * otherwise, if this worker already has an executor for the app, we just give that
       * executor more cores.
       */
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      // If a new executor is to be launched, besides the cores we also need to check memory and the executor limit
      if (launchingNewExecutor) {
        //Memory already assigned on this worker in this round
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        //The worker's free memory minus what is already assigned must cover one more executor
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        //Check that the app's executor limit is not exceeded
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // We're adding cores to an existing executor, so no need
        // to check memory and executor limits
        // Otherwise we are only adding cores to an existing executor, so there is no need to check memory or the executor limit
        keepScheduling && enoughCores
      }
    }

    // Keep launching executors until no more workers can accommodate any
    // more executors, or if we have reached this application's limits
    //canLaunchExecutor simply checks whether the worker's cores and memory satisfy the requirements
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)

    //Keep launching executors until no worker can accommodate any more, or the application's demand is met
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        //Check again whether another executor can be launched here
        while (keepScheduling && canLaunchExecutor(pos)) {
          // Subtract the per-executor core count from the cores still to be assigned
          coresToAssign -= minCoresPerExecutor
          // and add it to the cores assigned on this worker
          assignedCores(pos) += minCoresPerExecutor

          // If we are launching one executor per worker, then every iteration assigns 1 core
          // to the executor. Otherwise, every iteration assigns cores to a new executor.
          /**
           * If only one executor per worker is allowed, record that this worker has one assigned
           * executor; otherwise increment the worker's executor count by one.
           */
          if (oneExecutorPerWorker) {
            assignedExecutors(pos) = 1
          } else {
            assignedExecutors(pos) += 1
          }

          // Spreading out an application means spreading out its executors across as
          // many workers as possible. If we are not spreading out, then we should keep
          // scheduling executors on this worker until we use all of its resources.
          // Otherwise, just move on to the next worker.
          /**
           * With spreadOutApps the executors should be spread across as many workers as possible,
           * so keepScheduling is set to false here to stop assigning on this worker and move on
           * to the next one.
           *
           * Without spreadOutApps we keep assigning on this worker: keepScheduling stays true and
           * the loop continues until the worker's resources can no longer fit another executor.
           */
          if (spreadOutApps) {
            keepScheduling = false
          }
        }
      }
      //Because the outer loop is a while, filter again here: once assignment is finished,
      //canLaunchExecutor returns false for every worker, freeWorkers becomes empty and the outer loop ends
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    assignedCores
  }

About the two scheduling strategies:

spreadOutApps walks the list of usable workers and starts executors on them one by one, evenly. For example, if 10 executors are needed and the first pass finds 5 usable workers (with free resources), one executor is started on each of those 5 workers in the first pass; the workers are then filtered again, because some of them may have run out of free resources, and a second pass is made. Assuming the same 5 workers still have enough free resources, after the second pass each worker is running 2 executors for this app (an even spread).

Non-spreadOutApps carries two nuances. In earlier versions it meant: after the first filtering, pick the usable worker with the most free resources and start as many of the app's executors on it as possible; if that is not enough, filter again and pick the next worker with the most free resources, and so on (not entirely reasonable). Now, driven by the assignedExecutors array, the appropriate number of executors is started on each usable worker as needed.

Which of the two allocation strategies is used comes down to the if (spreadOutApps) { keepScheduling = false } check.
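
To make the difference concrete, here is a small self-contained sketch (not Spark code) that mimics the core-assignment loop above, ignoring memory and the executor limit. Three 8-core workers and an app asking for 12 cores in 2-core executors are assumed purely for illustration:

// Simplified re-implementation of the per-worker core assignment shown above.
// freeCores: free cores of each usable worker; the app wants coresTotal cores
// handed out in chunks of coresPerExecutor.
def assignCores(freeCores: Array[Int], coresTotal: Int, coresPerExecutor: Int,
                spreadOut: Boolean): Array[Int] = {
  val assigned = Array.fill(freeCores.length)(0)
  var coresToAssign = math.min(coresTotal, freeCores.sum)

  // Same idea as canLaunchExecutor: cores left to assign, and cores left on this worker
  def canLaunch(pos: Int): Boolean =
    coresToAssign >= coresPerExecutor && freeCores(pos) - assigned(pos) >= coresPerExecutor

  var usable = freeCores.indices.filter(canLaunch)
  while (usable.nonEmpty) {
    usable.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunch(pos)) {
        coresToAssign -= coresPerExecutor
        assigned(pos) += coresPerExecutor
        if (spreadOut) keepScheduling = false // one executor here, then move to the next worker
      }
    }
    usable = usable.filter(canLaunch)
  }
  assigned
}

assignCores(Array(8, 8, 8), coresTotal = 12, coresPerExecutor = 2, spreadOut = true)
// => Array(4, 4, 4): executors are spread evenly across the workers
assignCores(Array(8, 8, 8), coresTotal = 12, coresPerExecutor = 2, spreadOut = false)
// => Array(8, 4, 0): the first worker is filled up before the next one is used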

Step 6: the allocateWorkerResourceToExecutors method called in Step 4

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    //Cores to assign on this worker
    assignedCores: Int,
    //Cores needed by each executor
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    // Compute how many executors this worker should get
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
   // Cores for each executor; if coresPerExecutor was not specified, one executor takes all the assigned cores
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      // Add an executor on this worker: create an ExecutorDesc and update the app's granted cores
      val exec = app.addExecutor(worker, coresToAssign)
      // Launch the executor
      launchExecutor(worker, exec)
      // Update the application's state
      app.state = ApplicationState.RUNNING
    }
  }

Step 7: the launchExecutor method called in Step 6

  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
   // Record the executor on the worker and update the worker's used cores and memory
    worker.addExecutor(exec)
    worker.endpoint.send(LaunchExecutor(
      masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    // Send an ExecutorAdded message to the application's driver
    exec.application.driver.send(ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }