1. 程式人生 > >DAGScheduler stage 劃分算法

DAGScheduler stage 劃分算法

tst 方法 代碼 總結 思想 內存 ima mis bsp

DAGScheduler stage 劃分算法

stage劃分算法很重要,對於spark開發人員來說,必須對stage劃分算法很清晰,知道自己編寫的spark Application被劃分成了幾個job,每個job被劃分成了幾個stage,每個stage包括哪些代碼,這樣當發現哪個stage報錯或者執行特別慢,才能針對對應代碼排查問題和性能調優

stage 劃分思想:

由submitStage() 和getMissingParentStage() 組成

會從觸發Action操作的那個RDD開始往前,首先為最後一個RDD創建一個stage,然後在往前,如果遇到某個RDD是寬依賴,就會為寬依賴創建一個新的stage,新的RDD就是最新的stage的最後一個RDD,然後以依次類推,繼續往前,根據寬依賴或者窄依賴進行stage劃分,知直到最後一個RDD遍歷完為止

stage劃分步驟:

1、使用出發job的最後一個RDD,創建finalStage(創建一個stage對象,並且將stage加入到DAGScheduler內部的內存緩存中)

2、使用finalStage創建一個job(這個job的最後一個stage,就是 finalStage)

3、將job加入到內存緩存中

4、使用 submitStage() 提交 finalStage  

提交stage的方法(stage劃分算法入口):

調用 getMissingParentStage() 獲取當前這個 stage 的父 stage:

往棧中推入stage的最後一個RDD

while循環對stage的最後一個RDD,調用自己定義的visit()方法

visit():如果是窄依賴,將RDD放入棧中,如果是寬依賴,使用寬依賴的那個RDD創建一個stage,將isShuffleMap設為true

提交stage,為stage創建一批task,task數量與Partition數量相同

計算每個task對應的Partition的最佳位置(就是從stage最後一個RDD開始,去找被cache或checkpoint的RDD的Partition,task的最佳位置,就是該Partition的位置,這樣task就在那個節點上執行,不需要計算之前的RDD;如果從最後一個RDD到最開始的RDD,都沒有被cache或checkpoint,那麽最佳位置就是Nil,就是沒有最佳位置)

5.、針對stage的task,創建TaskSet對象,調用TaskScheduler的submitTask方法,提交TaskSet,提交到Excutor上去執行

總結如下:

1、從finalstage倒推,

2、通過寬依賴進行新的stage劃分

3、使用遞歸,優先提交父stage

技術分享圖片

對於每一種有shuffle的操作。底層對應了三個RDD:MapPartitionsRDD、ShuffleRDD、MapPartitionsRDD

DAGScheduler stage 劃分算法