1. 程式人生 > >Spark2.3.2原始碼解析: 10. 排程系統 Task任務提交 (一) DAGScheduler 之 stage 提交

Spark2.3.2原始碼解析: 10. 排程系統 Task任務提交 (一) DAGScheduler 之 stage 提交

 

一個Spark Application分為stage級別和task級別的排程, 

task來源於stage,所有本文先從stage提交開始講解task任務提交。

 

架構圖:

Standalone模式提交執行流程圖:

首先寫一個WordCount程式碼(這個程式碼,為了觀察多個suffle操作,我寫了兩個reducebykey 函式)

原始碼:

 

直接執行程式碼,檢視spark執行程式時,將程式碼劃分stage生成的DAG流程圖

 

可知: WordCount 在stage劃分的時候,劃分為三個stage 

即在程式碼中如下標識:

 

講TaskScheduler ,先從DAGScheduler中提交任務開始吧,其中在stage劃分task的時候,涉及到一些優化演算法。

org.apache.spark.scheduler.DAGScheduler#handleMapStageSubmitted

這個方法主要有三個部分:

1、建立finalStage

finalStage = getOrCreateShuffleMapStage(dependency, jobId)
2、建立ActiveJob

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
3.提交stage

submitStage(finalStage)

 

直接看第三步 submitStage

 

 

這個是提交stage方法。

裡面是一個遞迴方法,舉例:

在程式碼中, 劃分為三個stage:

stage0  ---> stage1     ---> stage2

 submitStage(stage: Stage) 這個方法先傳入的是 finalStage(stage2)

在方法裡面迴圈遞迴, 分別尋找stage的父stage, 即 stage2 找到 stage1 , stage1找到stage0

stage0 沒有父stage 即走 提交方法:

submitMissingTasks(stage: Stage, jobId: Int)

好,接下來,我們看submitMissingTasks

 

可以看到入參: ShuffleMapStage 0 和 jobId 0

 

 找出當前stage的所有分割槽中,還沒計算完分割槽的stage

 

ShuffleMapStage

stage.findMissingPartitions獲取需要計算的分割槽,不同的stage有不同的實現:

ResultStage

 

計算 分割槽的最佳位置 :  taskIdToLocations

 

 

計算最佳位置的核心方法: getPreferredLocsInternal  (遞迴方法)

 

這個開始傳入的RDD:3,

rdd:3找不到最佳位置, 找到rdd:3的父級rdd:2,

rdd2,找不到最佳位置,找到rdd2的父級rdd1

rdd1有最佳位置,直接返回: 具體的機器地址:

 

廣播資訊:

為每一個MapStage的分割槽 建立一個 ShuffleMapTask 或者 ResultTask 

將ShuffleMapTask 或者 ResultTask  封裝成taskSet,提交Task

 

在這裡執行的是

taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

 

接著呼叫執行的是:

org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks

 

 

未完,請看下一篇文章:

 

 

Spark2.3.2原始碼解析: 10. 排程系統 Task任務提交 (二) TaskScheduler ​​

https://blog.csdn.net/zhanglong_4444/article/details/85249376