1. 程式人生 > >Spark底層原理詳細解析(深度好文,建議收藏)

Spark底層原理詳細解析(深度好文,建議收藏)

### Spark簡介 Apache Spark是用於**大規模資料處理**的統一分析引擎,基於記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了**高容錯性**和**高可伸縮性**,允許使用者將Spark部署在大量硬體之上,形成叢集。 Spark原始碼從1.x的40w行發展到現在的超過100w行,有1400多位大牛貢獻了程式碼。整個Spark框架原始碼是一個巨大的工程。下面我們一起來看下spark的底層執行原理。 ### Spark執行流程 ![Spark執行流程](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210125_2.png) 具體執行流程如下: 1. SparkContext 向資源管理器註冊並向資源管理器申請執行Executor 2. 資源管理器分配Executor,然後資源管理器啟動Executor 3. Executor 傳送心跳至資源管理器 4. **SparkContext 構建DAG有向無環圖** 5. **將DAG分解成Stage(TaskSet)** 6. **把Stage傳送給TaskScheduler** 7. **Executor 向 SparkContext 申請 Task** 8. **TaskScheduler 將 Task 傳送給 Executor 執行** 9. **同時 SparkContext 將應用程式程式碼發放給 Executor** 10. Task 在 Executor 上執行,執行完畢釋放所有資源 #### 1. 從程式碼角度看DAG圖的構建 ``` Val lines1 = sc.textFile(inputPath1).map(...).map(...) Val lines2 = sc.textFile(inputPath2).map(...) Val lines3 = sc.textFile(inputPath3) Val dtinone1 = lines2.union(lines3) Val dtinone = lines1.join(dtinone1) dtinone.saveAsTextFile(...) dtinone.filter(...).foreach(...) ``` 上述程式碼的DAG圖如下所示: ![構建DAG圖](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210126_1.png) Spark核心會在需要計算髮生的時刻繪製一張關於計算路徑的有向無環圖,也就是如上圖所示的DAG。 **Spark 的計算髮生在RDD的Action操作,而對Action之前的所有Transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算**。 #### 2. 將DAG劃分為Stage核心演算法 一個Application可以有多個job多個Stage: Spark Application中可以因為不同的Action觸發眾多的job,一個Application中可以有很多的job,每個job是由一個或者多個Stage構成的,後面的Stage依賴於前面的Stage,也就是說只有前面依賴的Stage計算完畢後,後面的Stage才會執行。 劃分依據: **Stage劃分的依據就是寬依賴**,像reduceByKey,groupByKey等運算元,會導致寬依賴的產生。 > 回顧下寬窄依賴的劃分原則: **窄依賴**:父RDD的一個分割槽只會被子RDD的一個分割槽依賴。即一對一或者多對一的關係,可理解為獨生子女。 常見的窄依賴有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned)等。 **寬依賴**:父RDD的一個分割槽會被子RDD的多個分割槽依賴(涉及到shuffle)。即一對多的關係,可理解為超生。 常見的寬依賴有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned)等。 **核心演算法:回溯演算法** **從後往前回溯/反向解析,遇到窄依賴加入本Stage,遇見寬依賴進行Stage切分。** Spark核心會從觸發Action操作的那個RDD開始**從後往前推**,首先會為最後一個RDD建立一個Stage,然後繼續倒推,如果發現對某個RDD是寬依賴,那麼就會將寬依賴的那個RDD建立一個新的Stage,那個RDD就是新的Stage的最後一個RDD。 然後依次類推,繼續倒推,根據窄依賴或者寬依賴進行Stage的劃分,直到所有的RDD全部遍歷完成為止。 #### 3. 將DAG劃分為Stage剖析 ![DAG劃分Stage](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210127_2.png) **一個Spark程式可以有多個DAG(有幾個Action,就有幾個DAG,上圖最後只有一個Action(圖中未表現),那麼就是一個DAG)**。 一個DAG可以有多個Stage(根據寬依賴/shuffle進行劃分)。 **同一個Stage可以有多個Task並行執行**(**task數=分割槽數**,如上圖,Stage1 中有三個分割槽P1、P2、P3,對應的也有三個 Task)。 可以看到這個DAG中只reduceByKey操作是一個寬依賴,Spark核心會以此為邊界將其前後劃分成不同的Stage。 同時我們可以注意到,在圖中Stage1中,**從textFile到flatMap到map都是窄依賴,這幾步操作可以形成一個流水線操作,通過flatMap操作生成的partition可以不用等待整個RDD計算結束,而是繼續進行map操作,這樣大大提高了計算的效率**。 #### 4. 提交Stages 排程階段的提交,最終會被轉換成一個任務集的提交,DAGScheduler通過TaskScheduler介面提交任務集,這個任務集最終會觸發TaskScheduler構建一個TaskSetManager的例項來管理這個任務集的生命週期,對於DAGScheduler來說,提交排程階段的工作到此就完成了。 而TaskScheduler的具體實現則會在得到計算資源的時候,進一步通過TaskSetManager排程具體的任務到對應的Executor節點上進行運算。 ![](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210127_3.png) #### 5. 監控Job、Task、Executor 1. DAGScheduler監控Job與Task: 要保證相互依賴的作業排程階段能夠得到順利的排程執行,DAGScheduler需要監控當前作業排程階段乃至任務的完成情況。 這通過對外暴露一系列的回撥函式來實現的,對於TaskScheduler來說,這些回撥函式主要包括任務的開始結束失敗、任務集的失敗,DAGScheduler根據這些任務的生命週期資訊進一步維護作業和排程階段的狀態資訊。 2. DAGScheduler監控Executor的生命狀態: TaskScheduler通過回撥函式通知DAGScheduler具體的Executor的生命狀態,**如果某一個Executor崩潰了,則對應的排程階段任務集的ShuffleMapTask的輸出結果也將標誌為不可用,這將導致對應任務集狀態的變更,進而重新執行相關計算任務,以獲取丟失的相關資料**。 #### 6. 獲取任務執行結果 1. 結果DAGScheduler: 一個具體的任務在Executor中執行完畢後,其結果需要以某種形式返回給DAGScheduler,根據任務型別的不同,任務結果的返回方式也不同。 2. 兩種結果,中間結果與最終結果: 對於FinalStage所對應的任務,返回給DAGScheduler的是運算結果本身。 而對於中間排程階段對應的任務ShuffleMapTask,返回給DAGScheduler的是一個MapStatus裡的相關儲存資訊,而非結果本身,這些儲存位置資訊將作為下一個排程階段的任務獲取輸入資料的依據。 3. 兩種型別,**DirectTaskResult與IndirectTaskResult**: 根據任務結果大小的不同,ResultTask返回的結果又分為兩類: 如果結果足夠小,則直接放在DirectTaskResult物件內中。 如果超過特定尺寸則在Executor端會將DirectTaskResult先序列化,再把序列化的結果作為一個數據塊存放在BlockManager中,然後將BlockManager返回的BlockID放在IndirectTaskResult物件中返回給TaskScheduler,TaskScheduler進而呼叫TaskResultGetter將IndirectTaskResult中的BlockID取出並通過BlockManager最終取得對應的DirectTaskResult。 #### 7. 任務排程總體詮釋 **一張圖說明任務總體排程:** ![任務總體排程](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210127_4.png) ### Spark執行架構特點 #### 1. Executor程序專屬 **每個Application獲取專屬的Executor程序,該程序在Application期間一直駐留,並以多執行緒方式執行Tasks**。 Spark Application不能跨應用程式共享資料,除非將資料寫入到外部儲存系統。如圖所示: ![Executor程序專屬](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210127_5.png) #### 2. 支援多種資源管理器 Spark與資源管理器無關,只要能夠獲取Executor程序,並能保持相互通訊就可以了。 Spark支援資源管理器包含: Standalone、On Mesos、On YARN、Or On EC2。如圖所示: ![支援多種資源管理器](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210127_6.png) #### 3. Job提交就近原則 **提交SparkContext的Client應該靠近Worker節點(執行Executor的節點)**,最好是在同一個Rack(機架)裡,因為Spark Application執行過程中SparkContext和Executor之間有大量的資訊交換; 如果想在遠端叢集中執行,最好使用RPC將SparkContext提交給叢集,**不要遠離Worker執行SparkContext**。 如圖所示: ![Job提交就近原則](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210127_7.png) #### 4. 移動程式而非移動資料的原則執行 **移動程式而非移動資料的原則執行,Task採用了資料本地性和推測執行的優化機制**。 關鍵方法:taskIdToLocations、getPreferedLocations。 如圖所示: ![資料本地性](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210127_8.png) #### 搜尋公眾號:五分鐘學大資料,深度鑽研大資料