1. 程式人生 > >大話Spark(8)-原始碼之DAGScheduler

大話Spark(8)-原始碼之DAGScheduler

DAGScheduler的主要作用有2個:

一、把job劃分成多個Stage(Stage內部並行執行,整個作業按照Stage的順序依次執行)
二、提交任務

以下分別介紹下DAGScheduler是如何做這2件事情的,然後再跟原始碼看下DAGScheduler的實現。

一、如何把Job劃分成多個Stage

1) 回顧下寬依賴和窄依賴

窄依賴:父RDD的每個分割槽只被子RDD的一個分割槽使用。(map,filter,union操作等)
寬依賴:父RDD的分割槽可能被多個子RDD的分割槽使用。(reduceByKey,groupByKey等)

如下圖所示,左側的運算元為窄依賴, 右側為寬依賴


窄依賴可以支援在同一個叢集Executor上,以管道形式順序執行多條命令,例如在執行了map後,緊接著執行filter。分割槽內的計算收斂,不需要依賴所有分割槽的資料,可以並行地在不同節點進行計算。所以它的失敗回覆也更有效,因為它只需要重新計算丟失的parent partition即可。最重要的是窄依賴沒有shuffle過程,而寬依賴由於父RDD的分割槽可能被多個子RDD的分割槽使用,所以一定伴隨著shuffle操作。

2) DAGScheduler 如何把job劃分成多個Stage

DAGScheduler會把job劃分成多個Stage,如下圖sparkui上的截圖所示,job 0 被劃分成了3個stage

DAGScheduler劃分Stage的過程如下:
DAGScheduler會從觸發action操作的那個RDD開始往前倒推,首先會為最後一個RDD建立一個stage,然後往前倒推的時候,如果發現對某個RDD是寬依賴(產生Shuffle),那麼就會將寬依賴的那個RDD建立一個新的stage,那個RDD就是新的stage的最後一個RDD。然後依次類推,繼續往前倒推,根據窄依賴或者寬依賴進行stage的劃分,直到所有的RDD全部遍歷完成為止。

3) wordcount的Stage劃分

在前面大話spark(3)-一圖深入理解WordCount程式在Spark中的執行過程中,我畫過一張wordcount作業的Stage的劃分的圖,如下:

可以看出上圖中,第一個stage的3個task並行執行,遇到reduceByKey這個產生shuffle的操作開始劃分出新的Stage。但是其實這張圖是不準確的。
其實對於每一種有shuffle的操作,比如groupByKey、reduceByKey、countByKey的底層都對應了三個RDD:MapPartitionsRDD、ShuffleRdd、MapPartitionsRDD
(寬依賴shuffle生成的rdd為ShuffleRdd)
其中Shuffle發生在第一個RDD和第二個RDD之間,前面說過如果發現對某個RDD是寬依賴(產生Shuffle),那麼就會將寬依賴的那個RDD建立一個新的stage
所以說上圖中 reduceByKey操作其實對應了3個RDD,其中第一個RDD會被劃分到Stage1中!

4) DAGScheduler劃分Stage原始碼

RDD類中所有的action運算元觸發計算都會呼叫sc.runjob方法, 而sc.runjob方法底層都會呼叫到SparkContext中的dagscheduler物件的runJob方法。
例如count這個action操作
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

一直追著runJob方法往底層看最終呼叫dagScheduler.runJob,傳入呼叫這個方法的rdd

dagScheduler.runJob內部呼叫submitJob提交當前的action到scheduler
submitJob內部呼叫DAGSchedulerEventProcessLoop傳送JobSubmitted的資訊,
在JobSubmitted內部最終呼叫dagScheduler的handleJobSubmitted(dagScheduler的核心入口)。

handleJobSubmitted方法如下:

上面程式碼中submitStage提交作業,其內程式碼如下:

submitStage方法中呼叫getMissingParentStages方法獲取finalStage的父stage,
如果不存在,則使用submitMissingTasks方法提交執行;
如果存在,則把該stage放到waitingStages中,同時遞迴呼叫submitStage。通過該演算法把存在父stage的stage放入waitingStages中,不存在的作為作業執行的入口。

其中最重要的getMissingParentStages中是stage劃分的核心程式碼,如下:

這裡就是前面說到的stage劃分的方式,檢視最後一個rdd的依賴,如果是窄依賴,則不建立新的stage,如果是寬依賴,則用getOrCreateShuffledMapStage方法建立新的rdd,依次往前推。

所以Stage的劃分演算法最核心的兩個方法為submitStage何getMissingParentStage

二、提交任務

當Stage提交執行後,在DAGScheduler的submitMissingTasks方法中,會根據Stage的Partition個數拆分對應個數任務,這些任務組成一個TaskSet提交到TaskScheduler進行處理。
對於ResultStage(最後一個Stage)生成ResultTask,對於ShuffleMapStage生成ShuffleMapTask。
每一個TaskSet包含對應Stage的所有task,這些Task的處理邏輯完全一樣,不同的是對應處理的資料,而這些資料是對應其資料分片的(Partition)。
submitMissingTasks如下: