Spark: Master Internals and Source Code Analysis
Master Failover (Active/Standby Switching)
Spark's native standalone mode supports master failover: two masters can be configured, and when the Active Master goes down due to a failure, the system automatically switches the Standby Master over to become the Active Master.
There are two failover modes:
- Filesystem-based: Spark persists the registration info of Spark applications and workers, together with their recovery state, to a designated directory. When the master node goes down, restarting the master recovers the application and worker registrations from that directory. The switchover must be performed manually.
- ZooKeeper-based, used in production. Its basic principle is to let ZooKeeper elect one master while the others stay in Standby. Connect the standalone cluster to the same ZooKeeper instance and start multiple masters; using ZooKeeper's election and state-storage facilities, one master is elected while the rest remain Standby. If the current master dies, another master is elected, restores the old master's state, and then resumes scheduling. The whole recovery process can take 1-2 minutes.
Recovery flow:
- After the active master goes down, the persistence engine (FileSystemPersistenceEngine or ZooKeeperPersistenceEngine) first reads the persisted storedApps, storedDrivers, and storedWorkers
- If any of storedApps, storedDrivers, or storedWorkers is non-empty
- The persisted Application, Driver, and Worker info is re-registered into the master's internal in-memory cache structures
- The states of the Applications and Workers are set to UNKNOWN, and the Standby Master's address is sent to each Application's Driver and to the Workers
- If a Driver or Worker is still running normally, it receives the master's address and replies with a response message to the new master
- As the master receives responses from Drivers and Workers, it calls completeRecovery() to deal with the Drivers and Workers that never responded, filtering out their info
- The master's schedule() method is called to schedule the waiting Drivers and Applications: launching drivers on workers, or launching executors on workers for applications
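The filtering step in this flow can be modeled with a small sketch. This is a simplified, hypothetical model (the `Worker`/`WorkerState` types here are stand-ins, not Spark's real `WorkerInfo`/`WorkerState`): every restored worker starts out UNKNOWN, responders are flipped back to ALIVE, and whatever is still UNKNOWN when recovery completes is filtered out.

```scala
// Hypothetical stand-ins for the master's recovery bookkeeping (not Spark's real classes).
object WorkerState extends Enumeration { val ALIVE, UNKNOWN, DEAD = Value }
case class Worker(id: String, var state: WorkerState.Value)

// During recovery every restored worker starts out UNKNOWN; only workers that answered
// the new master's MasterChanged message are flipped back to ALIVE. Whatever is still
// UNKNOWN when recovery completes gets filtered out, mirroring completeRecovery().
def completeRecovery(workers: Seq[Worker], responded: Set[String]): (Seq[Worker], Seq[Worker]) = {
  workers.foreach { w => if (responded.contains(w.id)) w.state = WorkerState.ALIVE }
  workers.partition(_.state != WorkerState.UNKNOWN) // (kept, removed)
}
```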
Source code analysis:
Source: org/apache/spark/deploy/master/Master.scala
override def receive: PartialFunction[Any, Unit] = {
case ElectedLeader => {
// read the persisted storedApps, storedDrivers, storedWorkers
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
// if any of storedApps, storedDrivers, storedWorkers is non-empty, enter RECOVERING
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
// begin master recovery
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
}
// complete the master failover
case CompleteRecovery => completeRecovery()
......
}
Step 1: begin master recovery
/**
* Begin master recovery
*/
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
storedWorkers: Seq[WorkerInfo]) {
for (app <- storedApps) {
logInfo("Trying to recover app: " + app.id)
try {
// register the application
registerApplication(app)
app.state = ApplicationState.UNKNOWN
app.driver.send(MasterChanged(self, masterWebUiUrl))
} catch {
case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
}
}
for (driver <- storedDrivers) {
// Here we just read in the list of drivers. Any drivers associated with now-lost workers
// will be re-launched when we detect that the worker is missing.
drivers += driver
}
for (worker <- storedWorkers) {
logInfo("Trying to recover worker: " + worker.id)
try {
// register the worker
registerWorker(worker)
worker.state = WorkerState.UNKNOWN
worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
} catch {
case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
}
}
}
Step 2: registerApplication from step 1
private def registerApplication(app: ApplicationInfo): Unit = {
// get the driver's address
val appAddress = app.driver.address
// if the driver's address is already known, return directly -- this would be a duplicate registration
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
applicationMetricsSystem.registerSource(app.appSource)
// add the Application info to the in-memory caches
apps += app
idToApp(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
// add the Application to the queue of apps waiting to be scheduled; scheduling is FIFO
waitingApps += app
}
Step 3: registerWorker from step 1
/**
* Register a worker
*
* Iterate over all managed workers; any worker with the same host and port as the newly
* registering one that is in the DEAD (timed-out) state is removed from workers directly.
* If addressToWorker already contains the new worker's workerAddress, fetch the old worker:
* if its state is UNKNOWN, the master is in recovery and the worker is being recovered, so
* remove the old worker, add the new one, and return success; if the old worker is in any
* other state, this is a duplicate registration and failure is returned.
*
*/
private def registerWorker(worker: WorkerInfo): Boolean = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
// filter out workers in the DEAD state
workers.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
// remove the worker from the workers HashSet
workers -= w
}
// the worker's address
val workerAddress = worker.endpoint.address
// if the address already exists, the existing entry is oldWorker
if (addressToWorker.contains(workerAddress)) {
val oldWorker = addressToWorker(workerAddress)
// if oldWorker's state is UNKNOWN, remove it
if (oldWorker.state == WorkerState.UNKNOWN) {
// A worker registering from UNKNOWN implies that the worker was restarted during recovery.
// The old worker must thus be dead, so we will remove it and accept the new worker.
removeWorker(oldWorker)
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}
// add the new worker to the HashSet
workers += worker
// add the worker's id and address to the corresponding HashMaps
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
// return true: worker registration succeeded
true
}
Step 4: complete the master failover
/**
* Complete the master failover
*/
private def completeRecovery() {
// Ensure "only-once" recovery semantics using a short synchronization period.
if (state != RecoveryState.RECOVERING) { return }
state = RecoveryState.COMPLETING_RECOVERY
// Kill off any workers and apps that didn't respond to us.
// Filter out the Applications and Workers whose state is still UNKNOWN,
// then iterate over them, calling removeWorker and finishApplication respectively,
// to clean up Applications and Workers that may have failed or already died
/**
* The cleanup touches three places:
* 1. remove from the in-memory cache structures
* 2. remove from the in-memory caches of related components (e.g. the driver that ran on the worker)
* 3. remove from persistent storage
*/
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
// Reschedule drivers which were not claimed by any workers
drivers.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery")
// relaunch the driver -- relevant for Spark Streaming programs
if (d.desc.supervise) {
logWarning(s"Re-launching ${d.id}")
relaunchDriver(d)
} else {
removeDriver(d.id, DriverState.ERROR, None)
logWarning(s"Did not re-launch ${d.id} because it was not supervised")
}
}
state = RecoveryState.ALIVE
// resource scheduling
schedule()
logInfo("Recovery complete - resuming operations!")
}
Step 5: removeWorker from step 4
/**
* Remove a worker
*/
private def removeWorker(worker: WorkerInfo) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
worker.setState(WorkerState.DEAD)
// remove from the in-memory cache structures (HashMaps)
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
// tell the driver that the executor was lost
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.LOST, Some("worker lost"), None))
// remove all of the worker's executors
exec.application.removeExecutor(exec)
}
for (driver <- worker.drivers.values) {
// Spark monitors drivers automatically: when the worker hosting a driver dies, the driver
// is removed as well; if the supervise property is set, the master relaunches the driver
if (driver.desc.supervise) {
logInfo(s"Re-launching ${driver.id}")
relaunchDriver(driver)
} else {
logInfo(s"Not re-launching ${driver.id} because it was not supervised")
removeDriver(driver.id, DriverState.ERROR, None)
}
}
// remove the worker from the persistence engine
persistenceEngine.removeWorker(worker)
}
Step 6: finishApplication
private def finishApplication(app: ApplicationInfo) {
removeApplication(app, ApplicationState.FINISHED)
}
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
// remove the app from the in-memory cache structures
if (apps.contains(app)) {
logInfo("Removing app " + app.id)
apps -= app
idToApp -= app.id
endpointToApp -= app.driver
addressToApp -= app.driver.address
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach(a => {
appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
}
completedApps += app // Remember it in our history
waitingApps -= app
// If application events are logged, use them to rebuild the UI
rebuildSparkUI(app)
for (exec <- app.executors.values) {
// kill the app's executors
killExecutor(exec)
}
app.markFinished(state)
if (state != ApplicationState.FINISHED) {
app.driver.send(ApplicationRemoved(state.toString))
}
// remove the application from the persistence engine
persistenceEngine.removeApplication(app)
schedule()
// Tell all workers that the application has finished, so they can clean up any app state.
workers.foreach { w =>
w.endpoint.send(ApplicationFinished(app.id))
}
}
}
Step 7: relaunchDriver
/**
* Relaunch a driver
*/
private def relaunchDriver(driver: DriverInfo) {
driver.worker = None
// set the driver's state to RELAUNCHING
driver.state = DriverState.RELAUNCHING
// add the driver to the waiting queue
waitingDrivers += driver
schedule()
}
Step 8: removeDriver
/**
* Remove a driver
*/
private def removeDriver(
driverId: String,
finalState: DriverState,
exception: Option[Exception]) {
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
// remove from the in-memory cache (HashSet)
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove)
}
// add the driver to the completed drivers
completedDrivers += driver
// remove the driver from the persistence engine
persistenceEngine.removeDriver(driver)
// set the driver's state to the final state
driver.state = finalState
driver.exception = exception
// remove the driver from the worker it ran on
driver.worker.foreach(w => w.removeDriver(driver))
schedule()
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
}
Master Registration Mechanism
Worker registration with the master:
- After a worker starts up, it registers with the master
- Workers in the DEAD state are filtered out. For example, if the master has finished recovery within its time limit but finds a worker still in UNKNOWN state, it removes that worker and marks it DEAD; if much later the worker inexplicably tries to register again, it is simply filtered out. For a worker in the UNKNOWN state, the master clears the old worker info and replaces it with the new: e.g. when a newly started master sends its new address to a worker, it marks that worker UNKNOWN; when the worker replies with its registration, the master replaces the old worker info with the new
- The worker is added to the in-memory cache (HashMaps)
- The worker info is persisted by the persistence engine, which may be filesystem-based or ZooKeeper-based
- schedule() is called to run scheduling
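The accept/reject decision described in the second bullet can be summarized as a sketch. This is a simplified model under stated assumptions (hypothetical `WState` type; the real registerWorker also purges DEAD entries at the same host:port first):

```scala
// Simplified model of the re-registration decision (hypothetical types).
object WState extends Enumeration { val ALIVE, UNKNOWN, DEAD = Value }

// A worker registering at an already-known address is accepted only when the old
// entry is UNKNOWN (it was restarted during recovery); otherwise it is a duplicate.
def shouldAccept(existingAtAddress: Option[WState.Value]): Boolean =
  existingAtAddress match {
    case None                 => true  // brand-new address
    case Some(WState.UNKNOWN) => true  // old worker restarted during recovery
    case Some(_)              => false // duplicate registration, reject
  }
```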
Source code analysis:
Step 1: handle the worker's registration request
Source: org.apache.spark.deploy.master.Master
/**
* Handle the worker's registration request
* Note:
* A big benefit of this worker registration design is that in production, a new worker can
* be added to a running Spark cluster and used to boost processing capacity without
* restarting the cluster
*
*/
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
// check whether this master's state is STANDBY
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else if (idToWorker.contains(id)) {
// if the worker id already exists, reply that registration failed: "Duplicate worker ID"
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
} else {
// wrap the worker's id, host, port, core count, memory, etc. into a WorkerInfo
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerUiPort, publicAddress)
// check whether the worker registered successfully
if (registerWorker(worker)) {
// the persistence engine stores the worker's info
persistenceEngine.addWorker(worker)
workerRef.send(RegisteredWorker(self, masterWebUiUrl))
// resource scheduling
schedule()
} else {
// worker registration failed
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
}
Step 2: registerWorker from step 1
/**
* Register a worker
*/
private def registerWorker(worker: WorkerInfo): Boolean = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
// filter out workers in the DEAD state
workers.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
// remove the worker from the workers HashSet
workers -= w
}
// the worker's address
val workerAddress = worker.endpoint.address
// if the address already exists, the existing entry is oldWorker
if (addressToWorker.contains(workerAddress)) {
val oldWorker = addressToWorker(workerAddress)
// if oldWorker's state is UNKNOWN, remove it
if (oldWorker.state == WorkerState.UNKNOWN) {
// A worker registering from UNKNOWN implies that the worker was restarted during recovery.
// The old worker must thus be dead, so we will remove it and accept the new worker.
removeWorker(oldWorker)
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}
// add the new worker to the HashSet
workers += worker
// add the worker's id and address to the corresponding HashMaps
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
// return true: worker registration succeeded
true
}
Step 3: removeWorker from step 2
/**
* Remove a worker
*/
private def removeWorker(worker: WorkerInfo) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
worker.setState(WorkerState.DEAD)
// remove from the in-memory cache structures (HashMaps)
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
// tell the driver that the executor was lost
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.LOST, Some("worker lost"), None))
// remove all of the worker's executors
exec.application.removeExecutor(exec)
}
for (driver <- worker.drivers.values) {
// Spark monitors drivers automatically: when the worker hosting a driver dies, the driver
// is removed as well; if the supervise property is set, the master relaunches the driver
if (driver.desc.supervise) {
logInfo(s"Re-launching ${driver.id}")
relaunchDriver(driver)
} else {
logInfo(s"Not re-launching ${driver.id} because it was not supervised")
removeDriver(driver.id, DriverState.ERROR, None)
}
}
// remove the worker from the persistence engine
persistenceEngine.removeWorker(worker)
}
Step 4: removeExecutor from step 3
Source: org.apache.spark.deploy.master.ApplicationInfo
/**
* Remove the executor from the internal in-memory cache structures
*/
private[master] def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.id)) {
removedExecutors += executors(exec.id)
executors -= exec.id
coresGranted -= exec.cores
}
}
Step 5: relaunchDriver from step 3
/**
* Relaunch a driver
*/
private def relaunchDriver(driver: DriverInfo) {
driver.worker = None
// set the driver's state to RELAUNCHING
driver.state = DriverState.RELAUNCHING
// add the driver to the waiting queue
waitingDrivers += driver
schedule()
}
Step 6: removeDriver from step 3
/**
* Remove a driver
*/
private def removeDriver(
driverId: String,
finalState: DriverState,
exception: Option[Exception]) {
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
// remove from the in-memory cache (HashSet)
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove)
}
// add the driver to the completed drivers
completedDrivers += driver
// remove the driver from the persistence engine
persistenceEngine.removeDriver(driver)
// set the driver's state to the final state
driver.state = finalState
driver.exception = exception
// remove the driver from the worker it ran on
driver.worker.foreach(w => w.removeDriver(driver))
schedule()
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
}
Driver registration with the master
- When a Spark application is submitted with spark-submit, the driver first registers with the master, and the driver info is put into the in-memory cache, i.e. a HashMap
- The driver is added to the waiting-for-scheduling queue, i.e. an ArrayBuffer
- The driver info is persisted by the persistence engine, which may be filesystem-based or ZooKeeper-based
- schedule() is called to run scheduling
Source code analysis:
Step 1: handle the driver submission request
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
/**
* Request to submit a driver; parameter: DriverDescription
*/
case RequestSubmitDriver(description) => {
// check whether the master's state is ALIVE
if (state != RecoveryState.ALIVE) {
// if not ALIVE, reply that the driver submission failed and must go to an ALIVE master
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
// create the driver with createDriver
val driver = createDriver(description)
// persist the driver's info with the persistence engine
persistenceEngine.addDriver(driver)
// add the driver to the waiting-for-scheduling queue (ArrayBuffer)
waitingDrivers += driver
// add the driver to the internal in-memory cache structure (HashSet)
drivers.add(driver)
// scheduling
schedule()
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".
// on success, reply via RpcCallContext that the driver was submitted, with its id
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
}
......
}
Step 2: createDriver from step 1
/**
* Create a driver; returns the wrapped DriverInfo
*/
private def createDriver(desc: DriverDescription): DriverInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
// wrap the timestamp, driver id, desc, etc. into a DriverInfo and return it
new DriverInfo(now, newDriverId(date), desc, date)
}
Application registration with the master (the registerApplication() method)
- Once the driver is up, it runs our application code and initializes the SparkContext; underneath, SparkDeploySchedulerBackend sends RegisterApplication to the master via the thread inside AppClient (ClientActor) to register the application
- The application info is put into the in-memory cache (HashMap)
- The application is added to the waiting-for-scheduling queue (ArrayBuffer)
- The application info is persisted by the persistence engine, which may be filesystem-based or ZooKeeper-based
- schedule() is called to run scheduling
Source code analysis:
Step 1: the underlying SparkDeploySchedulerBackend, via the thread inside AppClient (ClientActor), sends RegisterApplication to the master to register the application
Source: org.apache.spark.deploy.client.AppClient
/**
* Register with all masters asynchronously and returns an array `Future`s for cancellation.
* the master supports two failover mechanisms: filesystem-based, and ZooKeeper-based (dynamic HA, hot switching)
*/
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef =
rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
// send the RegisterApplication case class, carrying appDescription, to register with the master
masterRef.send(RegisterApplication(appDescription, self))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
})
}
}
Step 2: handle the application's registration request
Source: org.apache.spark.deploy.master.Master
/**
* Handle the application's registration request
*/
case RegisterApplication(description, driver) => {
// TODO Prevent repeated registrations from some driver
// if this master's state is STANDBY, i.e. it is not the active master,
// then do nothing when an application requests registration
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
// create an ApplicationInfo from the ApplicationDescription
val app = createApplication(description, driver)
// register the Application: add it to the cache and to the waiting-for-scheduling queue
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
// persist the ApplicationInfo with the persistence engine
persistenceEngine.addApplication(app)
// reply to the AppClient's ClientActor inside SparkDeploySchedulerBackend with
// RegisteredApplication (note: not RegisterApplication)
driver.send(RegisteredApplication(app.id, self))
schedule()
}
}
Step 3: createApplication from step 2
private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
// generate a unique id for the application from the current timestamp
new ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores)
}
Step 4: registerApplication
private def registerApplication(app: ApplicationInfo): Unit = {
// get the driver's address
val appAddress = app.driver.address
// if the driver's address is already known, return directly -- this would be a duplicate registration
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
applicationMetricsSystem.registerSource(app.appSource)
// add the Application info to the in-memory caches
apps += app
idToApp(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
// add the Application to the queue of apps waiting to be scheduled; scheduling is FIFO
waitingApps += app
}
Step 5: the master replies to the AppClient's ClientActor inside SparkDeploySchedulerBackend; see the handling of RegisteredApplication in AppClient
Source: org.apache.spark.deploy.client.AppClient
override def receive: PartialFunction[Any, Unit] = {
case RegisteredApplication(appId_, masterRef) =>
// FIXME How to handle the following cases?
// 1. A master receives multiple registrations and sends back multiple
// RegisteredApplications due to an unstable network.
// 2. Receive multiple RegisteredApplication from different masters because the master is
// changing.
appId = appId_
registered = true
master = Some(masterRef)
listener.connected(appId)
case ApplicationRemoved(message) =>
markDead("Master removed our application: %s".format(message))
stop()
case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
cores))
// FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
// guaranteed), `ExecutorStateChanged` may be sent to a dead master.
sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
case ExecutorUpdated(id, state, message, exitStatus) =>
val fullId = appId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
}
case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
master = Some(masterRef)
alreadyDisconnected = false
masterRef.send(MasterChangeAcknowledged(appId))
}
Master State-Change Handling
Driver state change (DriverStateChanged)
- If the driver's state is ERROR, FINISHED, KILLED, or FAILED, the driver is removed
- The driver is first added to the completed drivers, then its state is set to the final state, and finally it is removed from the worker it ran on, releasing its resources
Step 1: driver state change
Source code analysis:
/**
* Driver state change
*/
case DriverStateChanged(driverId, state, exception) => {
state match {
// if the driver's state is ERROR, FINISHED, KILLED, or FAILED
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
// remove the driver
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
}
Step 2: removeDriver from step 1
/**
* Remove a driver
*/
private def removeDriver(
driverId: String,
finalState: DriverState,
exception: Option[Exception]) {
// use Scala's find higher-order function to locate the driver with this driverId
drivers.find(d => d.id == driverId) match {
// if found: Some (an Option case)
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
// remove from the in-memory cache (HashSet)
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove)
}
// add the driver to the completed drivers
completedDrivers += driver
// remove the driver from the persistence engine
persistenceEngine.removeDriver(driver)
// set the driver's state to the final state
driver.state = finalState
driver.exception = exception
// remove the driver from the worker it ran on
driver.worker.foreach(w => w.removeDriver(driver))
schedule()
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
}
Executor state change (ExecutorStateChanged)
- Find the application the executor belongs to, then look up the executor info in the application's internal executors cache
- Update the executor's state to the reported state, and send an ExecutorUpdated message to the driver
- If the executor has reached a finished state, remove the executor from the application and from the worker that ran it
- If the executor exited abnormally, retry the application; if it has failed 10 retries, scheduling is considered failed and the application is removed. If it recovers within those 10 retries, it is rescheduled
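The retry policy in the last bullet can be sketched as follows. This is an illustrative model, not the real Master code; the only borrowed constant is the retry limit of 10 (ApplicationState.MAX_NUM_RETRY):

```scala
// Illustrative retry-count model: returns the new retry count and whether the
// application should be rescheduled (true) or removed as failed (false).
val MaxNumRetry = 10 // mirrors ApplicationState.MAX_NUM_RETRY

def onExecutorExit(retryCount: Int, normalExit: Boolean): (Int, Boolean) =
  if (normalExit) (retryCount, true) // a clean exit does not count against the app
  else {
    val n = retryCount + 1
    (n, n < MaxNumRetry) // reschedule while under the limit, else give up
  }
```

In the real code the counter also resets to zero as soon as any executor reports RUNNING, which is what lets an application recover within its retry budget.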
Source code analysis:
/**
* Executor state change
*/
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
// find the executor's app, then look up the executor info in the app's internal executors cache
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
// if present
case Some(exec) => {
// update the executor's state to the reported state
val appInfo = idToApp(appId)
exec.state = state
if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
// forward an ExecutorUpdated message to the driver
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
// if the executor has reached a finished state
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
// If an application has already finished, preserve its
// state to display its information properly on the UI
if (!appInfo.isFinished) {
// remove the executor from the app's cache
appInfo.removeExecutor(exec)
}
// remove the executor from the cache of the worker that ran it
exec.worker.removeExecutor(exec)
// check whether the executor exited abnormally
val normalExit = exitStatus == Some(0)
// Only retry certain number of times so we don't go into an infinite loop.
if (!normalExit) {
// check whether the application's retry count has reached the maximum of 10
if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
// reschedule
schedule()
} else {
// otherwise perform removeApplication:
// if the executors repeatedly fail to schedule, the application is considered failed
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
}
}
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
}
Step 2: removeExecutor from step 1
/**
* Remove the executor from the internal in-memory cache structures
*/
private[master] def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.id)) {
removedExecutors += executors(exec.id)
executors -= exec.id
coresGranted -= exec.cores
}
}
Step 3: removeApplication from step 1
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
// remove the app from the in-memory cache structures
if (apps.contains(app)) {
logInfo("Removing app " + app.id)
apps -= app
idToApp -= app.id
endpointToApp -= app.driver
addressToApp -= app.driver.address
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach(a => {
appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
}
completedApps += app // Remember it in our history
waitingApps -= app
// If application events are logged, use them to rebuild the UI
rebuildSparkUI(app)
for (exec <- app.executors.values) {
// kill the app's executors
killExecutor(exec)
}
app.markFinished(state)
if (state != ApplicationState.FINISHED) {
// notify the driver
app.driver.send(ApplicationRemoved(state.toString))
}
// remove the application from the persistence engine
persistenceEngine.removeApplication(app)
schedule()
// Tell all workers that the application has finished, so they can clean up any app state.
workers.foreach { w =>
w.endpoint.send(ApplicationFinished(app.id))
}
}
}
Master Resource Scheduling Algorithm
How it works:
- First, if the master's state is not ALIVE, return immediately; in other words, a standby master never schedules applications or other resources
- Take all registered workers and filter them down to those in the ALIVE state, then shuffle them randomly with Random.shuffle
- Iterate over the live workers, launching drivers: each launched driver is added to the in-memory cache structures and removed from the waiting-drivers queue
- Launch executors on the workers, using the default spreadOutApps algorithm
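The difference between the spread-out and packed placement strategies can be illustrated with a toy allocator. This is an assumption-laden sketch, not the real scheduleExecutorsOnWorkers: it tracks free cores only and ignores memory, executor counts, and per-executor core granularity:

```scala
// Toy core allocator. freeCores(i) is worker i's free cores; returns the cores
// assigned to each worker. spreadOut = round-robin one core at a time across
// workers; otherwise greedily fill each worker before moving to the next.
def allocate(freeCores: Array[Int], requested: Int, spreadOut: Boolean): Array[Int] = {
  val assigned = Array.fill(freeCores.length)(0)
  var left = math.min(requested, freeCores.sum) // never exceed what is available
  if (spreadOut) {
    var pos = 0
    while (left > 0) {
      if (assigned(pos) < freeCores(pos)) { assigned(pos) += 1; left -= 1 }
      pos = (pos + 1) % freeCores.length
    }
  } else {
    for (pos <- freeCores.indices if left > 0) {
      val take = math.min(left, freeCores(pos))
      assigned(pos) = take
      left -= take
    }
  }
  assigned
}
```

With three 4-core workers and 6 requested cores, spread-out yields 2 cores on each worker, while the packed strategy yields 4 + 2 + 0. Spreading out is the default because it tends to improve data locality.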
Source code analysis:
Step 1: the schedule() method
Source: org.apache.spark.deploy.master.Master
/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*
* The resource scheduling algorithm
*
*/
private def schedule(): Unit = {
// if the master's state is not ALIVE, return immediately;
// that is, a standby master never schedules applications or other resources
if (state != RecoveryState.ALIVE) { return }
// Drivers take strict precedence over executors
/**
* Random.shuffle randomly permutes the elements of the given collection.
* It walks the backing ArrayBuffer from the last element down to the second; for each
* position it draws a random index within the remaining range and swaps the two elements.
* E.g. with buf.length of 10, nextInt picks a random index in [0, 10), and that position
* is swapped with the current one.
*/
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
/**
* Take all registered workers and keep only those whose state is ALIVE
*/
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
/**
* Iterate over the waiting drivers. Only cluster-mode submissions register drivers
* with the master; in client mode the driver runs locally, is never registered,
* and so is never scheduled by the master
*/
for (driver <- waitingDrivers) {
// check that the worker's free memory is at least what the driver needs,
// and that its free cores are at least what the driver needs
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
// launch the driver
launchDriver(worker, driver)
// remove the driver from the waiting-drivers queue
waitingDrivers -= driver
}
}
}
// launch and schedule executors on the workers
startExecutorsOnWorkers()
}
Step 2: the shuffle method called in step 1
Source: scala.util.Random
/** Returns a new collection of the same type in a randomly chosen order.
* randomly permutes the elements of the given collection
* @return the shuffled collection
*/
def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
// copy the input into an ArrayBuffer
val buf = new ArrayBuffer[T] ++= xs
// swap two positions
def swap(i1: Int, i2: Int) {
val tmp = buf(i1)
buf(i1) = buf(i2)
buf(i2) = tmp
}
// walk from the last element down to the second; for each position, draw a random
// index within the remaining range and swap (the Fisher-Yates shuffle);
// e.g. with buf.length of 10, nextInt(10) picks a random index in [0, 10),
// and that position is swapped with the current one
for (n <- buf.length to 2 by -1) {
val k = nextInt(n)
swap(n - 1, k)
}
(bf(xs) ++= buf).result
}
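A quick property check of Random.shuffle, as used on the worker list above: it returns a new collection that is a permutation of the input, i.e. the same elements in a (usually) different order.

```scala
import scala.util.Random

// Shuffle a list of worker ids; the result is a permutation of the input.
val workerIds = (1 to 20).map(i => s"worker-$i").toList
val shuffled = Random.shuffle(workerIds)
```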
Step 3: the launchDriver method called in step 1
/**
* Launch a driver on a given worker
*/
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
// add the driver to the worker's internal cache structures,
// adding the driver's required memory and cores to the worker's used memory and cores
worker.addDriver(driver)
// store the worker in the driver's in-memory cache structure
driver.worker = Some(worker)
// send a LaunchDriver message to the worker's endpoint so the worker starts the driver
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
// set the driver's state to RUNNING
driver.state = DriverState.RUNNING
}
Step 4: the startExecutorsOnWorkers method called in step 1
/**
* Schedule and launch executors on workers
*
* The application scheduling mechanism (the core of the core):
* 1. spreadOutApps: spread the app's resources evenly across the workers
* 2. non-spreadOutApps: pack the app onto as few workers as possible, leaving the
* other workers untouched; prefer the workers with the most resources
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
// iterate over the ApplicationInfos in waitingApps, keeping only those apps that still
// need cores: the cores the app requested minus the cores already used by its executors
// on workers must be greater than 0
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
// from workers, keep those in the ALIVE state with enough free memory and cores,
// sorted by free cores in descending order
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
// after the app's requested cores have been assigned, iterate over the workers,
// and for each worker that was assigned cores in this round,
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
// launch executors on that worker
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
This method launches executors for each application. It first selects from workers those that are eligible to host an executor, which requires:
- the worker's remaining free memory is at least the per-executor memory the application specifies
- the worker's remaining free cores are at least the per-executor core count the application specifies (if coresPerExecutor was not set at startup, this value is taken to be 1)
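The two conditions translate into a predicate like this sketch, where `W` is a hypothetical stand-in for WorkerInfo:

```scala
// Hypothetical worker descriptor standing in for WorkerInfo.
case class W(memoryFree: Int, coresFree: Int)

// A worker can host one of this app's executors iff it has enough free memory
// and enough free cores; coresPerExecutor defaults to 1 when unset.
def usable(w: W, memoryPerExecutorMB: Int, coresPerExecutor: Option[Int]): Boolean =
  w.memoryFree >= memoryPerExecutorMB && w.coresFree >= coresPerExecutor.getOrElse(1)
```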
第五步:呼叫第四步的scheduleExecutorsOnWorkers方法
/**
* Schedule executors to be launched on the workers.
* Returns an array containing number of cores assigned to each worker.
*
* There are two modes of launching executors. The first attempts to spread out an application's
* executors on as many workers as possible, while the second does the opposite (i.e. launch them
* on as few workers as possible). The former is usually better for data locality purposes and is
* the default.
*
* The number of cores assigned to each executor is configurable. When this is explicitly set,
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
*
* It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
* at a time). Consider the following example: cluster has 4 workers with 16 cores each.
* User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*
*/
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
//Number of cores requested per executor, if specified for this application
val coresPerExecutor = app.desc.coresPerExecutor
//Minimum number of cores to allocate per executor at a time; defaults to 1 if unset
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
//If coresPerExecutor was not set, at most one executor is launched per worker for this app
val oneExecutorPerWorker = coresPerExecutor.isEmpty
// Memory required per executor
val memoryPerExecutor = app.desc.memoryPerExecutorMB
// Number of usable workers
val numUsable = usableWorkers.length
//One entry per usable worker: the cores assigned on that worker, e.g. (16, 16, 16, 16)
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
//One entry per usable worker: the new executors to launch there, e.g. (1, 2, 1, 0)
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
//Total cores to assign: the minimum of the app's remaining core demand and the free
//cores across all usable workers, so we never assign more than is available
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
/** Return whether the specified worker can launch an executor for this app. */
/**
 * Checks, for the worker at position pos:
 * 1. whether there are still cores left to assign
 * 2. whether the worker has enough free cores
 * 3. if a NEW executor would be launched: additionally, whether the worker has
 *    enough free memory and whether the app is still under its executor limit
 * 4. if cores are merely added to an existing executor: only checks 1 and 2
 */
def canLaunchExecutor(pos: Int): Boolean = {
/**
 * The cores still to be assigned must cover at least one executor's worth:
 * e.g. if only 8 cores remain to assign but each executor needs 12, no
 * further executor can be launched for lack of resources
 */
val keepScheduling = coresToAssign >= minCoresPerExecutor
/**
 * The worker must still have enough free cores after subtracting what this
 * round has already assigned to it:
 *
 * e.g. a worker with 16 free cores that was already assigned 12 has only 4
 * left; if an executor needs 12 cores, then 4 < 12 and the cores do not suffice
 */
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.
/**
 * If multiple executors per worker are allowed, a new executor can always be
 * launched; otherwise, if this worker already runs an executor of this app,
 * that executor is simply given more cores
 */
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
// Launching a new executor additionally requires enough free memory on the worker and staying under the app's executor limit
if (launchingNewExecutor) {
//Memory already assigned to new executors on this worker in this round
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
//The worker's free memory minus what has been assigned must cover one more executor
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
//The app's total executor count must stay under its limit
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && underLimit
} else {
// We're adding cores to an existing executor, so no need
// to check memory and executor limits
keepScheduling && enoughCores
}
}
// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits
//canLaunchExecutor simply checks whether the worker's cores and memory satisfy the requirements
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
//Re-check on each pass whether this worker can still take another allocation
while (keepScheduling && canLaunchExecutor(pos)) {
// Deduct one executor's worth of cores from the remaining demand
coresToAssign -= minCoresPerExecutor
// and credit the same number of cores to this worker
assignedCores(pos) += minCoresPerExecutor
// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
/**
 * With one executor per worker, the worker's executor count is pinned at 1
 * (the single executor just accumulates cores); otherwise each pass through
 * this loop launches one more executor on the worker
 */
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
/**
 * With the spreadOutApps strategy, executors should be spread across as many
 * workers as possible, so keepScheduling is set to false here to end the
 * allocation on this worker and move on to the next one
 *
 * Without spreadOutApps, keepScheduling stays true and the inner loop keeps
 * allocating on this worker until its resources can no longer host an executor
 */
if (spreadOutApps) {
keepScheduling = false
}
}
}
//Because the outermost loop is a while, the workers are filtered once more here:
//once allocation is finished, canLaunchExecutor returns false for every worker,
//freeWorkers becomes empty, and the outer loop terminates
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}
On the two resource-scheduling strategies:

spreadOutApps walks the list of usable workers and launches executors on them as evenly as possible. For example, if 10 executors are needed and 5 workers have free resources on the first check, the first pass launches one executor on each of the 5 workers. The workers are then checked again, because a node may have exhausted its free resources in the first round. Assuming all 5 workers still have enough free resources, the second pass launches one more executor on each, so every worker ends up running 2 executors for the app (even distribution).

The non-spreadOutApps mode has carried two meanings. In earlier versions it meant: pick the usable worker with the most free resources and launch as many of the app's executors on it as possible; if resources run out, pick the next worker with the most free resources and continue (not very sensible). In the current code, the assignedExecutors array tracks the per-worker allocation, so each usable worker in turn is packed with as many executors as it can hold before the loop moves on (allocation on demand).

Which of the two strategies applies is decided solely by the if (spreadOutApps) { keepScheduling = false } branch.
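The allocation loop and the effect of the spreadOutApps flag can be simulated with a short Python sketch (a simplified re-implementation for illustration only; names are made up, and the memory and executor-limit checks are omitted):

```python
def schedule(cores_left, workers_free_cores, min_cores_per_executor, spread_out):
    """Return the number of cores assigned to each worker, mimicking the
    core-assignment loop of scheduleExecutorsOnWorkers (cores only)."""
    n = len(workers_free_cores)
    assigned = [0] * n
    # Never assign more cores than the app wants or the workers have free
    to_assign = min(cores_left, sum(workers_free_cores))

    def can_launch(pos):
        # Still cores left to hand out, and this worker still has room
        return (to_assign >= min_cores_per_executor
                and workers_free_cores[pos] - assigned[pos] >= min_cores_per_executor)

    free = [p for p in range(n) if can_launch(p)]
    while free:
        for pos in free:
            keep_scheduling = True
            while keep_scheduling and can_launch(pos):
                to_assign -= min_cores_per_executor
                assigned[pos] += min_cores_per_executor
                if spread_out:  # spread out: one allocation, then move to next worker
                    keep_scheduling = False
        free = [p for p in free if can_launch(p)]
    return assigned

# SPARK-8881 case: 4 x 16-core workers, 48 cores wanted, 16 cores per executor.
# Allocating 16 cores at a time yields three full executors instead of none:
print(schedule(48, [16, 16, 16, 16], 16, spread_out=True))   # [16, 16, 16, 0]
# With small executors the two strategies differ:
print(schedule(6, [16, 16, 16, 16], 2, spread_out=True))     # [2, 2, 2, 0]
print(schedule(6, [16, 16, 16, 16], 2, spread_out=False))    # [6, 0, 0, 0]
```

Note how, with spread_out=False, the first worker is packed with all 6 cores before any other worker receives anything.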
Step 6: the allocateWorkerResourceToExecutors method invoked in Step 4
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
//cores assigned to this worker
assignedCores: Int,
//cores required per executor, if specified
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
// Number of executors to launch on this worker
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
// Cores per executor; if coresPerExecutor was not specified, the single executor takes all assignedCores
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
// Create an ExecutorDesc, add it to the application, and update the app's granted cores
val exec = app.addExecutor(worker, coresToAssign)
// Launch the executor on the worker
launchExecutor(worker, exec)
// Mark the application as RUNNING
app.state = ApplicationState.RUNNING
}
}
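The division performed by this method can be illustrated with a small Python sketch (the helper name is made up; it is not part of Spark):

```python
def split_into_executors(assigned_cores, cores_per_executor=None):
    """Split the cores assigned on one worker into executors, mirroring the
    arithmetic of allocateWorkerResourceToExecutors."""
    if cores_per_executor is None:
        # coresPerExecutor unset: one executor grabs all assigned cores
        return [assigned_cores]
    # Integer division, so any remainder cores are simply left unassigned
    return [cores_per_executor] * (assigned_cores // cores_per_executor)

print(split_into_executors(12, 4))  # [4, 4, 4]  -> three 4-core executors
print(split_into_executors(12))     # [12]       -> one executor with all 12 cores
```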
Step 7: the launchExecutor method invoked in Step 6
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
// Record the executor in the master's bookkeeping for this worker (updates the worker's used cpu and memory),
// then send a LaunchExecutor message telling the worker to actually start it
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(
masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
// Send an ExecutorAdded message to the application's driver
exec.application.driver.send(ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}