
Reading Spark's Runtime Message Communication Source Code (Part 2)

Overview

(Spark version 2.1.1)

Application: a user program built on Spark, consisting of one Driver Program and multiple Executors in the cluster;

Driver Program: runs the Application's main() function and creates the SparkContext; the SparkContext is usually taken to represent the Driver Program;

Executor: a process launched for an Application on a Worker Node; it runs Tasks and keeps data in memory or on disk. Each Application has its own independent Executors;

Cluster Manager: an external service that acquires resources on the cluster (e.g., Standalone, Mesos, or YARN);

Operation: the operations applied to RDDs, divided into Transformations and Actions;
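
For instance, map is a Transformation (lazy; it only extends the RDD lineage) while count is an Action (it submits a job). A minimal illustration, assuming an existing SparkContext named sc, such as the one spark-shell provides:

val rdd = sc.parallelize(1 to 10)
val doubled = rdd.map(_ * 2) // Transformation: lazy, just returns a new RDD
val n = doubled.count()      // Action: submits a job and returns 10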

 

Roles

Taking standalone mode as an example, Spark's execution can be split into three parts:

1. the client; 2. the Master; 3. the Workers

Breaking these down further:

1. The client breaks down into: Driver and SparkContext

2. The Master is just the Master

3. The Worker breaks down into: Executor

 

And further still:

1. The client's SparkContext breaks down into: DAGScheduler and TaskScheduler

2. The Master is still the Master

3. The Worker breaks down into: a thread pool and TaskRunners

 

Sketched as a diagram, it looks like this:

 

The detailed flow when a user submits an application:

1. The user submits a Spark application with spark-submit.

2. When submitting in Standalone mode, spark-submit creates and constructs a Driver (DriverActor) process.

3. The Driver runs the user code; when execution reaches the SparkContext created in main(), it builds the Spark Application's runtime environment.

4. While initializing, the SparkContext object does two crucially important things: it constructs the DAGScheduler and the TaskScheduler.

5. The TaskScheduler (which has its own backend process) is what actually connects to the Master, through that backend, and registers the Application with it.

6. When the Master receives the Application registration request, it returns a registration result to the client, which marks the Application as registered; the Master then contacts the Workers and, using its own resource-scheduling algorithm, launches multiple Executors (StandaloneExecutorBackend) for this Application across the cluster's Workers.

7. The Master tells the Workers to launch Executors.

8. Each Worker launches Executors for the Application.

9. After starting, each Executor process reverse-registers with the TaskScheduler inside this Application's SparkContext, so the TaskScheduler now knows which Executors serve the current Application; in addition, the Executor starts sending periodic heartbeats to the Driver and waits for tasks to be assigned (see sections g) and h) below).

10. Once all Executors have reverse-registered with the Driver, the Driver finishes initializing the SparkContext and goes on executing the rest of the user code.

11. Every time an action is reached, a job is created.

12. The job is submitted to the DAGScheduler.

13. The DAGScheduler splits the job into multiple stages, then creates a TaskSet for each stage (stage-splitting algorithm).

14. Each TaskSet is submitted to the TaskScheduler.

15. The TaskScheduler submits every task in the TaskSet to an executor. Since the executors registered with this TaskScheduler earlier, the TaskScheduler, upon receiving a TaskSet, submits its tasks to those executors (task-assignment algorithm).

16. The Executor process has a thread pool; each incoming task is wrapped in a TaskRunner, and a thread is taken from the pool to run it.

17. The TaskRunner copies and deserializes the user code, i.e., the operators and functions to run, and then executes the task. (There are two kinds of Task, ShuffleMapTask and ResultTask; only the final stage consists of ResultTasks, all earlier stages consist of ShuffleMapTasks.)

18. So in the end, running a Spark application means submitting stages, batch by batch, as TaskSets to executors. Each task works on one partition of an RDD, applying the operators and functions we defined; that produces a new RDD, and if the next operator belongs to the same stage, the same batch of tasks carries on with it, and so on. When one stage finishes, the next stage runs, until the job, and all operations, are complete. (A minimal application that triggers this flow is sketched after this list.)
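
To make steps 11-18 concrete, here is a minimal sketch of an application that would drive this whole flow; the master URL and input path are hypothetical placeholders:

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Steps 3-4: constructing the SparkContext builds the DAGScheduler and TaskScheduler
    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("spark://master:7077") // hypothetical standalone Master URL
    val sc = new SparkContext(conf)

    // Transformations only build up the RDD lineage; nothing runs yet
    val counts = sc.textFile("hdfs:///input/words.txt") // hypothetical input path
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _) // shuffle boundary: the DAGScheduler cuts a stage here (step 13)

    // Step 11: the action creates a job; the first stage runs as ShuffleMapTasks,
    // the final stage as ResultTasks (step 17)
    counts.collect().foreach(println)
    sc.stop()
  }
}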

 

The above can be condensed into the following steps:

1. When the SparkContext is initialized in main(), the SparkContext (the client) sends an application-registration message to the Master (i.e., the resource manager; this is the Standalone case, on YARN it would be the ResourceManager) and requests Executor resources. The Master returns a registration result to the client, and the client marks the Application as registered.

2. Based on the application's resource requirements, the Master allocates Executor resources on selected Workers and starts StandaloneExecutorBackend;

3. Once an Executor has started, it sends a registration-success message back to the SparkContext (the client), reports its status to the Driver along with periodic heartbeats, and waits for tasks;

4. When an action on an RDD is triggered, the SparkContext builds the RDD DAG; the DAGScheduler splits it into stages, converts each stage into a TaskSet, and sends the TaskSets to the TaskScheduler;

5. The TaskScheduler sends the Tasks to the registered Executors while the SparkContext ships the application code to them; when an Executor receives a task message, it starts and runs the task (that is, tasks execute inside Executors);

6. Finally, when all tasks have finished, the Driver processes the results and reclaims the resources (the Driver both requests and reclaims resources).

 

 

The code flow diagram:

 

 

The execution flow diagram:

 

Characteristics of the Spark execution architecture:

  • Each Application gets its own dedicated executor processes, which stay up for the lifetime of the Application and run tasks in multiple threads. This per-Application isolation is an advantage both for scheduling (each Driver schedules its own tasks) and for execution (tasks from different Applications run in separate JVMs). It also means, however, that Spark Applications cannot share data with each other unless the data is written to an external storage system.
  • Spark is agnostic to the cluster manager: as long as it can acquire executor processes and keep communicating with them, it works.
  • The client that creates the SparkContext should be close to the Worker nodes (the nodes running the Executors), ideally in the same rack, because a Spark Application exchanges a great deal of information between the SparkContext and the Executors while it runs. For a remote cluster, it is better to submit to the cluster over RPC so the SparkContext runs near the Workers, rather than running the SparkContext far away from them.
  • Tasks are optimized with data locality and speculative execution (see the configuration snippet after this list).
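
Both optimizations are driven by standard configuration keys; for example:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true") // re-launch slow tasks speculatively (off by default)
  .set("spark.locality.wait", "3s") // how long to wait for a data-local slot before relaxing locality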

Code walkthrough:

 

a) The client creates a registration thread pool and registers the Application with the Master

In class StandaloneAppClient, the tryRegisterAllMasters method of ClientEndpoint (a private inner class of StandaloneAppClient) creates the registration thread pool registerMasterThreadPool; registration threads started on this pool send the RegisterApplication message to the Master, as shown below:


 

/**
 * Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  // There may be several Masters (e.g., in HA setups), so the message is sent to all of them
  for (masterAddress <- masterRpcAddresses) yield {
    // Start a registration thread on the pool; it exits as soon as the
    // registration flag `registered` has been set to true
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        // Obtain a reference to the Master endpoint and send it the registration message
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}

b) The Master receives the Application registration, completes it, replies to the client, and asks Workers to launch Executors

When the Master receives the registration message, its registerApplication method records the application's information and appends the application to the list of waiting applications. After registering it, the Master sends the RegisteredApplication success message back to the ClientEndpoint and calls startExecutorsOnWorkers to run the application. Before launching, it determines which Workers will run the application and sends them LaunchExecutor messages telling them to start Executors. Master.startExecutorsOnWorkers is as follows:

 

/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  // In other words, applications run in registration order.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    // (keep only ALIVE workers with enough free memory and cores)
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // Decide which Workers to run on and how many cores each contributes. There are
    // two strategies: spread the app across as many Workers as possible, or pack it
    // onto as few Workers as possible.
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      // Sends LaunchExecutor to the Worker, telling it to start an Executor
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
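
To make the two allocation strategies concrete, here is a simplified, self-contained sketch of the idea behind scheduleExecutorsOnWorkers. The real method also accounts for coresPerExecutor, memory, and executor limits, so treat this only as an illustration:

// Toy model: distribute `coresNeeded` cores over workers, either spread out
// across as many workers as possible (round-robin) or packed onto as few as possible.
case class FreeWorker(id: String, var coresFree: Int)

def assignCores(workers: Array[FreeWorker], coresNeeded: Int, spreadOut: Boolean): Array[Int] = {
  val assigned = Array.fill(workers.length)(0)
  var left = coresNeeded
  if (spreadOut) {
    var pos = 0
    while (left > 0 && workers.exists(_.coresFree > 0)) {
      if (workers(pos).coresFree > 0) { // take one core at a time, rotating over workers
        workers(pos).coresFree -= 1
        assigned(pos) += 1
        left -= 1
      }
      pos = (pos + 1) % workers.length
    }
  } else {
    for (pos <- workers.indices if left > 0) { // drain each worker before moving on
      val take = math.min(left, workers(pos).coresFree)
      workers(pos).coresFree -= take
      assigned(pos) += take
      left -= take
    }
  }
  assigned
}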

c) The client receives the Master's registration-success message and completes the Application registration

When AppClient.ClientEndpoint receives the RegisteredApplication message from the Master, it sets the registration flag registered to true. The registration threads observe the state change and exit, which completes the Application's registration. The StandaloneAppClient handler for RegisteredApplication is:

 

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredApplication(appId_, masterRef) =>
    // FIXME How to handle the following cases?
    // 1. A master receives multiple registrations and sends back multiple
    // RegisteredApplications due to an unstable network.
    // 2. Receive multiple RegisteredApplication from different masters because the master is
    // changing.
    appId.set(appId_)
    registered.set(true)
    master = Some(masterRef)
    listener.connected(appId.get)
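
Both sides of this exchange follow the same send/receive endpoint pattern that recurs throughout this article. As a toy model of that pattern (the names below are illustrative; the real abstractions are RpcEnv, RpcEndpoint, and RpcEndpointRef in org.apache.spark.rpc):

case class RegisterApplication(appName: String)
case class RegisteredApplication(appId: String)

trait ToyEndpoint {
  def receive: PartialFunction[Any, Unit]
  // fire-and-forget delivery, standing in for RpcEndpointRef.send
  def send(msg: Any): Unit =
    if (receive.isDefinedAt(msg)) receive(msg) else println(s"dropped: $msg")
}

class ToyClient extends ToyEndpoint {
  override def receive = {
    case RegisteredApplication(id) => println(s"registered with id $id") // sets registered=true in the real code
  }
}

class ToyMaster(client: ToyEndpoint) extends ToyEndpoint {
  override def receive = {
    case RegisterApplication(name) =>
      println(s"registering $name")
      client.send(RegisteredApplication("app-0001"))
  }
}

// new ToyMaster(new ToyClient).send(RegisterApplication("WordCount"))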

d) How the Worker starts the Executor

In step b), when Master.startExecutorsOnWorkers allocates resources to run the application, it calls allocateWorkerResourceToExecutors, which results in Executors being started on Workers. When a Worker receives the LaunchExecutor message from the Master, it first instantiates an ExecutorRunner. As the ExecutorRunner starts, it creates a ProcessBuilder and uses the application's command to launch a CoarseGrainedExecutorBackend process, the container the Executor runs in. Finally, the Worker sends an ExecutorStateChanged message to the Master to report that the Executor has been created.
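
The underlying mechanism is the plain JDK ProcessBuilder API: build a command line, set the working directory and environment, then fork a child JVM. A minimal sketch with a hypothetical classpath and abbreviated arguments (the real command is assembled in ExecutorRunner.fetchAndRunExecutor, shown further below):

import java.io.File
import scala.collection.JavaConverters._

val cmd = Seq("java", "-cp", "/opt/spark/jars/*", // hypothetical classpath
  "org.apache.spark.executor.CoarseGrainedExecutorBackend"
  /* plus driver URL, executor id, cores, etc. */)
val pb = new ProcessBuilder(cmd.asJava)
pb.directory(new File("/tmp/executor-dir"))                    // working directory
pb.environment.put("SPARK_EXECUTOR_DIRS", "/tmp/executor-dir") // env for the child JVM
val process = pb.start()                                       // fork the child process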

When the Worker receives the LaunchExecutor message, it executes the following code:

 

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.getOrElse(appId, {
        val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
        val dirs = localRootDirs.flatMap { dir =>
          try {
            // Create the executor's local directory
            val appDir = Utils.createDirectory(dir, namePrefix = "executor")
            // Restrict its permissions to the owner
            Utils.chmod700(appDir)
            Some(appDir.getAbsolutePath())
          } catch {
            case e: IOException =>
              logWarning(s"${e.getMessage}. Ignoring this directory.")
              None
          }
        }.toSeq
        if (dirs.isEmpty) {
          throw new IOException("No subfolder can be created in " +
            s"${localRootDirs.mkString(",")}.")
        }
        dirs
      })
      appDirectories(appId) = appLocalDirs

      // ExecutorRunner will create the CoarseGrainedExecutorBackend using the command
      // carried in the application description; that command is built in the start()
      // method of StandaloneSchedulerBackend (SparkDeploySchedulerBackend before 2.0)
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      // Tell the Master that this executor's state is now ExecutorState.RUNNING
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }

 

The work is done in ExecutorRunner's fetchAndRunExecutor method. The command it runs is defined in StandaloneSchedulerBackend (named SparkDeploySchedulerBackend before Spark 2.0) and launches the Executor's container, CoarseGrainedExecutorBackend. ExecutorRunner.fetchAndRunExecutor():

 

private def fetchAndRunExecutor() {
  try {
    // Launch the process: build a ProcessBuilder from the application description
    // and the environment configuration
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    // Add the working directory and executor-local dirs to the builder
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls, so the monitoring pages can link to this executor's logs
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    // Start the process; this launches the CoarseGrainedExecutorBackend instance
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code, then report the exit status back to the Worker
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}

 

e) The Master receives the Worker's notification that the Executor has started

When the Master receives the ExecutorStateChanged message from the Worker, it acts on the reported ExecutorState: it forwards the update to the Driver, and if the executor has finished it removes it from the Worker and the application, failing the application after too many abnormal exits.

Class Master:

 

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) =>
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Forward the state change to the driver as an ExecutorUpdated message
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        // Important note: this code path is not exercised by tests, so be very careful when
        // changing this `if` condition.
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }

 

f) Once started, the Executor sends its information to the Driver; the Driver replies with a confirmation and later sends LaunchTask messages to run tasks

In CoarseGrainedExecutorBackend's onStart method, a RegisterExecutor message is sent to the DriverEndpoint. On the Driver side, it first checks whether the Executor is already registered; if so, it sends back a RegisterExecutorFailed message, otherwise the Driver records the Executor's information, replies with the RegisteredExecutor success message, allocates task resources in makeOffers(), and finally sends LaunchTask messages to run tasks.

The Driver-side executor registration handler:

Class CoarseGrainedSchedulerBackend:

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    // If the executor's rpc env is not listening for incoming connections, `hostPort`
    // will be null, and the client connection should be used to contact the executor.
    val executorAddress = if (executorRef.address != null) {
      executorRef.address
    } else {
      context.senderAddress
    }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    // Record the executor
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, hostname,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    // (builds the map from executor IDs to their details)
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Reply to the Executor that registration is complete and post an
    // executor-added event on the listener bus
    executorRef.send(RegisteredExecutor)
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(true)
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    // Allocate task resources and send LaunchTask messages to run tasks
    makeOffers()
  }

 

g) After receiving its registration confirmation, the Executor sends heartbeats to the Driver and waits for tasks

When CoarseGrainedExecutorBackend receives the RegisteredExecutor confirmation, it instantiates the Executor object inside the CoarseGrainedExecutorBackend container. Once started, the Executor sends periodic heartbeats to the Driver and waits for task-launch messages from the Driver.

Class CoarseGrainedExecutorBackend:

 

case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    // Create the Executor from the environment parameters; in Spark it is the
    // component that actually runs tasks
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }

Inside the Executor class, a scheduled task periodically sends heartbeats to the Driver while the executor waits for the Driver to hand out tasks:

 

// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

/**
 * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
 */
private def startDriverHeartbeater(): Unit = {
  // The interval defaults to 10 seconds
  val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

  // Wait a random interval so the heartbeats don't end up in sync
  val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

  val heartbeatTask = new Runnable() {
    override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
  }
  // Send the heartbeat to the Driver at a fixed rate
  heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
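
The 10s default comes from spark.executor.heartbeatInterval, as the code above shows, so an application that wants more frequent liveness and metrics reports can simply lower it:

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.executor.heartbeatInterval", "5s")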

 

h) Running a task

After the Executor inside CoarseGrainedExecutorBackend has started, it receives LaunchTask messages from the Driver. Task execution is implemented in the Executor's launchTask method: a TaskRunner (a Runnable, despite the original text calling it a process) is created to process the task and, when it finishes, a statusUpdate message is sent back through CoarseGrainedExecutorBackend.

In class CoarseGrainedExecutorBackend, the task is dispatched through Executor.launchTask:

case LaunchTask(data) =>
  if (executor == null) {
    // The executor failed to start; log the error and shut down
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Hand the task to a TaskRunner for execution
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }

 

Executor.launchTask creates a TaskRunner, records it in runningTasks, and submits it to the thread pool, where the Executor schedules all tasks uniformly:

 

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

The task then runs and its result is collected.
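
The wrap-in-a-Runnable pattern itself is simple; here is a self-contained toy version (not Spark's actual TaskRunner, which also deserializes the task and reports metrics):

import java.util.concurrent.Executors

// Toy stand-in for TaskRunner: run a task body and report its result
class ToyTaskRunner(taskId: Long, body: () => Any) extends Runnable {
  override def run(): Unit = {
    val result = body() // the real code deserializes and runs the Task here
    println(s"task $taskId finished: $result") // the real code sends statusUpdate instead
  }
}

val threadPool = Executors.newCachedThreadPool()
threadPool.execute(new ToyTaskRunner(0L, () => (1 to 10).sum))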

 

i) After the task completes

When a TaskRunner finishes its task, it sends a status-change message to the Driver. Upon receiving it, the Driver calls TaskSchedulerImpl's statusUpdate method, which handles the different task outcomes; afterwards, the Executor is offered further tasks. The Driver-side handling of the status change is as follows:

Class CoarseGrainedSchedulerBackend:

case StatusUpdate(executorId, taskId, state, data) =>
  // Delegate to TaskSchedulerImpl.statusUpdate, which handles the different task outcomes
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      // Once the task has finished, reclaim the CPU cores it used on that
      // executor, then offer resources again as appropriate
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }

 

Looking closely at the code above, it is clear that the Driver-side handlers are simply methods of the CoarseGrainedSchedulerBackend class.

 

Execution flow across the classes and methods in the code: