1. 程式人生 > >Spark2.3.2原始碼解析: 10. 排程系統 Task任務提交 (三) TaskScheduler : Executor 任務提交

Spark2.3.2原始碼解析: 10. 排程系統 Task任務提交 (三) TaskScheduler : Executor 任務提交

架構圖:

 

 

程式碼提交時序圖

Standalone模式提交執行流程圖:

 

 

首先寫一個WordCount程式碼(這個程式碼,為了觀察多個stage操作,我寫了兩個reducebykey 函式)

原始碼:

 

直接執行程式碼,檢視spark執行程式時,將程式碼劃分stage生成的DAG流程圖

 

可知: WordCount 在stage劃分的時候,劃分為三個stage 

即在程式碼中如下標識:

 

本文繼續說task提交:

 

接上文,本文講根據分配的資源啟動task

 

org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#launchTasks

首先看一下,傳入的物件:TaskDescription

 

 

TaskDescription任務資訊:

    taskId: Long :  任務id
    attemptNumber: Int, 重試次數
    executorId: String : executor的Id ,即task分配給具體哪個executor
    name: String, 任務名稱
    index: Int :   任務在TaskSet中的索引 
    addedFiles: Map[String, Long] : 任務依賴的檔案
    addedJars: Map[String, Long] : 任務依賴的jar包
    properties: Properties : 任務依賴的屬性
    serializedTask: ByteBuffer 序列化 

 

 

executor 啟動task , 呼叫:org.apache.spark.executor.Executor#LaunchTask 方法  


executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

1.將task封裝成 TaskRunner 
2.加入等待佇列 runningTasks 
3.執行執行緒 TaskRunner

 

啟動  TaskRunner  。。。。。。。。。。。

org.apache.spark.executor.Executor.TaskRunner # run

 

 

 

 

org.apache.spark.scheduler.Task#run

執行這個類中的run方法

 

執行task中的run方法
//TODO  ShuffleMapTask
//TODO  ResultTask

 

org.apache.spark.ShuffleMapTask#ResultTask

org.apache.spark.scheduler#ResultTask 

 

 

 

 

其他的就是將task中的一些執行資訊直接返回,傳送給drver、bolckmanager 等等,有興趣的去關注一下。。。。。。。。