
Big Data: Spark Standalone Cluster Scheduling (2) - How Executor Resources Are Created and Allocated

The overall Standalone architecture

A Spark cluster has three roles: Client, Master, and Worker. The flow when a Client submits a job is as follows:


The complete flow: the Driver submits the job to the Master; the Master allocates Executors on the Workers according to the job's parameters; each Worker receives its allocation details and launches Executor child processes.
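
To make the flow concrete, below is a minimal sketch of the messages involved. The case class names match the ones Spark uses internally (RegisterApplication, LaunchExecutor, ExecutorAdded, RegisterExecutor), but the field lists are simplified for illustration and are not the exact Spark signatures.

// Simplified sketch of the Standalone deploy protocol; field lists are
// abbreviated and do not match Spark's real signatures.
object DeployFlowSketch {
  // Client -> Master: register a new application
  case class RegisterApplication(appDescription: String)

  // Master -> Worker: start an executor child process for an application
  case class LaunchExecutor(appId: String, execId: Int, cores: Int, memoryMb: Int)

  // Master -> Driver: report that an executor has been granted
  case class ExecutorAdded(execId: Int, workerId: String, hostPort: String, cores: Int, memoryMb: Int)

  // Executor -> Driver: the launched executor registers itself back
  case class RegisterExecutor(execId: String, hostname: String, cores: Int)
}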

The Master's strategy for allocating Executors

After the Master receives the RegisterApplication message sent by the Client, it begins allocating and scheduling Worker resources.

1. Finding usable Workers

val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse

From the worker list, the usable workers are those where:

A. the free memory is at least the memory required by a single Executor, and

B. the free cores are at least the cores required by a single Executor.

Among the usable Workers, the ones with the most free cores (the most idle) get Executors allocated first.
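
As a toy illustration (the Worker case class below is a made-up stand-in, not Spark's WorkerInfo), the same filter-and-sort logic on three hypothetical workers looks like this:

object UsableWorkersExample {
  // Toy stand-in for Spark's WorkerInfo; fields are hypothetical, for illustration only.
  case class Worker(id: String, coresFree: Int, memoryFreeMb: Int, alive: Boolean = true)

  def main(args: Array[String]): Unit = {
    val workers = Seq(
      Worker("w1", coresFree = 2, memoryFreeMb = 8192),   // too few cores
      Worker("w2", coresFree = 8, memoryFreeMb = 4096),
      Worker("w3", coresFree = 4, memoryFreeMb = 1024))   // not enough memory

    val memoryPerExecutorMb = 2048   // executor-memory
    val coresPerExecutor    = 4      // executor-cores

    // Same idea as the Master's filter + sort: keep alive workers with enough
    // memory and cores, most idle (most free cores) first.
    val usable = workers
      .filter(_.alive)
      .filter(w => w.memoryFreeMb >= memoryPerExecutorMb && w.coresFree >= coresPerExecutor)
      .sortBy(_.coresFree).reverse

    println(usable.map(_.id))   // List(w2)
  }
}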

2. Allocating Executors

What is the relationship between an Executor and cores? You can roughly think of it as the relationship between a process and its threads, so when allocating a new Executor we must consider not only the cores but also whether there is enough memory.

Several controlling parameters

a. Cores per executor (executor-cores)

When the cores per executor is not set:

  1. each executor is assumed to have 1 core;
  2. only one Executor is allocated per Worker (in this case that single Executor can keep taking cores, up to the maximum the Worker can provide).

b. Memory required per Executor (executor-memory)

c. Total cores required by the Application (total-executor-cores)

This is the maximum number of cores configured for a running Application; if it is not set, the default core count is used (see the configuration example below).
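
For reference, these three knobs correspond to the standard Spark properties spark.executor.cores, spark.executor.memory and spark.cores.max (the last one is what --total-executor-cores sets in standalone mode); a minimal configuration sketch:

import org.apache.spark.SparkConf

object ExecutorSizingConf {
  def main(args: Array[String]): Unit = {
    // The same values can be passed to spark-submit via
    // --executor-cores, --executor-memory and --total-executor-cores.
    val conf = new SparkConf()
      .setAppName("executor-sizing-example")
      .set("spark.executor.cores", "5")    // a. cores per executor
      .set("spark.executor.memory", "4g")  // b. memory per executor
      .set("spark.cores.max", "20")        // c. total cores for the application
    conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
  }
}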

How is it decided whether an Executor can be launched on a Worker?

    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

      // If we allow multiple executors per worker, then we can always launch new executors.
      // Otherwise, if there is already an executor on this worker, just give it more cores.
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // We're adding cores to an existing executor, so no need
        // to check memory and executor limits
        keepScheduling && enoughCores
      }
    }
  • the Worker's remaining cores must be at least one Executor's worth of cores;
  • if a new Executor is being launched on this Worker, also check that the Worker has enough free memory for the Executor, and that the total number of Executors created does not exceed the Application's executor limit.

Example: with

executor-cores=5
if a Worker only has 4 free cores left, this Executor cannot be allocated on that Worker.

How Executors are spread evenly across Workers

Spark iterates round-robin over the list of usable Workers (how that list is built was covered in section 1 above), and on each pass assigns one executor's worth of cores (one executor) to each Worker, until all the cores the application needs have been assigned.

Master.scala

  private def scheduleExecutorsOnWorkers(
      app: ApplicationInfo,
      usableWorkers: Array[WorkerInfo],
      spreadOutApps: Boolean): Array[Int] = {
    val coresPerExecutor = app.desc.coresPerExecutor
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    val numUsable = usableWorkers.length
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    // ...

    // Keep launching executors until no more workers can accommodate any
    // more executors, or if we have reached this application's limits
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor

          // If we are launching one executor per worker, then every iteration assigns 1 core
          // to the executor. Otherwise, every iteration assigns cores to a new executor.
          if (oneExecutorPerWorker) {
            assignedExecutors(pos) = 1
          } else {
            assignedExecutors(pos) += 1
          }

          // Spreading out an application means spreading out its executors across as
          // many workers as possible. If we are not spreading out, then we should keep
          // scheduling executors on this worker until we use all of its resources.
          // Otherwise, just move on to the next worker.
          if (spreadOutApps) {
            keepScheduling = false
          }
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    assignedCores
  }

assignedCores is the list of cores assigned to each worker. Why doesn't the function return the number of Executors assigned instead?

Remember the cores-per-executor parameter above? If it is not configured, the default is one Executor per Worker; if the function returned a list of Executor counts, in that case it could only ever return {1, 1, ...}, with no way to tell how many cores each Worker was actually assigned.

The reverse, however, is easy: the number of Executors to create on each Worker is simply its assigned cores divided by coresPerExecutor.
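
To see the round-robin spreading and the returned core counts concretely, here is a self-contained re-implementation of the loop above on toy data (the worker sizes are made up for illustration, and Spark's classes are not used); with spreadOutApps enabled, 12 requested cores at 2 cores per executor land evenly on three workers:

object SpreadOutExample {
  def main(args: Array[String]): Unit = {
    // Toy inputs: free cores and free memory (MB) per usable worker.
    val coresFree  = Array(8, 8, 8)
    val memoryFree = Array(8192, 8192, 8192)

    val coresPerExecutor  = 2      // executor-cores
    val memoryPerExecutor = 2048   // executor-memory (MB)
    val appCoresLeft      = 12     // total-executor-cores still unassigned
    val spreadOutApps     = true

    val n = coresFree.length
    val assignedCores     = new Array[Int](n)
    val assignedExecutors = new Array[Int](n)
    var coresToAssign = math.min(appCoresLeft, coresFree.sum)

    def canLaunchExecutor(pos: Int): Boolean = {
      val enoughCores  = coresFree(pos) - assignedCores(pos) >= coresPerExecutor
      val enoughMemory = memoryFree(pos) - assignedExecutors(pos) * memoryPerExecutor >= memoryPerExecutor
      coresToAssign >= coresPerExecutor && enoughCores && enoughMemory
    }

    var freeWorkers = (0 until n).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= coresPerExecutor
          assignedCores(pos) += coresPerExecutor
          assignedExecutors(pos) += 1
          if (spreadOutApps) keepScheduling = false   // move on to the next worker
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }

    println(assignedCores.toList)       // List(4, 4, 4) when spreading out
    println(assignedExecutors.toList)   // List(2, 2, 2)
  }
}

With spreadOutApps = false the same 12 cores would instead be packed: 8 cores on the first worker and 4 on the second.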

3. Requesting resources on the Workers

  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val exec = app.addExecutor(worker, coresToAssign)
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }
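
The arithmetic of the two branches is easy to verify with toy numbers (assuming, for illustration, that a worker was assigned 8 cores):

object WorkerAllocationArithmetic {
  def main(args: Array[String]): Unit = {
    val assignedCores = 8

    // executor-cores set to 2: four executors of 2 cores each
    val withCoresPerExecutor: Option[Int] = Some(2)
    println(withCoresPerExecutor.map(assignedCores / _).getOrElse(1))   // 4 executors
    println(withCoresPerExecutor.getOrElse(assignedCores))              // 2 cores each

    // executor-cores not set: one executor that grabs all 8 assigned cores
    val withoutCoresPerExecutor: Option[Int] = None
    println(withoutCoresPerExecutor.map(assignedCores / _).getOrElse(1)) // 1 executor
    println(withoutCoresPerExecutor.getOrElse(assignedCores))            // 8 cores
  }
}
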
Following the allocation results from part 2, the Master generates an ID for each Executor and sends the Worker a LaunchExecutor message for each one in turn, while also reporting an ExecutorAdded message to the Driver.
  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }
How the Driver handles the ExecutorAdded message:
      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
        val fullId = appId + "/" + id
        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
          cores))
        listener.executorAdded(fullId, workerId, hostPort, cores, memory)

In the listener's handler, the event is simply logged:
  override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
    memory: Int) {
    logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
      fullId, hostPort, cores, Utils.megabytesToString(memory)))
  }

4. Managing resource requests

Although we keep talking about Executors, the number of cores is what really matters, and whether a request succeeds on a Worker is decided by both its cores and its memory: if a Worker's free cores are not enough for one Executor's worth of cores, that Worker is skipped.

The Master keeps an array of Applications that are still alive:

private val waitingApps = new ArrayBuffer[ApplicationInfo]

Inside the startExecutorsOnWorkers function:
private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    for (app <- waitingApps if app.coresLeft > 0) {
      // ...
    }
}
Applications that have not finished running are all kept in the waiting queue, and an application is removed from the queue only when it finishes.

An application whose cores have not all been allocated (for example, it asked for a total number of cores but only got part of them because resources were short) stays at the front of waitingApps and is given priority the next time resources are allocated; the Applications behind it follow the FIFO policy and only get their turn after the ones ahead of them have finished being allocated.

Note:

This does not mean that an Application whose cores are not fully allocated does not start running. The minimum unit of an Application is the Executor, and as the code above shows, as soon as a Worker can host an Executor, a LaunchExecutor message is sent to that Worker; there is no need to wait until the parameter below is fully satisfied:

total-executor-cores=10

Spark's approach is that when resources are insufficient, the Application is first given part of its Executors so the job can start running; later, whenever Worker resources are freed, the Master keeps requesting Executors from the Workers for that Application until the request is fully satisfied or the Application finishes. The state and resources of all the Workers are kept in the Master, which schedules and allocates them globally.

Setting total-executor-cores too high therefore carries real risk:

the Master will keep allocating Worker resources, without stopping, until that maximum core count is reached.

What happens next:

After the Worker receives the LaunchExecutor message, it starts an Executor child process; the Executor then sends a RegisterExecutor message to the Application, notifying it that the allocated Executor has started.

Executors and cores

When the Worker starts the Executor child process, it does not start one thread per core:

private def fetchAndRunExecutor() {
  // (body omitted) builds the executor's launch command and starts the child process
}
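
The omitted body essentially builds a command line from the application description and launches an ordinary child JVM process; a heavily simplified sketch of the idea (not Spark's actual ExecutorRunner code) might look like this:

object ExecutorLauncherSketch {
  // Heavily simplified: the real ExecutorRunner builds the command from the
  // ApplicationDescription and redirects the child's stdout/stderr to log files.
  def fetchAndRunExecutor(javaOpts: Seq[String], mainClass: String, args: Seq[String]): Process = {
    val command = Seq("java") ++ javaOpts ++ Seq(mainClass) ++ args
    val builder = new ProcessBuilder(command: _*)
    builder.inheritIO()   // no per-core threads are created here:
    builder.start()       // the Worker just forks a single child process
  }
}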
For the Worker, cores are just a number that controls the Worker's capacity; the Worker does not care about the Executor itself, only about how many cores it has left. Spark is a CPU-intensive computing framework and the Worker is its compute node, so the core count is what it tracks.

So what does the core count mean for the Executor process?

Within an Application there are also Tasks; how the task data is split depends on the configured parameters and on the number of cores each Executor reports, and the number of Tasks determines how many Task-executing threads are ultimately started inside the Executor. Tasks will be covered in a later post.
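
As a rough illustration of that last point (this is not Spark's actual Executor implementation), the reported core count effectively caps how many task threads run concurrently inside the executor process:

import java.util.concurrent.Executors

object TaskSlotsSketch {
  def main(args: Array[String]): Unit = {
    val executorCores = 4    // cores this executor reported to the driver
    val numTasks      = 10   // tasks the scheduler wants to run here

    // A fixed pool of `executorCores` threads: at most 4 tasks run at the same
    // time; the remaining tasks wait for a free slot.
    val pool = Executors.newFixedThreadPool(executorCores)
    (1 to numTasks).foreach { taskId =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          println(s"running task $taskId on ${Thread.currentThread().getName}")
          Thread.sleep(100)   // pretend to do work
        }
      })
    }
    pool.shutdown()
  }
}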