1. 程式人生 > >Spark2.3.2原始碼解析:9.排程系統 DAGScheduler 之 Stage 劃分原始碼詳解

Spark2.3.2原始碼解析:9.排程系統 DAGScheduler 之 Stage 劃分原始碼詳解

Stage劃分的時候,大家應該都知道是從最後一個stage向根據寬窄依賴,遞迴進行stage劃分。

但是程式碼裡面涉及的邏輯複雜。畢竟涉及到相互遞迴呼叫。讓人似懂非懂。 反正我是炸毛了 o(╥﹏╥)o

本文專門用一篇文章詳細論述DAGScheduler 的 stage 劃分流程

為了更容易理解,本文采用 debug模式+例項+原始碼的方式進行講解

首先寫一個WordCount程式碼(這個程式碼,為了觀察多個suffle操作,我寫了兩個reducebykey 函式)

原始碼:

 

直接執行程式碼,檢視spark執行程式時,將程式碼劃分stage生成的DAG流程圖

 

可知: WordCount 在stage劃分的時候,劃分為三個stage 

即在程式碼中如下標識:


 

首先,我們明確一個概念RDD

 

我們知道RDD有兩個重要屬性 id  , name

為了在後面除錯的時候,清除的理解rdd之間的呼叫,需要對其做編號,本文以rdd的id進行區分

宣告的rdd的屬性我舉幾個例子:

 

zrdd1 : 

型別:  MapPartitionsRDD

id : 1

name : /tmp/zl/data/data.txt  (注:只有zrdd1的name有值,為資料路徑, 其他的rddname值都是“null ”  )

 


zrdd4:

型別:  ShuffledRDD

id : 4

name : null

 

直接說結果:

屬性 RDD Id  (重要,區分標識) RDD型別
zrdd1 1 MapPartitionsRDD
zrdd2 2 MapPartitionsRDD
zrdd3 3 MapPartitionsRDD
zrdd4 4 ShuffledRDD
zrdd5 5 MapPartitionsRDD
zrdd6 6 ShuffledRDD
zrdd7 7 MapPartitionsRDD

 

 

 

 

 

 

 

 

 

程式入口的觸發點即為: zrdd7.count() 方法。  實際執行的是runjob方法。開啟程式執行入口。

程式依賴關係如下圖:

 

接下來我們看原始碼解析程式碼檢視stage是如何劃分的:

 

即如下程式碼排程流程圖中標識的部分。

 

 

以為之前的文章有說明,所以不再詳細解釋。有興趣的小夥伴可以直接看

https://blog.csdn.net/zhanglong_4444/article/details/85111604

 

好了,我們開始正式說程式碼:

org.apache.spark.scheduler.DAGScheduler#createResultStage

 

這個方法裡面最重要的是getOrCreateParentStages 方法,從這就容易開始亂了。

別慌,我先給畫個呼叫圖,先搞清楚邏輯,再用debug跟一便就好了。

 

從圖上可知,最外層迴圈的主體為: getOrCreateParentStages

記住這個啊。 這個才是真正的迴圈呼叫建立stage的方法,不要被getShuffleDependencies這個方法所迷惑

getShuffleDependencies 這個方法只是根據一個rdd返回這個rdd所在的寬依賴 ShuffleDependency

 

好了,先看一下類中的程式碼,然後我在畫個圖,講解

getOrCreateParentStages: 

根據給定的RDD獲取或者建立父stages列表 ,新的stage會根據提供的firstJobId進行建立 

這個方法很重要,遞迴呼叫的就是這個方法:

 

getShuffleDependencies

根據給定的RDD獲取或者建立父stages列表 

 

返回值結構: ShuffleDependency 是一個寬依賴



getOrCreateShuffleMapStage  (這個方法注意看一下)

 

 

 

 

getMissingAncestorShuffleDependencies

這裡面有一個遞迴方法 getShuffleDependencies 獲取shuffle依賴 (快取過的即為處理過的,不做任何處理)

 

ArrayStack  棧是一種後進先出(LIFO)的資料結構。 

所以在迴圈的時候,最先取出的值,是最後放進的值。

 

 

 

createShuffleMapStage

根據所給的 ShuffleDependency 建立 ShuffleMapStage

這個裡面尤其要注意一點:

val parents = getOrCreateParentStages(rdd, jobId)

 

 

 

 

好了,接下來,我們畫個圖理解一下。

其實也不用畫圖。

主要是:

val deps =  getMissingAncestorShuffleDependencies(shuffleDep.rdd)

這句,直接會吧所有寬依賴的都會找出來,然後提交。

返回的資料結構是 ArrayStack 這個資料結構是棧是一種後進先出(LIFO)的資料結構

用遞迴的的方式拿到stage ,然後再取出

因為儲存的時候,是棧儲存,所以提交的時候是stage0, 帶入上面的方法:

val parents = getOrCreateParentStages(rdd, jobId)

stage0沒有parents,所以返回值,為空。然後將stage0加入快取。 如下程式碼

stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage

當在傳入stage1的時候,獲取父的依賴,也就是stage0,這個在上一次呼叫的時候,已經處理過了

已經獲取到了,所以在呼叫getOrCreateParentStages方法的時候,可以直接從快取中拿到值。

如下方法,直接從快取中獲取。相當於做了一個優化。

 

好了,下面是畫圖的方式說了一下,有不明白的地方可以給我留言。

 

舉例:

根據程式碼劃分:stage的時候是這個結構:

 

入棧:

 

出棧:

 

 

好了,接下來看一下圖多個依賴,提交的時候。流程圖

 

 

 

 

多個依賴提交例子 (深度遍歷演算法)

RDDs原始依賴圖

 

 

getShuffleDependencies

RDD:15 , 獲取上一層依賴,返回的結果是 ShuffleDependency 集合

 

 

 

getMissingAncestorShuffleDependencies

深度遍歷順序獲取所有祖先的寬依賴,這裡返回的是一個集合。 其實這個也是一個優化,如果採用遞迴方法的呼叫的話,

很容易因為巢狀層級過多,導致棧溢位。

 

傳入值如果是RDD:13 返回 紅色的寬依賴。

 

 

最後劃分結果

 

 

就寫到這裡了,這部分有疑問或者有不對的地方

麻煩請指教,不勝感激。。。

 

 

 

 

參考連結:

http://spark.apache.org/docs/latest/

https://www.jianshu.com/p/14355e250e2f