1. 程式人生 > >Spark原始碼分析之Master資源排程演算法原理

Spark原始碼分析之Master資源排程演算法原理

Master是通過schedule方法進行資源排程,告知worker啟動executor等。

一schedule方法

1判斷master狀態,只有alive狀態的master才可以進行資源排程,standby是不能夠排程的

2將可用的worker節點打亂,這樣有利於driver的均衡

3進行driver資源排程,遍歷處於等待狀態的driver佇列,發起driver

4在worker上開啟executor程序

private def schedule(): Unit = {
  // 只有alive狀態的master才可以進行資源排程,standby是不能夠排程的if (state != RecoveryState

.ALIVE) {
    return
 
}
  // 將可用的worker節點打亂,這樣有利於driver的均衡val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
 
var curPos = 0
 
// 進行driver資源排程,遍歷處於等待狀態的driver佇列for (driver <- waitingDrivers.toList) {
    var
launched = false
    var
numWorkersVisited= 0
   
while (numWorkersVisited < numWorkersAlive&& !launched) {
      // 獲取worker
     
val worker = shuffledAliveWorkers(curPos)
      // 記錄worker訪問數遞增numWorkersVisited+= 1
     
// 判斷worker的可使用記憶體是否大於driver所需要的記憶體以及worker可使用cpu核數是否大於driver所需要的cpu核數
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // 滿足條件發起driver
       
launchDriver(worker, driver)
        // 將當前driver從等待佇列中移除waitingDrivers-= driver
        // 標記該driver發起狀態為true
       
launched = true
     
}
      // 將指標指向下一個worker,當然如果driver已經發起了,則為下一個準備發起下一個處於等待的driver
     
curPos = (curPos + 1) % numWorkersAlive
   
}
  }
  // worker上開啟executor程序startExecutorsOnWorkers()
}

二startExecutorsOnWorkers  在worker上開啟executor程序

# 遍歷處於等待狀態的application,且處於等待的狀態的application的所需要的cpu核數大於0

# 得到每一個executor所需要的核數

# 過濾出有效的可用worker,再從worker中過濾出worker剩餘記憶體和CPU核數 不小於app對應executor所需要的記憶體和CPU核數,按照剩餘的CPU核數反向排序worker

# 在可用的worker上排程executor,啟動executor有兩種演算法模式:

一:將應用程式儘可能多的分配到不同的worker上

二:和第一種相反,分配到儘可能少的worker上,通常用於計算密集型

每一個executor所需要的核數是可以配置的,一般來講如果worker有足夠的記憶體和CPU核數,同一個應用程式就可以在該worker啟動多個executors;否則就不能再啟動新的executor了,則需要到其他worker上去分配executor了

# 在可用的worker上分配資源給executor

private def startExecutorsOnWorkers(): Unit = {
  // 遍歷處於等待狀態的application,且處於等待的狀態的application的所需要的cpu核數大於0
  // coresLeft=app請求的核數-已經分配給executor的核數的和for (app <- waitingApps if app.coresLeft > 0) {
    // 每一個executor所需要的核數val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // 過濾出有效的可用worker
    // 再從worker中過濾出worker剩餘記憶體和CPU核數不小於app對應executor所需要的記憶體和CPU核數
    // 按照剩餘的CPU核數反向排序woker
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // 在可用的worker上排程executor,啟動executor有兩種演算法模式:
    // 一:將應用程式儘可能多的分配到不同的worker
    // 二:和第一種相反,分配到儘可能少的worker上,通常用於計算密集型;
    // 每一個executor所需要的核數是可以配置的,一般來講如果worker有足夠的記憶體和CPU核數,同一個應用程式就可以
    // 在該worker啟動多個executors;否則就不能再啟動新的executor了,則需要到其他worker上去分配executorval assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // 在可用的worker上分配資源給executor
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}

三scheduleExecutorsOnWorkers在每一個worker上排程資源

判斷該worker能不能分配一個或者多個executor,能則分配相對應的executor所需要的CPU核數

