1. 程式人生 > >【Spark2.0源碼學習】-10.Task執行與回饋

【Spark2.0源碼學習】-10.Task執行與回饋

maps 關系 pro private reason ges 寫入 tor sub

通過上一節內容,DriverEndpoint最終生成多個可執行的TaskDescription對象,並向各個ExecutorEndpoint發送LaunchTask指令,本節內容將關註ExecutorEndpoint如何處理LaunchTask指令,處理完成後如何回饋給DriverEndpoint,以及整個job最終如何多次調度直至結束。 一、Task的執行流程 承接上一節內容,Executor接受LaunchTask指令後,開啟一個新線程TaskRunner解析RDD,並調用RDD的compute方法,歸並函數得到最終任務執行結果 技術分享技術分享
  • ExecutorEndpoint接受到LaunchTask指令後,解碼出TaskDescription,調用Executor的launchTask方法
  • Executor創建一個TaskRunner線程,並啟動線程,同時將改線程添加到Executor的成員對象中,代碼如下:
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
runningTasks.put(taskDescription.taskId, taskRunner)
  • TaskRunner
    • 首先向DriverEndpoint發送任務最新狀態為RUNNING
    • 從TaskDescription解析出Task,並調用Task的run方法
  • Task
    • 創建TaskContext以及CallerContext(與HDFS交互的上下文對象)
    • 執行Task的runTask方法
      • 如果Task實例為ShuffleMapTask:解析出RDD以及ShuffleDependency信息,調用RDD的compute()方法將結果寫Writer中(Writer這裏不介紹,可以作為黑盒理解,比如寫入一個文件中),返回MapStatus對象
      • 如果Task實例為ResultTask:解析出RDD以及合並函數信息,調用函數將調用後的結果返回
  • TaskRunner將Task執行的結果序列化,再次向DriverEndpoint發送任務最新狀態為FINISHED
二、Task的回饋流程 TaskRunner執行結束後,都將執行狀態發送至DriverEndpoint,DriverEndpoint最終反饋指令CompletionEvent至DAGSchedulerEventProcessLoop中 技術分享
技術分享
  • DriverEndpoint接受到StatusUpdate消息後,調用TaskScheduler的statusUpdate(taskId, state, result)方法
  • TaskScheduler如果任務結果是完成,那麽清除該任務處理中的狀態,並調動TaskResultGetter相關方法,關鍵代碼如下:
val taskSet = taskIdToTaskSetManager.get(tid)
 
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { executorId =>
  executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
}
taskSet.removeRunningTask(tid)
 
if (state == TaskState.FINISHED) {
  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
  taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
  • TaskResultGetter啟動線程啟動線程【task-result-getter】進行相關處理
    • 通過解析或者遠程獲取得到Task的TaskResult對象
    • 調用TaskSet的handleSuccessfulTask方法,TaskSet的handleSuccessfulTask方法直接調用TaskSetManager的handleSuccessfulTask方法
  • TaskSetManager
    • 更新內部TaskInfo對象狀態,並將該Task從運行中Task的集合刪除,代碼如下:
val info = taskInfos(tid)
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
removeRunningTask(tid)
    • 調用DAGScheduler的taskEnded方法,關鍵代碼如下:
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
  • DAGScheduler向DAGSchedulerEventProcessLoop存入CompletionEvent指令,CompletionEvent對象定義如下
private[scheduler] case class CompletionEvent(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Seq[AccumulatorV2[_, _]],
    taskInfo: TaskInfo)
  extends DAGSchedulerEvent
三、Task的叠代流程 DAGSchedulerEventProcessLoop中針對於CompletionEvent指令,調用DAGScheduler進行處理,DAGScheduler更新Stage與該Task的關系狀態,如果Stage下Task都返回,則做下一層Stage的任務拆解與運算工作,直至Job被執行完畢
技術分享 技術分享
  • DAGSchedulerEventProcessLoop接收到CompletionEvent指令後,調用DAGScheduler的handleTaskCompletion方法
  • DAGScheduler根據Task的類型分別處理
  • 如果Task為ShuffleMapTask
    • 待回饋的Partitions減取當前partitionId
    • 如果所有task都返回,則markStageAsFinished(shuffleStage),同時向MapOutputTrackerMaster註冊MapOutputs信息,且markMapStageJobAsFinished
    • 調用submitWaitingChildStages(shuffleStage)進行下層Stages的處理,從而叠代處理最終處理到ResultTask,job結束,關鍵代碼如下:
private def submitWaitingChildStages(parent: Stage) {
   ...
  val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
  waitingStages --= childStages
  for (stage <- childStages.sortBy(_.firstJobId)) {
    submitStage(stage)
  }
}
  • 如果Task為ResultTask
    • 改job的partitions都已返回,則markStageAsFinished(resultStage),並cleanupStateForJobAndIndependentStages(job),關鍵代碼如下
for (stage <- stageIdToStage.get(stageId)) {
  if (runningStages.contains(stage)) {
    logDebug("Removing running stage %d".format(stageId))
    runningStages -= stage
  }
  for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
    shuffleIdToMapStage.remove(k)
  }
  if (waitingStages.contains(stage)) {
    logDebug("Removing stage %d from waiting set.".format(stageId))
    waitingStages -= stage
  }
  if (failedStages.contains(stage)) {
    logDebug("Removing stage %d from failed set.".format(stageId))
    failedStages -= stage
  }
}
// data structures based on StageId
stageIdToStage -= stageId
jobIdToStageIds -= job.jobId
jobIdToActiveJob -= job.jobId
activeJobs -= job
至此,用戶編寫的代碼最終調用Spark分布式計算完畢。

【Spark2.0源碼學習】-10.Task執行與回饋