Spark2.3.2原始碼解析:9.排程系統 DAGScheduler 之 Stage 劃分原始碼詳解
Stage劃分的時候,大家應該都知道是從最後一個stage向根據寬窄依賴,遞迴進行stage劃分。
但是程式碼裡面涉及的邏輯複雜。畢竟涉及到相互遞迴呼叫。讓人似懂非懂。 反正我是炸毛了 o(╥﹏╥)o
本文專門用一篇文章詳細論述DAGScheduler 的 stage 劃分流程
為了更容易理解,本文采用 debug模式+例項+原始碼的方式進行講解
首先寫一個WordCount程式碼(這個程式碼,為了觀察多個suffle操作,我寫了兩個reducebykey 函式)
原始碼:
直接執行程式碼,檢視spark執行程式時,將程式碼劃分stage生成的DAG流程圖
可知: WordCount 在stage劃分的時候,劃分為三個stage
即在程式碼中如下標識:
首先,我們明確一個概念RDD
我們知道RDD有兩個重要屬性 id , name
為了在後面除錯的時候,清除的理解rdd之間的呼叫,需要對其做編號,本文以rdd的id進行區分
宣告的rdd的屬性我舉幾個例子:
zrdd1 :
型別: MapPartitionsRDD
id : 1
name : /tmp/zl/data/data.txt (注:只有zrdd1的name有值,為資料路徑, 其他的rddname值都是“null ” )
zrdd4:
型別: ShuffledRDD
id : 4
name : null
直接說結果:
屬性 | RDD Id (重要,區分標識) | RDD型別 |
zrdd1 | 1 | MapPartitionsRDD |
zrdd2 | 2 | MapPartitionsRDD |
zrdd3 | 3 | MapPartitionsRDD |
zrdd4 | 4 | ShuffledRDD |
zrdd5 | 5 | MapPartitionsRDD |
zrdd6 | 6 | ShuffledRDD |
zrdd7 | 7 | MapPartitionsRDD |
程式入口的觸發點即為: zrdd7.count() 方法。 實際執行的是runjob方法。開啟程式執行入口。
程式依賴關係如下圖:
接下來我們看原始碼解析程式碼檢視stage是如何劃分的:
即如下程式碼排程流程圖中標識的部分。
以為之前的文章有說明,所以不再詳細解釋。有興趣的小夥伴可以直接看
https://blog.csdn.net/zhanglong_4444/article/details/85111604
好了,我們開始正式說程式碼:
org.apache.spark.scheduler.DAGScheduler#createResultStage
這個方法裡面最重要的是getOrCreateParentStages 方法,從這就容易開始亂了。
別慌,我先給畫個呼叫圖,先搞清楚邏輯,再用debug跟一便就好了。
從圖上可知,最外層迴圈的主體為: getOrCreateParentStages
記住這個啊。 這個才是真正的迴圈呼叫建立stage的方法,不要被getShuffleDependencies這個方法所迷惑
getShuffleDependencies 這個方法只是根據一個rdd返回這個rdd所在的寬依賴 ShuffleDependency
好了,先看一下類中的程式碼,然後我在畫個圖,講解
getOrCreateParentStages:
根據給定的RDD獲取或者建立父stages列表 ,新的stage會根據提供的firstJobId進行建立
這個方法很重要,遞迴呼叫的就是這個方法:
getShuffleDependencies
根據給定的RDD獲取或者建立父stages列表
返回值結構: ShuffleDependency 是一個寬依賴
getOrCreateShuffleMapStage (這個方法注意看一下)
getMissingAncestorShuffleDependencies
這裡面有一個遞迴方法 getShuffleDependencies 獲取shuffle依賴 (快取過的即為處理過的,不做任何處理)
ArrayStack 棧是一種後進先出(LIFO)的資料結構。
所以在迴圈的時候,最先取出的值,是最後放進的值。
createShuffleMapStage
根據所給的 ShuffleDependency 建立 ShuffleMapStage
這個裡面尤其要注意一點:
val parents = getOrCreateParentStages(rdd, jobId)
好了,接下來,我們畫個圖理解一下。
其實也不用畫圖。
主要是:
val deps = getMissingAncestorShuffleDependencies(shuffleDep.rdd)
這句,直接會吧所有寬依賴的都會找出來,然後提交。
返回的資料結構是 ArrayStack 這個資料結構是棧是一種後進先出(LIFO)的資料結構
用遞迴的的方式拿到stage ,然後再取出
因為儲存的時候,是棧儲存,所以提交的時候是stage0, 帶入上面的方法:
val parents = getOrCreateParentStages(rdd, jobId)
stage0沒有parents,所以返回值,為空。然後將stage0加入快取。 如下程式碼
stageIdToStage(id) = stage shuffleIdToMapStage(shuffleDep.shuffleId) = stage
當在傳入stage1的時候,獲取父的依賴,也就是stage0,這個在上一次呼叫的時候,已經處理過了
已經獲取到了,所以在呼叫getOrCreateParentStages方法的時候,可以直接從快取中拿到值。
如下方法,直接從快取中獲取。相當於做了一個優化。
好了,下面是畫圖的方式說了一下,有不明白的地方可以給我留言。
舉例:
根據程式碼劃分:stage的時候是這個結構:
入棧:
出棧:
好了,接下來看一下圖多個依賴,提交的時候。流程圖
多個依賴提交例子 (深度遍歷演算法)
RDDs原始依賴圖
getShuffleDependencies
RDD:15 , 獲取上一層依賴,返回的結果是 ShuffleDependency 集合
getMissingAncestorShuffleDependencies
深度遍歷順序獲取所有祖先的寬依賴,這裡返回的是一個集合。 其實這個也是一個優化,如果採用遞迴方法的呼叫的話,
很容易因為巢狀層級過多,導致棧溢位。
傳入值如果是RDD:13 返回 紅色的寬依賴。
最後劃分結果
就寫到這裡了,這部分有疑問或者有不對的地方
麻煩請指教,不勝感激。。。
參考連結:
http://spark.apache.org/docs/latest/
https://www.jianshu.com/p/14355e250e2f