1. 程式人生 > >Spark(三)————作業提交流程

Spark(三)————作業提交流程

1、Spark核心API

[SparkContext]
連線到spark叢集,入口點.

[RDD]
它是一個分割槽的集合.
彈性分散式資料集.
不可變的資料分割槽集合.
基本操作(map filter , persist)
分割槽列表 //資料
應用給每個切片的計算函式 //行為
到其他RDD的依賴列表 //依賴關係
(可選)針對kv型別RDD的分割槽類
(可選)首選位置列表

[HadoopRDD]
讀取hadoop上的資料

[MapPartitionsRDD]
針對父RDD的每個分割槽提供了函式構成的新型別RDD.

[PairRDDFunctions]
對偶RDD函式類,可用於KV型別RDD的附加函式,可以通過隱式轉化得到.

[ShuffleRDD]
從Shuffle中計算結果的RDD.

[DAGScheduler]
高階排程器層面,實現按照階段(stage),建立按照shuffle.
對每個Job的各階段計算有向無環圖(DAG),並且跟蹤RDD和每個階段的輸出。
找出最小排程執行作業,將Stage物件以TaskSet方式提交給底層的排程器。
底層排程器實現TaskScheduler,進而在cluster上執行job。
TaskSet已經包含了全部的單獨的task,這些Task都能夠基於cluster的資料進行正確執行。

Stage通過在需要shuffle的邊界處將RDD打碎來建立Stage物件。
具有’窄依賴’的RDD操作(比如map /filter)被管道化至一個taskset中,而具有shuffle依賴的操作則包含多個Stage(一個進行輸出,另一個進行輸入),最終,每個stage都有一個針對其他stage的shuffle依賴,可以計算多個操作。

Dag排程器檢測首選位置來執行task,通過基於當前的快取狀態,並傳遞給底層的
task排程器來實現。根據shuffle的輸出是否丟失處理故障問題。不是由stage內因為丟失檔案引發的故障有task排程處理。在取消整個stage之前,task會進行少量次數的重試操作。

為了容錯,同一stage可能會執行多次,稱之為”attemp”,如果task排程器報告了一個故障(該故障是由於上一個stage丟失輸出檔案而導致的)DAG排程就會重新提交丟失的stage。這個通過具有 FetchFailed的CompletionEvent物件或者ExecutorLost進行檢測的。DAG排程器會等待一段時間看其他節點或task是否失敗,然後對丟失的stage重新提交taskset,計算丟失的task。
這裡寫圖片描述
2、相關術語介紹

[job]
提交給排程的頂層的工作專案,由ActiveJob表示,是Stage集合。

[Stage]
是task的集合,計算job中的中間結果。同一RDD的每個分割槽都會應用相同的計算函式。
在shuffle的邊界處進行隔離(因此引入了隔斷,需要上一個stage完成後,才能得到output結果)有兩種型別的stage:1)ResultStage,用於執行action動作的最終stage。2)ShuffleMapStage,對shuffle進行輸出檔案的寫操作的。如果job重用了同一個rdd的話,stage通常可以跨越多個job實現共享。

並行任務的集合,都會計算同一函式。所有task有著同樣的shuffle依賴,排程器執行的task DAG在shuffle邊界處劃分成不同階段。排程器以拓撲順序執行。

每個stage可以是shuffleMapStage,該階段下輸出是下一個stage的輸入,也可以是resultStage,該階段task直接執行spark action。對於shuffleMapStage,需要跟蹤每個輸出分割槽所在的節點。
每個stage都有FirstJobId,區分於首次提交的id。

[ShuffleMapStage]
產生輸出資料,在每次shuffle之前發生。內部含有shuffleDep欄位,有相關欄位記錄產生多少輸出以及多少輸出可用。
DAGScheduler.submitMapStage()方法可以單獨提交submitMapStage()。

[ResultStage]
該階段在RDD的一些分割槽中應用函式來計算Action的結果。有些stage並不會在所有分割槽上執行。例如first(),lookup();

[Task]
單獨的工作單元,每個傳送給一臺主機。作業是階段的集合,階段是任務的集合。

[Cache tracking]
Dag排程器找出哪些RDD被快取,避免不必要的重複計算,同時,也會記住哪些shuffleMap已經輸出了結果,避免map端shuffle的重複處理。

[Preferred locations]
dag排程器根據rdd中的首選位置屬性計算task在哪裡執行。

[Cleanup]
執行的job如果完成就會清楚資料結構避免記憶體洩漏,主要是針對耗時應用。

