1. 程式人生 > >7-1、Spark-Scheduler

7-1、Spark-Scheduler

4、Scheduler

Scheduler(任務排程)模組是Spark Core的核心模組之一。

Spark對於DAG(Directed Acyclic Graph,有向無環圖)的實現以及不同執行階段的劃分和任務的提交執行。

任務排程,即組成應用的多個Job之間如何分配計算資源。

4.1、整體模組概述

4.1.1DAGSchedulerTaskScheduler

任務排程模組主要包含兩大部分:DAGSchedulerTaskScheduler,它們負責將使用者提交的計算任務按照DAG劃分為不同的階段並且將不同階段的計算任務提交到叢集進行最終的計算。

下圖為任務排程邏輯檢視:

 

RDD Objects可以理解為使用者實際程式碼中建立的

RDD,這些程式碼邏輯上組成一個DAG。使用者實際程式設計的時候可以認為它處理的資料都是可以存在記憶體中,而無需關心最終在叢集中執行的任務是否整個資料都可以裝載到記憶體中或者說它究竟需要多少個節點參與運算。

DAGScheduler主要負責分析使用者提交的應用,並根據任務的依賴關係建立DAG,然後將DAG劃分為不同的Stage(階段),其中每個Stage由可以併發執行的一組Task構成,這些Task的執行邏輯完全相同,只是作用不同的資料。而且DAG在不同的資源管理框架下的實現是相同的。

DAGScheduler將這組Task劃分完成後,會將這組Task提交到TaskSchedulerTaskScheduler

通過Cluster Manager在叢集中的某個WorkerExecutor上啟動任務。

使用者提交的Job最終會呼叫DAGSchedulerrunJob,它又會呼叫submitJob

Submit首先會為這個Job生成一個Job ID,並且生成一個JobWaiter的例項來監聽Job的執行狀態。

JobWriter會監聽Job的執行狀態,而Job是由多個Task組成的,因此只有Job的所有Task都成功完成,Job才標記為成功,任意一個Task失敗都會標記這個Job失敗。

4.2JobStage的劃分

使用者提交的計算任務是由一個個RDD構成的DAG,如果RDD在轉換的時候需要做

Shuffle,那麼這個Shuffle的過程,就將這個DAG分為了不同的階段(即Stage)。

由於Shuffle的存在,不同的Stage是不能平行計算的,因為後面的Stage的計算需要前面的StageShuffle的結果。

一個Stage由一組完全獨立的計算任務(即Task組成),每個Task的執行邏輯完全相同,只不過每個Task都會處理其所對應的Partition如果Partition的數量和Task數量一致,那麼一個Partition會被該Stage的一個Task處理。

Spark根據寬依賴將Job劃分為不同的階段(Stage)。

下圖為RDD劃分示意圖:

 

Stage的劃分是從最後一個RDD開始的,也就是觸發Action的那個RDD

根據上圖分析:

劃分從RDD G的依賴開始:

RDD G依賴兩個RDDRDD BRDD F,其中先處理哪一個是隨機的。假如先處理RDD B,由於RDD BRDD G是窄依賴,因此RDD BRDD G可以劃分到一個Stage(即Stage3)。 再處理RDD F,因為這個依賴是寬依賴,所以RDD FRDD G被劃分到不同的Stage(即Stage 2Stage3),其中RDD F所在的Stage2RDD G所在的Stage3parent Stage

接著處理RDD B的依賴,由於它依賴的RDD A是寬依賴,因此它們屬於不同的StageStage1Stage3

最後,這個DAG被劃分為3Stage,即RDD A所在的Stage1RDD CRDD DRDD ERDD F所在的Stage2RDD BRDD G所在的Stage3

其中Stage1Stage2是相互獨立的,可以並行執行。Stage3依賴於Stage1Stage2,只有Stage1Stage2計算完成之後,它才可以開始計算。

4.3、應用Application

TaskSchedulerDAGScheduler都是在SparkContext建立的時候建立的。DAGScheduler儲存了TaskScheduler的引用,因此需要在TaskScheduler建立之後進行建立。

TaskScheduler負責Application的不同Job之間的排程,在Task執行失敗的時候啟動重試機制,並且為執行速度慢的Task啟動備份的任務。SchedulerBackend負責與Cluster Manager互動,取得該Application分配到的資源,並將這些資源傳給TaskScheduler,由TaskSchedulerTask最終分配計算資源。

 

應用的基本元件:

Application:使用者自定義的Spark程式,使用者提交後,SparkApp分配資源,將程式轉換並執行。

Driver Program:執行Applicationmain()函式並建立SparkContext

RDD GraphRDDSparkde 核心結構,可以通過一系列的運算元進行操作。當RDD遇到Action運算元時,將之前的所有運算元形成一個有向無環圖(DAG),也就是圖中的RDD Graph。再在Spark中轉化為Job,提交到叢集執行。一個App中可以包含多個Job

Job:一個RDD Graph觸發的作業,往往由Spark Action運算元觸發,在SparkContext中通過runJob方法向Spark提交Job

