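Big Data: Spark Standalone Cluster Scheduling (Part 2): How Executors Are Created and Allocated Resources
Overall Architecture of Standalone
A Spark cluster has three roles: Client, Master, and Worker. The diagram below shows the flow when a Client submits a job:
The full flow: the Driver submits the job to the Master; the Master allocates Executors on the Workers according to the job's parameters; each Worker node receives its allocation and launches the executor child processes.
How the Master Allocates Executors
After the Master receives the RegisterApplication message from the Client, it begins allocating and scheduling Worker resources.
1. Finding usable Workers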
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
  .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
    worker.coresFree >= coresPerExecutor.getOrElse(1))
  .sortBy(_.coresFree).reverse
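The Master scans the worker list for usable Workers. A Worker is usable when:
A. Its free memory is at least the memory required by a single Executor
B. Its free cores are at least the cores required by a single Executor
The usable Workers are sorted by free cores in descending order, so the Worker with the most free cores (the idlest one) is offered Executors first.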
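The filter above can be tried outside Spark with a small stand-in. This is only a sketch: `WorkerSketch`, its fields, and the sample cluster are hypothetical simplifications of Spark's `WorkerInfo`, not the real class.

```scala
object UsableWorkersSketch {
  // Hypothetical, simplified stand-in for Spark's WorkerInfo.
  final case class WorkerSketch(id: String, alive: Boolean, memoryFreeMb: Int, coresFree: Int)

  // Keep live workers that can hold at least one executor, most free cores first.
  def select(
      workers: Seq[WorkerSketch],
      memoryPerExecutorMb: Int,
      coresPerExecutor: Option[Int]): Seq[WorkerSketch] =
    workers
      .filter(_.alive)
      .filter(w => w.memoryFreeMb >= memoryPerExecutorMb &&
        w.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree)
      .reverse

  // Made-up cluster: w3 is dead, w4 has too little free memory.
  val cluster: Seq[WorkerSketch] = Seq(
    WorkerSketch("w1", alive = true, memoryFreeMb = 4096, coresFree = 2),
    WorkerSketch("w2", alive = true, memoryFreeMb = 8192, coresFree = 8),
    WorkerSketch("w3", alive = false, memoryFreeMb = 8192, coresFree = 16),
    WorkerSketch("w4", alive = true, memoryFreeMb = 512, coresFree = 4)
  )

  // IDs of the workers that survive the filter, idlest first.
  val picked: Seq[String] =
    select(cluster, memoryPerExecutorMb = 1024, coresPerExecutor = Some(2)).map(_.id)
}
```

With these made-up numbers only w2 and w1 remain, in that order, because w2 has more free cores.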
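2. Allocating Executors
How do Executors relate to cores? Roughly as a process relates to its threads. When allocating a new Executor, the Master therefore has to check not only the cores but also whether enough memory is available.
Several parameters control the allocation:
a. Cores per executor
When the cores per executor are not set:
- Allocation proceeds in units of 1 core per executor
- Only one Executor can be allocated on a Worker (in this case a single Executor can take multiple cores, up to the maximum number of cores the Worker can hand out)
b. Memory required by each Executor
c. Cores required by the Application (total-executor-cores)
The maximum number of cores configured for each running Application. If it is not set, the default core count is used.
How does the Master decide whether an Executor can be allocated on a Worker?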
def canLaunchExecutor(pos: Int): Boolean = {
  val keepScheduling = coresToAssign >= minCoresPerExecutor
  val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
  // If we allow multiple executors per worker, then we can always launch new executors.
  // Otherwise, if there is already an executor on this worker, just give it more cores.
  val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
  if (launchingNewExecutor) {
    val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
    val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
    val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
    keepScheduling && enoughCores && enoughMemory && underLimit
  } else {
    // We're adding cores to an existing executor, so no need
    // to check memory and executor limits
    keepScheduling && enoughCores
  }
}
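- The Worker's remaining cores must be at least the cores of one Executor
- If a new Executor would be created on the Worker, the Master also checks that the Worker has enough memory for it and that the total number of Executors stays below the App's executor limit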
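The same checks can be exercised in isolation. The sketch below restates the logic of canLaunchExecutor as a pure function; `WorkerState`, the parameter names, and the sample values are hypothetical and exist only for illustration.

```scala
object CanLaunchSketch {
  // Hypothetical snapshot of one worker during a scheduling pass.
  final case class WorkerState(
      coresFree: Int,
      memoryFreeMb: Int,
      assignedCores: Int,
      assignedExecutors: Int)

  def canLaunch(
      w: WorkerState,
      coresToAssign: Int,
      coresPerExecutor: Option[Int],
      memoryPerExecutorMb: Int,
      executorsSoFar: Int,
      executorLimit: Int): Boolean = {
    val minCores = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val keepScheduling = coresToAssign >= minCores
    val enoughCores = w.coresFree - w.assignedCores >= minCores
    // A new executor is launched unless we are only growing the single executor on this worker.
    val launchingNew = !oneExecutorPerWorker || w.assignedExecutors == 0
    if (launchingNew) {
      val assignedMemory = w.assignedExecutors * memoryPerExecutorMb
      val enoughMemory = w.memoryFreeMb - assignedMemory >= memoryPerExecutorMb
      keepScheduling && enoughCores && enoughMemory && executorsSoFar < executorLimit
    } else {
      // Adding cores to an existing executor: memory and executor limit are not re-checked.
      keepScheduling && enoughCores
    }
  }

  // A worker with 4 free cores and plenty of memory (made-up values).
  val fourFreeCores: WorkerState =
    WorkerState(coresFree = 4, memoryFreeMb = 8192, assignedCores = 0, assignedExecutors = 0)

  // Asking for 5 cores per executor fails on this worker; asking for 4 succeeds.
  val rejected: Boolean = canLaunch(fourFreeCores, 10, Some(5), 1024, 0, Int.MaxValue)
  val accepted: Boolean = canLaunch(fourFreeCores, 10, Some(4), 1024, 0, Int.MaxValue)
}
```

Here `rejected` is false because the worker cannot supply a whole 5-core executor, while `accepted` is true.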
Example: suppose the setting is
executor-cores=5
If a Worker has only 4 free cores, this Executor cannot be allocated on that Worker.
How Executors are balanced across Workers
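Spark creates Executors by going round-robin over the list of usable Workers (section 1 above described how that list is built). On each pass it assigns one executor's worth of cores (one executor) to each Worker, and it repeats until all the cores the application needs have been assigned.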
Master.scala
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
  ......
  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor
        // If we are launching one executor per worker, then every iteration assigns 1 core
        // to the executor. Otherwise, every iteration assigns cores to a new executor.
        if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }
        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker.
        if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores
}
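To see what the spreadOutApps flag changes, the loop above can be reduced to a cores-only simulation. This is a sketch under simplifying assumptions: memory and the executor limit are ignored, coresPerExecutor is always set, and `assign` and its parameters are hypothetical names.

```scala
object SpreadOutSketch {
  // coresFree(i) is the number of free cores on worker i.
  // Returns the cores assigned to each worker, in the same order.
  def assign(
      coresFree: Array[Int],
      coresWanted: Int,
      coresPerExecutor: Int,
      spreadOut: Boolean): Array[Int] = {
    val assigned = new Array[Int](coresFree.length)
    var coresToAssign = math.min(coresWanted, coresFree.sum)

    // Cores-only version of the check: enough cores left to hand out and enough on this worker.
    def canLaunch(pos: Int): Boolean =
      coresToAssign >= coresPerExecutor && coresFree(pos) - assigned(pos) >= coresPerExecutor

    var free = coresFree.indices.filter(canLaunch)
    while (free.nonEmpty) {
      free.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(pos)) {
          coresToAssign -= coresPerExecutor
          assigned(pos) += coresPerExecutor
          // When spreading out, place one executor and move on to the next worker.
          if (spreadOut) keepScheduling = false
        }
      }
      free = free.filter(canLaunch)
    }
    assigned
  }

  // Three workers with 8 free cores each; the app wants 12 cores in 2-core executors.
  val spread: List[Int] =
    assign(Array(8, 8, 8), coresWanted = 12, coresPerExecutor = 2, spreadOut = true).toList
  val packed: List[Int] =
    assign(Array(8, 8, 8), coresWanted = 12, coresPerExecutor = 2, spreadOut = false).toList
}
```

With spreading enabled the 12 cores land as 4, 4, 4. Without it the first worker is filled before the next one is touched, giving 8, 4, 0.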
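assignedCores is the list of cores assigned to each Worker. Why does the function not return the number of Executors assigned instead?
Remember the cores-per-Executor setting described earlier? If it is not configured, each Worker starts only one Executor. If the function returned a list of Executor counts, in that case it could only return something like {1, 1, ...}, which says nothing about how many cores each Worker was assigned.
Going the other way is easy: the number of Executors to create on a Worker is that Worker's assigned cores divided by coresPerExecutor.
3. Requesting resources on the Worker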
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
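The arithmetic in the function above is easy to check by hand. The following sketch isolates it. `split` is a hypothetical helper that returns the core count of each executor that would be launched on one worker; it does not launch anything.

```scala
object ExecutorSplitSketch {
  // Core count of every executor created on one worker, given the cores assigned to it.
  def split(assignedCores: Int, coresPerExecutor: Option[Int]): List[Int] = {
    // With a fixed executor size, the assigned cores are divided evenly;
    // otherwise a single executor takes all of them.
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresEach = coresPerExecutor.getOrElse(assignedCores)
    List.fill(numExecutors)(coresEach)
  }

  // 8 assigned cores with executor-cores=2: four 2-core executors.
  val fixedSize: List[Int] = split(8, Some(2))
  // 8 assigned cores with no executor-cores setting: one 8-core executor.
  val single: List[Int] = split(8, None)
}
```

This is also why returning assigned cores per worker is enough: the executor count falls out of one division.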
Following the allocation worked out in part 2, the Master generates an ID for each Executor, sends the Worker a LaunchExecutor message for each Executor in turn, and also reports an ExecutorAdded message to the Driver.
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
How the Driver handles the ExecutorAdded message
case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
  val fullId = appId + "/" + id
  logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
    cores))
  listener.executorAdded(fullId, workerId, hostPort, cores, memory)
The listener's handler simply writes a log entry:
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
    memory: Int) {
  logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
    fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
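4. Managing resource requests
Although we keep talking about Executors, cores are what really matter. Whether an allocation on a Worker succeeds is decided by its cores and memory: if a Worker's free cores are fewer than one Executor needs, that Worker is skipped. The Master keeps an array that tracks the Applications that are still alive:
private val waitingApps = new ArrayBuffer[ApplicationInfo]
It is used in the startExecutorsOnWorkers function: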
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {
    .....
  }
}
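Every application that has not finished is kept in the waiting queue, and it is removed from the queue only when it completes.
An application whose cores have not been fully allocated (for example, it set a total core count but the cluster could only supply part of it) is offered resources first on the next pass, because it sits at the front of the waitingApps queue. Later Applications follow FIFO order and get their turn only after the Applications ahead of them have been served.
Note:
This does not mean that an Application without its full core allocation does not start running. The smallest unit of allocation for an Application is the Executor. As the code above shows, as soon as an Executor can be placed on a Worker, the Master sends that Worker a LaunchExecutor message. It does not wait until a setting such as the following has been fully satisfied:
total-executor-cores=10
Spark's approach when resources are short is to give the Application some of its Executors first and let the job start running. When Worker resources are released later, the Master continues to request Executors from the Workers for that Application, until either the full request has been met or the Application has finished. The state and resource usage of every Worker are kept in the Master, which schedules and allocates globally.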
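The FIFO behaviour with partial allocation can be pictured with a toy model. This is a sketch only: it counts bare cores instead of Executors, and `App`, `schedule`, and the numbers are hypothetical, not part of Spark.

```scala
object FifoSketch {
  // Hypothetical view of a waiting application: its name and the cores it still wants.
  final case class App(name: String, coresLeft: Int)

  // One scheduling pass: walk the queue in order and hand out whatever cores are free.
  // Returns the updated queue and the cores granted to each app in this pass.
  def schedule(waiting: List[App], freeCores: Int): (List[App], Map[String, Int]) = {
    var remaining = freeCores
    var granted = Map.empty[String, Int]
    val updated = waiting.map { app =>
      val give = math.min(app.coresLeft, remaining)
      remaining -= give
      if (give > 0) granted += (app.name -> give)
      app.copy(coresLeft = app.coresLeft - give)
    }
    (updated, granted)
  }

  // App A wants 10 cores and App B wants 4, but only 6 cores are free at first.
  val queue: List[App] = List(App("A", 10), App("B", 4))
  val (afterFirst, firstGrant) = schedule(queue, freeCores = 6)
  // Later, 6 more cores are released and another pass runs.
  val (afterSecond, secondGrant) = schedule(afterFirst, freeCores = 6)
}
```

In the first pass A takes all 6 cores and starts with a partial allocation while B gets nothing. In the second pass A is topped up with its last 4 cores before B receives the remaining 2.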
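Setting total-executor-cores too high carries a real risk
The Master will keep allocating Worker resources to the Application until it reaches that maximum core count.
What happens next:
After a Worker receives the LaunchExecutor message, it starts the Executor as a child process. The Executor then sends a RegisterExecutor message to the Application, telling it that the allocated Executor has started.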
Executors and cores?
When a Worker starts the Executor child process, it does not start a matching number of threads for the cores.
private def fetchAndRunExecutor() {
}
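To a Worker, cores are simply a count of its own capacity. The Worker does not care about the Executor itself, only about how many cores it has left. Spark is a CPU-intensive compute framework and the Workers are its compute nodes, so tracking the core count is what matters.
So what do cores mean to the Executor process?
In Spark, an Application's job is further divided into tasks. How the data is split into tasks depends on configuration and on the core count each Executor reports, and the number of tasks determines how many task-running threads are eventually started inside the Executor. Tasks will be covered in a later article.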