Spark原始碼分析之Master資源排程演算法原理
Master是通過schedule方法進行資源排程,告知worker啟動executor等。
一schedule方法
1判斷master狀態,只有alive狀態的master才可以進行資源排程,standby是不能夠排程的
2將可用的worker節點打亂,這樣有利於driver的均衡
3進行driver資源排程,遍歷處於等待狀態的driver佇列,發起driver
4在worker上開啟executor程序
private def
schedule(): Unit = {
// 只有alive狀態的master才可以進行資源排程,standby是不能夠排程的if (state
!= RecoveryState
return
}
// 將可用的worker節點打亂,這樣有利於driver的均衡val shuffledAliveWorkers
= Random.shuffle(workers.toSeq.filter(_.state
== WorkerState.ALIVE))
val numWorkersAlive
= shuffledAliveWorkers.size
var curPos
= 0
// 進行driver資源排程,遍歷處於等待狀態的driver佇列for (driver <- waitingDrivers.toList) {
var
var numWorkersVisited=
0
while (numWorkersVisited
< numWorkersAlive&& !launched) {
// 獲取worker
val worker
= shuffledAliveWorkers(curPos)
// 記錄worker訪問數遞增numWorkersVisited+=
1
// 判斷worker的可使用記憶體是否大於driver所需要的記憶體以及worker可使用cpu核數是否大於driver所需要的cpu核數
// 滿足條件發起driver
launchDriver(worker, driver)
// 將當前driver從等待佇列中移除waitingDrivers-=
driver
// 標記該driver發起狀態為true
launched =
true
}
// 將指標指向下一個worker,當然如果driver已經發起了,則為下一個準備發起下一個處於等待的driver
curPos = (curPos
+ 1) % numWorkersAlive
}
}
// 在worker上開啟executor程序startExecutorsOnWorkers()
}
二startExecutorsOnWorkers 在worker上開啟executor程序
# 遍歷處於等待狀態的application,且處於等待的狀態的application的所需要的cpu核數大於0
# 得到每一個executor所需要的核數
# 過濾出有效的可用worker,再從worker中過濾出worker剩餘記憶體和CPU核數 不小於app對應executor所需要的記憶體和CPU核數,按照剩餘的CPU核數反向排序worker
# 在可用的worker上排程executor,啟動executor有兩種演算法模式:
一:將應用程式儘可能多的分配到不同的worker上
二:和第一種相反,分配到儘可能少的worker上,通常用於計算密集型
每一個executor所需要的核數是可以配置的,一般來講如果worker有足夠的記憶體和CPU核數,同一個應用程式就可以在該worker啟動多個executors;否則就不能再啟動新的executor了,則需要到其他worker上去分配executor了
# 在可用的worker上分配資源給executor
private def startExecutorsOnWorkers(): Unit = {
// 遍歷處於等待狀態的application,且處於等待的狀態的application的所需要的cpu核數大於0
// coresLeft=app請求的核數-已經分配給executor的核數的和for (app <- waitingApps if app.coresLeft > 0) {
// 每一個executor所需要的核數val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// 過濾出有效的可用worker
// 再從worker中過濾出worker剩餘記憶體和CPU核數不小於app對應executor所需要的記憶體和CPU核數
// 按照剩餘的CPU核數反向排序woker
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
// 在可用的worker上排程executor,啟動executor有兩種演算法模式:
// 一:將應用程式儘可能多的分配到不同的worker上
// 二:和第一種相反,分配到儘可能少的worker上,通常用於計算密集型;
// 每一個executor所需要的核數是可以配置的,一般來講如果worker有足夠的記憶體和CPU核數,同一個應用程式就可以
// 在該worker啟動多個executors;否則就不能再啟動新的executor了,則需要到其他worker上去分配executor了val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// 在可用的worker上分配資源給executor
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
三scheduleExecutorsOnWorkers在每一個worker上排程資源
判斷該worker能不能分配一個或者多個executor,能則分配相對應的executor所需要的CPU核數
private def scheduleExecutorsOnWorkers(app: ApplicationInfo,
usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = {
// 如果我們指定executor需要分配的核數,coresPerExecutor表示executor所需要的cpu核數val coresPerExecutor = app.desc.coresPerExecutor
// app中每個executor所需要的最小cpu核數,如果沒有預設最小核數為1
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
// 如果我們沒有指定executor需要分配的核數,則一個worker上只能啟動一個executor
val oneExecutorPerWorker = coresPerExecutor.isEmpty
// 每一個executor所需要的記憶體val memoryPerExecutor = app.desc.memoryPerExecutorMB
// 獲取可用worker數量val numUsable = usableWorkers.length
// 構建一個可用worker長度的陣列,用於存放每個worker節點分配到的cpu核數(16,16,16,16)
val assignedCores = new Array[Int](numUsable)
// 構建一個可用worker長度的陣列,用於存放每一個worker上新分配的executor數量(1,2,1,0)
val assignedExecutors = new Array[Int](numUsable)
// 針對當前應用程式,還需要分配的cpu核數,它應該是application還需要的cpu核數和worker總共剩餘核數之和中最小的
// 防止超過當前可用的cpu核數var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
// 判斷我們是否可以為這個application在指定的worker上發起一個executor
def canLaunchExecutor(pos: Int): Boolean = {
// 判斷當前需要分配的cpu核數是否大於或者等於每個executor所需要的cpu核數,比如總共只能分配8核,但是
// 每個executor所需要的cpu核數是12,那麼就不能發起executor了,因為資源不夠用val keepScheduling = coresToAssign >= minCoresPerExecutor
// 當前worker剩餘的核數 - 應用程式分配到該worker上的核數是否滿足發起一個executor,比如現在worker剩餘核數16
// 然後又給application他分配了12核,即還剩4核可用,但是啟動一個executor需要12核,那麼4 < 12 表示核心不足使用了val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
// 如果我們允許每一個worker啟動多個executor,然後我們可以啟動一個新的executor
// 否則如果worker已經啟動一個新executor,只需要將更多的核心分配給該executor即可val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
// 如果需要發起新的executor,既需要判斷cpu核數是否足夠,還需要判斷 executor是否超過限制總數以及否記憶體是否足夠if (launchingNewExecutor) {
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && underLimit
} else {
// 否則只是對已經存在的executor新增cpu核數,沒必要檢查記憶體和executor限制keepScheduling && enoughCores
}
}
// 過濾出那些可用的worker節點var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) {
// 遍歷每一個空閒的worker
freeWorkers.foreach { pos =>
var keepScheduling = true
// 檢測當前worker是否能夠發起executor
while (keepScheduling && canLaunchExecutor(pos)) {
// 需要分配的核數減去每個executor所需要的最小核數coresToAssign -= minCoresPerExecutor
// 對應的worker節點需要分配的cpu核數加上要啟動該executor所需要的最小CPU核數assignedCores(pos) += minCoresPerExecutor
// 如果每一個worker只允許啟動一個executor,那麼該worker啟動的executor數量只能是1,否則應該加一個if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// 如果需要將executor分配到更多的worker,那麼就不再從當前worker節點繼續分配,而是從下一個worker上繼續分配if (spreadOutApps) {
keepScheduling = false
}
}
}
// 因為進行了一次分配,需要再次從可用的worker節點中過濾可用的worker節點freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}
四allocateWorkerResourceToExecutors在worker上分配具體的資源
# 獲取該worker應該有多少個executor
# 獲取每一個executor應該分配的核數,如果沒有指定則使用計算的應該分配的核數
# 向worker上新增executor,建立ExecutorDesc物件,更新application已經分配到的cpu核數
# 啟動executor
# 更新application的狀態
private def allocateWorkerResourceToExecutors(app: ApplicationInfo, assignedCores: Int,
coresPerExecutor: Option[Int], worker: WorkerInfo): Unit = {
// 獲取該worker應該有多少個executor
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
// 獲取每一個executor應該分配的核數,如果沒有指定則使用計算的應該分配的核數val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
// 向worker上新增executor,建立ExecutorDesc物件,更新application已經分配到的cpu核數val exec = app.addExecutor(worker, coresToAssign)
// 啟動executor
launchExecutor(worker, exec)
// 更新application的狀態app.state = ApplicationState.RUNNING
}
}
五launchDriver 發起driver
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
// worker新增driver
worker.addDriver(driver)
driver.worker = Some(worker)
// 向worker傳送LaunchDriver訊息worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
// 更新driver狀態為RUNNING
driver.state = DriverState.RUNNING
}
六launchExecutor發起executor
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
// worker啟動executor,並且更新worker的cpu和記憶體資訊worker.addExecutor(exec)
// 向worker傳送LaunchExecutor訊息worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
// 向application傳送ExecutorAdded訊息exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}