private def scheduleExecutorsOnWorkers(app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = {
  // 如果我們指定executor需要分配的核數,coresPerExecutor表示executor所需要的cpu核數val coresPerExecutor = app.desc.coresPerExecutor
  //  app中每個executor所需要的最小cpu核數,如果沒有預設最小核數為1
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  // 如果我們沒有指定executor需要分配的核數,則一個worker上只能啟動一個executor
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  // 每一個executor所需要的記憶體val memoryPerExecutor = app.desc.memoryPerExecutorMB
  // 獲取可用worker數量val numUsable = usableWorkers.length
  // 構建一個可用worker長度的陣列,用於存放每個worker節點分配到的cpu核數(16,16,16,16)
  val assignedCores = new Array[Int](numUsable)
  // 構建一個可用worker長度的陣列,用於存放每一個worker上新分配的executor數量(1,2,1,0)
  val assignedExecutors = new Array[Int](numUsable)
  // 針對當前應用程式,還需要分配的cpu核數,它應該是application還需要的cpu核數和worker總共剩餘核數之和中最小的
  // 防止超過當前可用的cpu核數var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  // 判斷我們是否可以為這個application在指定的worker上發起一個executor
  def canLaunchExecutor(pos: Int): Boolean = {
    // 判斷當前需要分配的cpu核數是否大於或者等於每個executor所需要的cpu核數,比如總共只能分配8核,但是
    // 每個executor所需要的cpu核數是12,那麼就不能發起executor了,因為資源不夠用val keepScheduling = coresToAssign >= minCoresPerExecutor
    // 當前worker剩餘的核數 - 應用程式分配到該worker上的核數是否滿足發起一個executor,比如現在worker剩餘核數16
    // 然後又給application他分配了12核,即還剩4核可用,但是啟動一個executor需要12核,那麼4 < 12 表示核心不足使用了val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

    // 如果我們允許每一個worker啟動多個executor,然後我們可以啟動一個新的executor
    // 否則如果worker已經啟動一個新executor,只需要將更多的核心分配給該executor即可val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    // 如果需要發起新的executor,既需要判斷cpu核數是否足夠,還需要判斷 executor是否超過限制總數以及否記憶體是否足夠if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      // 否則只是對已經存在的executor新增cpu核數,沒必要檢查記憶體和executor限制keepScheduling && enoughCores
    }
  }
  // 過濾出那些可用的worker節點var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    // 遍歷每一個空閒的worker
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      // 檢測當前worker是否能夠發起executor
      while (keepScheduling && canLaunchExecutor(pos)) {
        // 需要分配的核數減去每個executor所需要的最小核數coresToAssign -= minCoresPerExecutor
        // 對應的worker節點需要分配的cpu核數加上要啟動該executor所需要的最小CPU核數assignedCores(pos) += minCoresPerExecutor
        // 如果每一個worker只允許啟動一個executor,那麼該worker啟動的executor數量只能是1,否則應該加一個if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }

        // 如果需要將executor分配到更多的worker,那麼就不再從當前worker節點繼續分配,而是從下一個worker上繼續分配if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    // 因為進行了一次分配,需要再次從可用的worker節點中過濾可用的worker節點freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores
}

四allocateWorkerResourceToExecutors在worker上分配具體的資源

# 獲取該worker應該有多少個executor

# 獲取每一個executor應該分配的核數,如果沒有指定則使用計算的應該分配的核數

# 向worker上新增executor,建立ExecutorDesc物件,更新application已經分配到的cpu核數

# 啟動executor

# 更新application的狀態

private def allocateWorkerResourceToExecutors(app: ApplicationInfo, assignedCores: Int,
    coresPerExecutor: Option[Int], worker: WorkerInfo): Unit = {
  // 獲取該worker應該有多少個executor
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  // 獲取每一個executor應該分配的核數,如果沒有指定則使用計算的應該分配的核數val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    // worker上新增executor,建立ExecutorDesc物件,更新application已經分配到的cpu核數val exec = app.addExecutor(worker, coresToAssign)
    // 啟動executor
    launchExecutor(worker, exec)
    // 更新application的狀態app.state = ApplicationState.RUNNING
  }
}

五launchDriver 發起driver

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // worker新增driver
  worker.addDriver(driver)
  driver.worker = Some(worker)
  // worker傳送LaunchDriver訊息worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  // 更新driver狀態為RUNNING
  driver.state = DriverState.RUNNING
}

六launchExecutor發起executor

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // worker啟動executor,並且更新workercpu和記憶體資訊worker.addExecutor(exec)
  // worker傳送LaunchExecutor訊息worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // application傳送ExecutorAdded訊息exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}