1. 程式人生 > >spark的任務執行流程解析

spark的任務執行流程解析

一、從架構上看Spark的Job工作(Master\Worker)

[睡著的水-hzjs-2016.8.18]

1.Master節點上是Master程序,主要是管理資源的,資源主要是記憶體和CPU。Master能夠接收客戶端傳送的程式並且為程式進行註冊。worker節點有worker程序,負責當前節點的記憶體和cpu的使用,spark是主從結構式架構。

2.Spark執行作業的方式有很多,最簡單的是就是通過spark-shell ,每個程式的程式的ID是向master 註冊的時候由master分配的。worker節點在執行程式工作的時候的core數量、記憶體的大小是在安裝Spark叢集的時候配置檔案中我們自己配置的。當啟動spark-shell 的時候並沒有執行任何job計算任務,那Master有沒有分配資源呢?是有的!

3.master分配資源的方式是粗粒度的。spark-shell 中預設的是沒有任何的Job,有資源分配就是因為分配資源的方式粗粒度的,預設情況下,雖然沒有job計算,只要提交了程式並註冊了,就會分配資源。

4.分配資源預設的分配方式在每個workers上,為當前程式分配一個Executorbackend 進行,且預設情況下會最大化的使用cores和memory。

5.Executor一次性最多能執行多少個併發的task,取決於當前Executor 能夠使用的work的core的數量

6.有多個Executor ,就能更多的獲取記憶體跟core的數量,但是預設情況下,一個節點只有一個Executor.

7.出現OOM的解決方式之一就是加大分片的數量。

8.一個分片資料進哪一個Executor ,主要取決於資料的本地性。

9.Executor 的並行度是可以繼承的,後面的任務並行數量會繼承前面的task數量,如果不進行類似reduceByKey類似的操作。

10.reduceByKey 執行的時候會只有一個任務,並行分片唯一,一般是執行在資料量比較大的節點上,資料本地性執行緒並不關心執行 什麼程式碼,執行緒是代表這資源,由於它不關心程式碼,所以task與Thread是解耦合的,所以Thread是可以複用的,

總結:當spark叢集啟動的時候,首先啟動master程序,負責整個叢集資源的管理和分配,並且接收作業的提交,為作業註冊、分配資源及每個工作節點預設情況都會啟動一個worker Process 來管理當前節點的cpu等計算資源(實際上是還是通過Master來管理的)。worker Process 它會向master彙報worker當前能夠工作。當用戶向Master提交作業的時候,master為會為程式分配ID,並分配資源,預設情況下會為當前的應用程式在每個worker Process 下面分配一個CoarseGrainedExecutorBackend程序,該程序預設情況下會最大化的使用當前節點上的記憶體和CPU。

流程圖如下:

二、從依賴上看spark的job執行原理

兩種依賴的示意圖如下,窄依賴、寬依賴:

寬依賴與窄依賴的定義:

示例如下圖:

如果是這樣的複雜的依賴關係會產生很多問題,嚴重的影響程式的效能:

1、Task太大。遇到Shuffle級別的依賴關係(寬依賴)必須要計算依賴的RDD的所有的Partitions,並且是發生在一個Task中。

2、重複計算。如上圖最終的結果有三個Partitions,所以因為Shuffle的緣由,計算了三次所有的依賴RDD.

3、假設考慮從後往前的依賴關係,我們設計演算法的角度看那些多次用到就需要cache(),這會造成了儲存的浪費。

-------雖然這個依賴有很多的記憶體浪費,計算重複等,,但是我們依然可以看出血統的影子pipeline(做函式展開),無法很好的實現是因為上述的假設的核心問題都是在遇到shuffle依賴的時候無法很好的進行pipeline.我們只能退而求其次,在遇到shuffle的時候我們就需要斷開,這樣一個個的Stage的劃分就清楚了。窄依賴加入,寬依賴斷開。

-------每個Stage裡面的Task的數量是由最後一個Stage的Partition的數量決定的

-------再次思考pipeline,即使採用pipeline的方式,函式f對依賴的RDD中的資料集合操作也會有兩種方式:

1,f(記錄),f 作用於集合的每一條記錄,每次只作用於一條資料(spark採用);

2,f(記錄),f一次性作用於集合的全部資料;

-------Spark採用第一種方式的原因:、

1、無需等待,可以最大化的使用叢集的計算資源。

2、減少OOM的發生

3、最大化的有利於併發

4、可以精確的控制每一個partiton本身極其內部的計算

5、基於lineage的運算元流動式函式程式設計,節省了中間結果的產生,並且可以最快的恢復;

三、從物理執行的角度看Spark的job執行

------Spark Application 裡面可以產生一個或者多個Job,例如spark-shell 預設啟動的時候內部就沒有Job,只是作為資源分配的程式,可以在spark-shell 裡面寫程式碼產生他若干個job,普通程式一般而言可以有不同的Action,每一個Action一般也會觸發一個job

------Spark是MapReduce思想的一種更加精緻和高效的實現,MapReduce有很多集體不同的實現,例如hadoop的MapReduce基本的計算流程如下:

1、首先是以JVM為物件的併發執行的Mappper,其中Map的執行會產生資料,輸出資料經過Partitioner指定的規則,放到當地的檔案系統中,然後經過Shuffle、sort、Affregate 變成reduce的輸入,執行reduce產生最終的執行結果。但是在執行迭代的時候,由於每次都要將reduce的結果存入HDFS,下次計算還要取出,,,這樣就造成了很多的侷限性。

2、Spark 演算法構造和物理執行時的核心思想之一就是:最大化pipeline(資料複用效果好)!基於Pipeline的思想,資料被使用的時候才開始計算,從資料流動的視覺來說,是資料流動到計算的位置!從邏輯的角度上來看是運算元在資料上流帶動。從演算法構建的角度而言,是運算元作用於資料,所以是運算元在資料上流動。從物理執行的角度而言,是資料流動到計算的位置。

--------每個Stage中除了最後一個RDD運算元是真實的以外,前面的運算元都是假的。在中間它會進行運算元合併。。。

--------由於計算的Lazy特性,導致計算從後往前回溯,形成Computing Chain ,導致的結果就是需要首先計算出具體一個Stage內部左側的RDD中本次計算依賴的Partition(只是邏輯上思考的結果,在所有的傳送給Executor之前,所有的運算元已經合併成了一個,現實中就是一個函式).計算發生的位置在最後的RDD。

------從後往前回溯形成計算鏈條,實際執行肯定是從前往後執行的。

-----窄依賴的物理執行

一個Stage內部的RDD都是窄依賴,窄依賴計算本身是邏輯上看從Stage內部最左側的RDD開始立即計算的,根據Computing Chain(從後往前回溯構建生成),資料(Record) 從一個計算步驟流動下一個結算步驟,以此類推,知道算到Stage內部最後一個RDD來產生計算結果。實際物理計算是讓資料從前往後再運算元上流動,知道不能流動時,就開始計算下一個Record。這就導致了一個比較好的方式:

後面的RDD對前面的RDD的依賴雖然是Partition級別的資料集合的依賴,但是並不需要父RDD把Partition中所有的Records計算完畢才整體往後流動資料進行計算,這就極大的提高了計算速率。

-----寬依賴的物理執行

必須等到依賴的父Stage中最後一個RDD全部資料徹底計算完畢,才能夠經過shuffle來計算當前的Stage.