Spark2.3.2原始碼解析: 10. 排程系統 Task任務提交 (三) TaskScheduler : Executor 任務提交
阿新 • • 發佈:2019-01-10
架構圖:
程式碼提交時序圖
Standalone模式提交執行流程圖:
首先寫一個WordCount程式碼(這個程式碼,為了觀察多個stage操作,我寫了兩個reducebykey 函式)
原始碼:
直接執行程式碼,檢視spark執行程式時,將程式碼劃分stage生成的DAG流程圖
可知: WordCount 在stage劃分的時候,劃分為三個stage
即在程式碼中如下標識:
本文繼續說task提交:
接上文,本文講根據分配的資源啟動task
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#launchTasks
首先看一下,傳入的物件:TaskDescription
TaskDescription任務資訊:
taskId: Long : 任務id
attemptNumber: Int, 重試次數
executorId: String : executor的Id ,即task分配給具體哪個executor
name: String, 任務名稱
index: Int : 任務在TaskSet中的索引
addedFiles: Map[String, Long] : 任務依賴的檔案
addedJars: Map[String, Long] : 任務依賴的jar包
properties: Properties : 任務依賴的屬性
serializedTask: ByteBuffer 序列化
executor 啟動task , 呼叫:org.apache.spark.executor.Executor#LaunchTask 方法
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
1.將task封裝成 TaskRunner
2.加入等待佇列 runningTasks
3.執行執行緒 TaskRunner
啟動 TaskRunner 。。。。。。。。。。。
org.apache.spark.executor.Executor.TaskRunner # run
org.apache.spark.scheduler.Task#run
執行這個類中的run方法
執行task中的run方法
//TODO ShuffleMapTask
//TODO ResultTask
org.apache.spark.ShuffleMapTask#ResultTask
org.apache.spark.scheduler#ResultTask
其他的就是將task中的一些執行資訊直接返回,傳送給drver、bolckmanager 等等,有興趣的去關注一下。。。。。。。。