[ActiveJob]
在Dag排程器中執行job。作業分為兩種型別:
1)result job,計算ResultStage來執行action.
2)map-state job,為shuffleMapState結算計算輸出結果以供下游stage使用。主要使用finalStage欄位進行型別劃分。

job只跟蹤客戶端提交的”leaf” stage,通過呼叫Dag排程器的submitjob或者submitMapStage()方法實現。
job型別引發之前stage的執行,而且多個job可以共享之前的stage。這些依賴關係由DAG排程器內部管理。

[LiveListenerBus]
監聽器匯流排,存放Spark監聽器事件的佇列,用於監控。非同步傳輸spark監聽事件到監聽器事件集合中。

[EventLoop]
從caller接受事件,在單獨的事件執行緒中處理所有事件,該類的唯一子類是DAGSchedulerEventProcessLoop。

[OutputCommitCoordinator]
輸出提交協調器.決定提交的輸出是否進入hdfs。

[TaskScheduler]
底層的排程器,唯一實現TaskSchedulerImpl。可插拔,同Dag排程器接受task,傳送給cluster,執行任務,失敗重試,返回事件給DAG排程器。

[TaskSchedulerImpl]
TaskScheduler排程器的唯一實現,通過BackendScheduler(後臺排程器)實現各種型別叢集的任務排程。

[SchedulerBackend]
可插拔的後臺排程系統,本地排程,mesos排程,。。。
在任務排程器下方,實現有三種:
1.LocalSchedulerBackend
本地後臺排程器
啟動task.
這裡寫圖片描述
2.StandaloneSchedulerBackend
獨立後臺排程器
3.CoarseGrainedSchedulerBackend
粗粒度後臺排程器
這裡寫圖片描述
[Executor]
spark程式執行者,通過執行緒池執行任務。
這裡寫圖片描述
這裡寫圖片描述
3、Dependency:依賴
這裡寫圖片描述

NarrowDependency(窄依賴):  子RDD的每個分割槽依賴於父RDD的少量分割槽。
         |
        / \
        ---
         |----  OneToOneDependency      //父子RDD之間的分割槽存在一對一關係。
         |----  RangeDependency         //父RDD的一個分割槽範圍和子RDD存在一對一關係。

    ShuffleDependency                   //依賴,在shuffle階段輸出時的一種依賴。

    PruneDependency                     //在PartitionPruningRDD和其父RDD之間的依賴
                                        //子RDD包含了父RDD的分割槽子集。

4、建立Spark上下文

[本地模式,通過執行緒模擬]
本地後臺排程器

spark local[3]                          //3執行緒,模擬cluster叢集
spark local[*]                          //匹配cpu個數,
spark local[3,2]                        //3:3個執行緒,2最多重試次數。

[相當於偽分散式]

StandaloneSchedulerBackend
spark local-cluster[N, cores, memory]   //模擬spark叢集。

[完全分散式]

StandaloneSchedulerBackend
spark spark://s201:7077                 //連線到spark叢集上.

maxFailures
最多失敗次數:
    1           //0和1等價,只執行一次。
    2           //如果失敗,最多執行兩次.

5、RDD持久化

跨操作進行RDD的記憶體式儲存。
持久化RDD時,節點上的每個分割槽都會儲存操作在記憶體中,以備在其他操作中進行重用。
快取技術是迭代式計算和互動式查詢的重要工具。使用persist()和cache()進行rdd的持久化。cache()是persist()一種。action第一次計算時會發生persist()。
spark的cache是容錯的,如果rdd的任何一個分割槽丟失了,都可以通過最初建立rdd的進行重新計算(RDD儲存的是程,不是資料)。
persist可以使用不同的儲存級別進行持久化。

MEMORY_ONLY         //只在記憶體
MEMORY_AND_DISK
MEMORY_ONLY_SER     //記憶體儲存(序列化)
MEMORY_AND_DISK_SER 
DISK_ONLY           //硬碟
MEMORY_ONLY_2       //帶有副本,2個
MEMORY_AND_DISK_2   //快速容錯。
OFF_HEAP 

rdd.unpersist();    //刪除持久化資料

資料傳遞: map(),filter()高階函式中訪問的物件被序列化到各個節點。每個節點都有一份拷貝。變數值並不會回傳到driver程式。

共享變數

spark通過廣播變數和累加器實現共享變數。
    [廣播變數]
        //建立廣播變數
        val bc1 = sc.broadcast(Array(1,2,3))
        bc1.value

    [累加器]
        val ac1 = sc.longaccumulator("ac1")
        ac1.value
        sc.parell..(1 to 10).map(_ * 2).map(e=>{ac1.add(1) ; e}).reduce(_+_)
        ac1.value           //10