1. 程式人生 > >spark基礎之排程器執行機制簡述

spark基礎之排程器執行機制簡述

一 概述

驅動程式在啟動的時候,首先會初始化SparkContext,初始化SparkContext的時候,就會建立DAGScheduler、TaskScheduler、SchedulerBackend等,同時還會向Master註冊程式;如果註冊沒有問題。Master通過叢集管理器(cluster manager)會給這個程式分配資源,然後SparkContext根據action觸發job。

Job裡面有一系列RDD, DAGScheduler從後往前推若發現是寬依賴的話,就劃分不同的Stage。

Stage劃分完後,Stage提交給底層的排程器TaskScheduler,TaskScheduler拿到這個Task的集合,因為Stage內部都是計算邏輯完全一樣的任務,只是資料不一樣而已。TaskScheduler就會根據資料本底性,將任務分配到Executor上執行。

Executor在任務執行完畢或者出狀況時,肯定要向Driver彙報

最後執行完畢,關閉SparkContext,同時建立的那些物件也被關掉。

二 什麼是Spark Driver 程式

Driver程式就是執行應用程式的main函式,它會建立SparkContext,準備應用程式的執行環境(初始化各個元件,比如DAGScheduler等),

然後應用程式由SparkContext負責和叢集通訊,資源的申請以及任務的分配和監控等。當Worker節點的Executor執行完Task之後,Driver同時負責將SparkContext關閉。

三 SparkContext

SparkContext是使用者和Spark叢集進行互動的唯一入口,可以用來在Spark叢集中建立RDD,累加器Accumulator和廣播變數; 它也是驅動程式至關重要的物件,由它提供應用程式所需要的執行環境。

SparkContext的核心作用就是準備應用程式執行環境,所以在初始化的時候會構造一系列物件DAGScheduler, TaskScheduler等,同時負責向Master註冊應用程式

只可以有一個SparkContext例項執行在一個JVM中,所以在建立SparkContext的時候之前,確保之前的SparkContext已經關閉了,即呼叫stop方法停止當前JVM中唯一執行的SparkContext

四 Spark Job的觸發

# 每一個final RDD的action操作會觸發一個job,比如count,collect,saveAsTextFile,forEach等都會觸發job。這就意味著應用程式如果有多個action操作 .

# 每一個Job根據寬依賴來劃分Stage,每一個job可能有一個或者多個Stage,比如reduceByKey,groupByKey等運算元,每一個Stage生成一個Task

# 所有的Stage會形成一個DAG(有向無環圖),由於RDD的Lazy特性,導致Stage也是Lazy級別的,只有遇到了Action才會真正發生作業的執行,在Action之前,Spark框架只是將要進行的計算記錄下來,並沒有真的執行。

# 一個作業可能有ResultStage和ShuffleMapStage組成:一個作業如果shuffle操作,那麼就只有一個ResultStage;如果有shuffle操作,那麼,則存在一個ResultStage和至少一個ShuffleMapStage



五 DAGScheduler

# DAG:Direct Acyclic Graph,spark主要用於RDD關係建模,描述RDD之間的依賴關係,主要用於構建RDD的資料流,即RDD的各個分割槽資料是從哪裡來的和構建基於資料流之上的操作運算元流,即RDD各個分割槽資料總共會經過哪些transformation和action的這兩種型別的一系列的操作的排程執行

# DAGScheduler需要解析DAG.它是一個面向stage的高層排程器,它把DAG拆分成很多個Task,每一組task都是一個stage,解析的時候,每當遇到shuffle操作的時候就會產生新的stage,然後以一個個TaskSet的形式提交給底層的排程器TaskScheduler.

# DAGScheduler需要記錄哪些RDD需要寫入磁碟

# DAGScheduler 需要尋求Task的最優排程,比如stage內部資料的本地性等

# DAGScheduler 需要監視因為shuffle跨節點輸出可能導致的失敗,如果發現stage失敗,可能需要重新提交stage

Job、Stage、TaskSet、Task含義和關係:

Job: 一個action操作就會觸發一個job,如果有多個action操作就會有多個job.

Stage: 一個Job會被DAGScheduler拆分成多組任務,每一組任務就是由一個Stage封裝,stage之間也有依賴關係。如果RDD之間沒有shuffle操作那麼就只有一個ResultStage;如果有shuffle操作,那麼就有一個ResultStage和至少一個ShuffleMapStage

TaskSet: 一組任務就是一個TaskSet,對應著一個Stage,所以也可以理解為一個Stage就是一個TaskSet

Task: 一個獨立的工作單元,由驅動程式傳送到Executor上去執行。通常情況下,一個Task處理一個RDD的分割槽的資料,根據返回型別不同,又分為ResultTask和ShuffleMapTask

六 TaskScheduler

TaskScheduler主要是提交TaskSet到叢集運算並彙報結果

# 為TaskSet建立和維護一個TaskSetManager,並追蹤任務本地性及錯誤資訊

# 遇到一些迷路的任務(straggle)會放在其他節點重試

# 向DAGScheduler彙報執行情況,包括shuffle輸出丟失時報告fetch failed錯誤

七 SchedulerBackend

排程器的通訊終端,以SparkDeploySchedulerBackend在啟動時,構造了AppClient例項,並在該例項start時啟動ClientEndpoint訊息迴圈體,ClientEndpoint在啟動時會向Master註冊當前程式。

SparkDeploySchedulerBackend的父類CoarseGrainedSchedulerBackend在start時會例項化型別為DriverEndPoint訊息迴圈體,SparkDeploySchedulerBackend專門負責收集Worker上資源資訊,當ExecutorBackend啟動時會發送RegisteredExecutor資訊向DriverPoint註冊,此時SparkDeploySchedulerBackend就掌握了當前應用程式所擁有的計算資源。