Stage:每個Job會根據RDD的寬依賴關係被切分成很多Stage,每個Stage中包含一組相同的Task,這一組TaskTaskSet

Task:一個分割槽對應一個TaskTask執行RDD 中對應Stage中包含的運算元。Task被封裝好後放入Executor的執行緒池中執行。

Driver程序是應用的主控程序,負責應用的解析、切分並排程TaskExecutor執行,包含DAGScgeduler等重要物件。

Master排程應用,針對每一個應用分發給指定的一個Worker啟動Driver,即SchedulerBackendSchedulerBackend程序中包含DAGScheduler,它會根據RDDDAG切分Stage,生成TaskSet,並排程和分發TaskExecutor

4.4、任務排程實現

排程分為4個級別:Application排程、Job排程、Stage排程、Task的排程與分發。

4.4.1Application排程:

SparkContext維持著整個應用的上下文,提供一些核心方法。如runJob提交Job,然後通過主節點的分配獲得一組Executor JVM程序執行任務。Executor空間內的不同應用之間是不共享的,一個Executor在一個時間段內只能分配給一個應用使用。

預設情況下,使用者向以Standalone模式執行的Spark叢集提交的應用使用FIFO(先進先出)的順序進行排程,每個應用會獨佔所有可用節點的資源。

StandaloneMesosYARN模式都不提供跨應用的共享記憶體。

4.4.2Job排程:

Spark應用程式內部,使用者通過不同執行緒提交的Job可以並行執行,這裡的Job就是Spark Action運算元觸發的整個RDD DAG為一個Job。實現上就是呼叫runJob方法提交的Job

下圖為FIFO排程模式:

 

在演算法執行中,先看優先順序,TaskSet的優先順序是JobID,因為先提交的Job ID小,所以就會被更優先的排程。這裡相當於進行了兩層排序,先看是否是同一個JobTaskSet,不同Job之間的TaskSet先排序。

最後執行的satageID最小為0,最先應該執行的stageID最大。但是這裡的排程機制是優先排程StageID小的。在DAGScheduler中控制Stage是否被提交到佇列,如果還有父母Stage未執行完,則該stageTaskset不會被提交到排程池中。這就保證了雖然最先執行的stageid最大,但是排序完,由於後面的還沒有提交到排程池,所以會先執行。

StageTaskSet排程邏輯主要在DAGScheduler中,而Job排程由FIFO或者FAIR演算法排程。

下圖為FAIR排程模型:

 

Spark0.8版本後,可以開始通過配置FAIR共享排程模式排程Job

FAIR共享模式排程下,Spark在多Job之間以輪訓方式為任務分配資源,所有任務擁有大致相當的優先順序來共享叢集的資源。

FAIR排程器同樣支援將Job分組加入排程池排程,使用者可以同時針對不同優先順序對每個排程池配置不同的排程權重。這種方式允許更重要的Job配置在高優先順序池中優先排程。這種方式借鑑了HadoopFAIR排程器。

配置排程池:

1)、排程模式:使用者可以選擇FIFOFAIR

2)、權重:作用在控制整個叢集資源分配上,這個排程池相對其他排程池優先順序的高低。

3)、minShare:這個引數決定整體排程的排程池能給待排程的排程池分配多少資源就可以滿足排程池的資源需求,剩下的資源可以分配給其他排程池。

4.4.3Stage排程:

Stage的排程由DAGScheduler完成的。由RDD 的有向無環圖DAG切分出了Stage的有向無環圖DAG

StageDAG通過最後執行的Stage為根進行廣度優先遍歷,便利到最開始執行的Stage執行,如果提交的stage仍有未完成的父母stage,則Stage需要等待其父母執行完才能執行。

TaskScheduler中將每個Stage中對應的任務進行提交和排程。

一個應用對應一個TaskScheduler,也就是這個應用中所有Action觸發的Job中的TaskSetManager都是由這個TaskScheduler排程的。

每個Stage對應的一個TaskSetManager通過Stage回溯到最源頭缺失的Stage提交到排程池pool中,在排程池中,這些TaskSetManager又會根據Job ID排序,先提交的JobTaskSetManager優先排程,然後一個Job內的TaskSetManager ID小的先排程,並且如果有未執行完的父母StageTaskSetManager,則不會提交到排程池中。

4.4.4Task排程

整體的Task分發由TaskSchedulerImpl來實現,但是Task的排程(本質上是Task在哪個分割槽上執行)邏輯由TaskSetManager完成。這個類監控這個任務的生命週期,當任務失敗時,重新排程。

如果是呼叫過cache()方法的RDD,資料已經快取在記憶體,則讀取記憶體快取中分割槽的資料。

如果能直接獲取執行地點,則返回執行地點作為任務的執行地點。通常DAG中最源頭的RDD或者每個Stage中最開始的RDD會有執行地點的資訊。例如HadoopRDDHDFS讀取的分割槽就是最好的執行地點。

如果以上2種情況都不是,則將遍歷RDD獲取第一個窄依賴的父RDD對應分割槽的執行地點。