1. 程式人生 > >Spark寬窄依賴詳解_

Spark寬窄依賴詳解_

1.寬窄依賴

 

圖中左邊是寬依賴,父RDD的4號分割槽資料劃分到子RDD的多個分割槽(一分割槽對多分割槽),這就表明有shuffle過程,父分割槽資料經過shuffle過程的hash分割槽器(也可自定義分割槽器)劃分到子RDD。例如GroupByKey,reduceByKey,join,sortByKey等操作。

圖右邊是窄依賴RDD的每個分割槽的資料直接到子RDD的對應一個分割槽(一分割槽對一分割槽),例如1號到5號分割槽的資料都只進入到子RDD的一個分割槽,這個過程沒有shuffle。Spark中Stage的劃分就是通過shuffle來劃分。shuffle可理解為資料的從原分割槽打亂重組到新的分割槽)

如:map,filter

總結:如果父RDD的一個Partition被一個子RDD的Partition所使用就是窄依賴,否則的話就是寬依賴。

2.寬窄依賴&&容錯性

Spark基於lineage的容錯性是指,如果一個RDD出錯,那麼可以從它的所有父RDD重新計算所得,如果一個RDD僅有一個父RDD(即窄依賴),那麼這種重新計算的代價會非常小。

 

Spark基於Checkpoint(物化)的容錯機制何解?在上圖中,寬依賴得到的結果(經歷過Shuffle過程)是很昂貴的,因此,Spark將此結果物化到磁碟上了,以備後面使用

對於join操作有兩種情況,如果join操作的每個partition 僅僅和已知的Partition進行join,此時的join操作就是窄依賴;其他情況的join操作就是寬依賴;因為是確定的Partition數量的依賴關係,所以就是窄依賴,得出一個推論,窄依賴不僅包含一對一的窄依賴,還包含一對固定個數的窄依賴(也就是說對父RDD的依賴的Partition的數量不會隨著RDD資料規模的改變而改變)

3.Stage的劃分

名詞解析

1.一個 job,就是由一個 rdd 的 action 觸發的動作,可以簡單的理解為,當你需要執行一個 rdd 的 action 的時候,會生成一個 job。

2.stage : stage 是一個 job 的組成單位,就是說,一個 job 會被切分成 1 個或 1 個以上的 stage,然後各個 stage 會按照執行順序依次執行。

3.task :即 stage 下的一個任務執行單元,一般來說,一個 rdd 有多少個partition,就會有多少個 task,因為每一個 task 只是處理一個partition 上的資料。

劃分規則

1.從後向前推理,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到Stage中;

2.每個Stage裡面的Task的數量是由該Stage中最後 一個RDD的Partition數量決定的;

3.最後一個Stage裡面的任務的型別是ResultTask,前面所有其他Stage裡面的任務型別都是ShuffleMapTask;

4.代表當前Stage的運算元一定是該Stage的最後一個計算步驟;

總結:由於spark中stage的劃分是根據shuffle來劃分的,而寬依賴必然有shuffle過程,因此可以說spark是根據寬窄依賴來劃分stage的。

Spark優化

窄依賴對優化很有利。邏輯上,每個RDD的運算元都是一個fork/join(此join非上文的join運算元,而是指同步多個並行任務的barrier):把計算fork到每個分割槽,算完後join,然後fork/join下一個RDD的運算元。如果直接翻譯到物理實現,是很不經濟的:一是每一個RDD(即使 是中間結果)都需要物化到記憶體或儲存中,費時費空間;二是join作為全域性的barrier,是很昂貴的,會被最慢的那個節點拖死。如果子RDD的分割槽到 父RDD的分割槽是窄依賴,就可以實施經典的fusion優化,把兩個fork/join合為一個;如果連續的變換運算元序列都是窄依賴,就可以把很多個 fork/join併為一個,不但減少了大量的全域性barrier,而且無需物化很多中間結果RDD,這將極大地提升效能。Spark把這個叫做流水線(pipeline)優化。

Spark流水線優化:

 

變換運算元序列一碰上shuffle類操作,寬依賴就發生了,流水線優化終止。在具體實現 中,DAGScheduler從當前運算元往前回溯依賴圖,一碰到寬依賴,就生成一個stage來容納已遍歷的運算元序列。在這個stage裡,可以安全地實施流水線優化。然後,又從那個寬依賴開始繼續回溯,生成下一個stage。

Pipeline

spark中pipeline是一個partition對應一個partition,所以在stage內部只有窄依賴。pipeline詳解

stage與stage之間是寬依賴

4. 分散式計算過程

 

上圖是一個Spark的wordcount例子,根據上述stage劃分原則,這個job劃分為2個stage,有三行,分別是資料讀取、計算和儲存過程。

僅看程式碼,使用者根本體會不到資料在背後是平行計算。從圖中能看出資料分佈在不同分割槽(也可以理解不同機器上),資料經過flapMap、map和reduceByKey運算元在不同RDD的分割槽中流轉。(這些運算元就是上面所說對RDD進行計算的函式)

下圖從更高角度看:

 

Spark的執行架構由Driver(可理解為master)和Executor(可理解為worker或slave)組成,Driver負責把使用者程式碼進行DAG切分,劃分為不同的Stage,然後把每個Stage對應的task排程提交到Executor進行計算,這樣Executor就並行執行同一個Stage的task。

(這裡Driver和Executor程序一般分佈在不同機器上)

這裡有人可能不理解Stage和task,下圖就是Spark的作業劃分層次:

 

Application就是使用者submit提交的整體程式碼,程式碼中又有很多action操作,action運算元把Application劃分為多個job,job根據寬依賴劃分為不同Stage,Stage內劃分為許多(數量由分割槽決定,一個分割槽的資料由一個task計算)功能相同的task,然後這些task提交給Executor進行計算執行,把結果返回給Driver彙總或儲存。

這體現了 Driver端總規劃–Executor端分計算–結果最後彙總回Driver 的思想,也就是分散式計算的思想。