大資料之Spark(三)--- Spark核心API,Spark術語,Spark三級排程流程原始碼分析
阿新 • • 發佈:2018-11-09
一、Spark核心API ----------------------------------------------- [SparkContext] 連線到spark叢集,入口點. [HadoopRDD] extends RDD 讀取hadoop hdfs上的資料,hbase的資料,s3的資料 [MapPartitionsRDD] 分割槽RDD: 針對父RDD的每個分割槽,提供了函式,生成的新型別RDD. [PairRDDFunctions] 對偶RDD函式類。 可用於KV型別RDD的附加函式。可以通過隱式轉化得到. [ShuffleRDD] 從Shuffle中計算結果的RDD. [RDD] 是分割槽的集合. 彈性分散式資料集. 不可變的資料分割槽集合. 基本操作(map , filter , persist等) 分割槽列表 //資料 應用給每個切片的計算函式 //行為 到其他RDD的依賴列表 //依賴關係 (可選)針對kv型別RDD的分割槽類 (可選)首選位置列表 [DAGScheduler]排程器 1.排程器工作原理: 高階排程器層面,實現按照shuffle(網路間分發)劃分階段(stage). a.對每個JOB的各階段,計算出有向無環圖(DAG),並且跟蹤RDD和每個階段的輸出。 b.找出最小排程來合理的執行作業(優化) c.將Stage物件以TaskSet方式提交給底層的任務排程器TaskScheduler。 d.底層排程器TaskScheduler,實現在cluster上執行job. e.TaskSet已經包含了全部的單獨的task,這些Task都能夠基於cluster的資料進行正確執行。 2.Stage的建立過程 在shuffle的邊界處,打碎RDD Graph, 從而劃分Stage 3.RDD操作與Stage劃分 a.具有'窄依賴'的RDD操作(不需要shuffle的操作,比如map /filter),會被管道化至一個taskset中.只有一個Stage b.而具有shuffle(網路分發,一端輸出,一端讀取)依賴的操作, 則對應多個Stage c.每個stage中,都有一個針對其他stage的shuffle依賴,可以計算多個操作。 4.排程器排程流程 a.Dag排程器檢測首選位置來執行rask b.基於當前的快取狀態,將首選位置傳遞給底層的task排程器來實現合理的排程 5.故障處理 a.shuffle過程中丟失檔案而引發的故障,由DAG排程器處理 b.stage內因為丟失檔案引發的故障,由task排程器處理.在取消整個stage之前,task會進行少量次數的重試操作。 6.容錯 a.為了容錯,同一stage可能會執行多次,稱之為"attemp" b.如果task排程器報告了一個故障(該故障是由於上一個stage丟失輸出檔案而導致的,shuffle),DAG排程就會重新提交丟失的stage 這個故障是 通過具有 FetchFailed的CompletionEvent物件或者ExecutorLost進行檢測的。 c.DAG排程器會等待一段時間看其他節點或task是否失敗,然後對丟失的stage重新提交taskset,計算丟失的task。 二、Spark術語介紹 ------------------------------------------------------- [job] 提交給排程器的頂層的工作專案,由ActiveJob表示。 是Stage集合,一個job分為很多個階段。 [Stage] 是task的集合,計算job中的中間結果。每個階段可以有很多個任務 同一RDD的每個分割槽都會應用相同的計算函式。 Stage在shuffle的邊界處進行隔離(因此引入了隔斷,需要上一個stage完成後,才能得到output結果) 如果這些job共用了同一個rdd的話,Rdd的stage通常可以在這些job中共享。 有兩種型別的stage: 1)ResultStage,用於執行action動作的最終stage。 2)ShuffleMapStage,對shuffle進行輸出檔案的寫操作的。 Stage是並行任務的集合,都會計算同一函式。 內部所有task都有同樣的shuffle依賴, 每一個task都由排程器來執行,並且在shuffle邊界處劃分成不同階段。 排程器是以拓撲順序執行的. 每個stage可以shuffleMapStage,該階段下輸出是下一個stage的輸入。對於shuffleMapStage,需要跟蹤每個輸出分割槽所在的節點。 也可以是resultStage,該階段task直接執行spark action。 每個stage都有FirstJobId,區分於首次提交的id [ShuffleMapStage] 產生資料輸出,在每次shuffle之前發生。 內部含有shuffleDep欄位,和記錄產生多少輸出以及多少輸出可用的欄位 DAGScheduler.submitMapStage()方法可以單獨提交submitMapStage() [ResultStage] 該階段在RDD的一些分割槽中應用函式來計算Action的結果 有些stage並不會在所有分割槽上執行。例如first(),lookup(); [Task] 單獨的工作單元,每個傳送給一臺主機。 [Cache tracking] Dag排程器找出哪些RDD被快取,避免不必要的重複計算,同時,也會記住哪些shuffleMap已經輸出了 結果,避免map端shuffle的重複處理。 [Preferred locations] dag排程器根據rdd的中首選位置屬性計算task在哪裡執行。 [Cleanup] 執行的job如果完成就會清楚資料結構避免記憶體洩漏,主要是針對耗時應用。 [ActiveJob] 在Dag排程器中執行job。 作業分為兩種型別,主要使用finalStage欄位進行型別劃分。 1)result job,計算ResultStage來執行action. 2)map-state job,為shuffleMapState結算計算輸出結果以供下游stage使用。 job只跟蹤客戶端提交的"leaf" stage,通過呼叫Dag排程器的submitjob或者submitMapStage()方法實現. job型別引發之前stage的執行,而且多個job可以共享之前的stage。這些依賴關係由DAG排程器內部管理。 [LiveListenerBus] 非同步傳輸spark監聽事件到監聽器事件集合中。 [EventLoop] 從caller接受事件,在單獨的事件執行緒中處理所有事件,該類的唯一子類是DAGSchedulerEventProcessLoop。 [LiveListenerBus] 監聽器匯流排,存放Spark監聽器事件的佇列。用於監控。 [OutputCommitCoordinator] 輸出提交協調器.決定提交的輸出是否進入hdfs。 [TaskScheduler] 底層的排程器,唯一實現TaskSchedulerImpl。可插拔,同Dag排程器接受task,傳送給cluster, 執行任務,失敗重試,返回事件給DAG排程器。 [TaskSchedulerImpl] TaskScheduler排程器的唯一實現,通過BackendScheduler(後臺排程器)實現各種型別叢集的任務排程。 [SchedulerBackend] 可插拔的後臺排程系統,本地排程,mesos排程,。。。 在任務排程器之後啟動, 實現有三種 1.LocalSchedulerBackend 本地後臺排程器 啟動task. 2.StandaloneSchedulerBackend 獨立後臺排程器 3.CoarseGrainedSchedulerBackend 粗粒度後臺排程器 [Executor] spark程式執行者,通過執行緒池執行任務。 三、Spark三級排程流程原始碼分析 ------------------------------------------- 1.sc.textFile: val rdd1 = textFile(); --> new HadoopRDD --> val rdd1 = new MapParttionsRDD1 2.rdd1.flatMap: val rdd2 = rdd1.flatMap() --> val rdd2 = new MapParttionsRDD2 3.rdd1.map: val rdd3 = rdd2.map() --> val rdd3 = new MapParttionsRDD3 4.rdd1.reduceByKey: val rdd4 = rdd3.reduceByKey() --> 隱式函式rddToPairRDDFunction --> PairRDDFunctions物件 --> val rdd4 = new RDD 5.三級排程流程原始碼分析 [主執行緒]a. cilent ---建立---> sc = new SparkContext() -----> HadoopRDD hd = sc.textFile() --> 進行一系列[上述1-4]的變換 -----> 返回一個RDD物件給sc ---->返回RDD給Client [主執行緒]b. client拿到rdd -----> [client]rdd.collect() ----> [rdd.collect]sc.runJob() ----> sc.runJob.runJob(rdd,func,分割槽序列0...n) ----> ... ----> DAGSchedule.runJob [主執行緒]c. DAGSchedule.runJob ----> DAGSchedule.submit():返回一個JobSubmitted[Event]物件:對DAG排程器事件資訊的封裝 ----> DAGSchedule.DAGSchedulerEventProcessLoop.post():將JobSubmitted[Event]傳遞給事件輪詢器[DAGSchedulerEventProcessLoop]的EventQuene [EventLoopThread]d. DAGSchedulerEventProcessLoop:開啟分執行緒 -----> EventLoopThread ----> EventLoop.EventQuene.take():開始輪詢,取得event,並傳遞給DAGSchedulerEventProcessLoop ----> 一直輪詢,無休止迴圈執行緒 [EventLoopThread]e. DAGSchedulerEventProcessLoop.onReceive(event):收到事件event,並傳遞給DAGScheduler [EventLoopThread]f. DAGScheduler.doOnReceive(event):模式匹配,看事件型別,然後抽取event中封裝的資訊 ----> DAGScheduler.handelJobSubmitted(從event中抽取的資訊ed: func,rdd,partions...) ----> DAGScheduler.createResultStage(func,rdd,partions...):生成finalState ----> DAGScheduler.new ActiveJob(finalState,...): 返回job物件 ----> DAGScheduler.SparkListenerJobStart(job,...):spark監聽器事件 ----> DAGScheduler.SparkListenerBus.post():DAGScheduler通過post的方式將監聽器事件,提交給SparkListenerBus.eventQueue(活躍監聽匯流排的事件佇列),目的是事件的型別轉換,將排程器事件轉換成監聽器事件 ----> 開啟分執行緒LiveListenerBusThread,進行狀態監控 ----> 同時繼續執行緒h [LiveListenerBusThread]g. SparkListenerBus.eventQueue.poll():無休止迴圈輪詢,取得事件event ----> SpakeListenerBus.postToAll(event) ----> 開始監控整個event的進度和狀態 [分執行緒a]h. DAGScheduler.submitStage(finalStage): 遞迴呼叫,逐次提交final之前的state, 直至finalState之前的state(mapStates)都已經提交完畢 ----> DAGScheduler.submitMissingTasks(state,jobId):對state進行模式匹配,shuffleStage/resultState進行不同的一系列的處理,但是最終都將state轉換成taskSet任務集 ----> DAGScheduler.TaskScheduler.submitTasks(taskSet):DAGScheduler呼叫自身變數TaskScheduler的提交任務方法,將任務集提交給任務排程器TaskScheduler ----> 至此,開啟二級Task排程 [分執行緒a]i. TaskSchedulerImpl.submitTasks(taskSet) ----> TaskSchedulerImpl.submitTasks.createTaskManager():得到任務集管理器manager ----> TaskSchedulerImpl.submitTasks.schedulableBuilder.addTaskSetManger(manager) ----> .... ----> TaskSchedulerImpl.submitTasks.SchedulerBackend.reviveOffers():最終呼叫後臺排程器SchedulerBackend的reviveOffers方法 ----> 至此,開啟三級後臺排程 [分執行緒b]j. SchedulerBackend.reviveOffers() : 開啟程序間通訊,序列化tasks,得到訊息reviveOffers,並且根據程序型別開啟相應的後臺排程器程序[local,standlone,Coarse],並將訊息[reviveOffers]傳送給已經開啟的後臺排程器程序 ----> [以本地後臺排程器為例] ----> LocalSchedulerBackend.localEndPoint.send(reviveOffers) ----> RPCEndPoint通過Netty框架,開始傳送reviveOffers給對應的RPC終端localEndPoint[local,stand...] [分執行緒b]k. LocalSchedulerBackend.localEndPoint.receive() : 本地後臺排程器RPC終端,接收程序間的訊息 ----> LocalSchedulerBackend.reviveOffers():反序列化reviveOffers,取得排程器tasks ----> 遍歷tasks任務集,並將每個任務提交給Executor[執行程式] [分執行緒c]l. Executor.launchTask() :開啟任務,開始執行任務 ----> Executor.new TaskRunner():得到任務執行器tr ----> Executor.runningTasks.put(tr): 將tr put到一個任務列表中 ----> Executor.threadPool.executor(tr) : 在未來的某個時刻,執行tr.run(),在run函式中呼叫ShuffleMapTask.run()方法 [分執行緒c--主執行緒]m. ShuffleMapTask.run() ----> ShuffleMapTask.runTask() -----> ShuffleMapTask.new shuffleWriter().write() ----> 至此,終於開始呼叫在WorldCount client中定義的flatMap(x => x.spilt(" "))函式 6.總結: a.Cilent觸發rdd的Action行為,通過SpackContext提交作業,開啟DAGScheduler一級排程 b.DAGScheduler首先將SpackContent的作業資訊封裝成一個作業提交事件JobSubmitted[Event],然後新增到事件佇列中,然後開啟分執行緒開始輪詢該事件佇列 c.DAGScheduler.OnReceive取得輪詢取出的事件並檢視事件型別,根據事件型別,抽取事件中的資訊,然後進行一系列的封裝,最終封裝成一個State物件 d.DAGScheduler.submitStage,DAG排程器開始提交State給TaskScheduler,開啟二級排程。 e.一級排程總結就是:將SpackCotent提交的job資訊先封裝成Event,然後封裝成State,然後將State提交給二級排程 f.TaskScheduler,二級排程提交任務集,並且將任務集封裝成reviveOffers, 通過程序間通訊[Netty],將reviveOffers傳遞給相應的三級底層排程器[local,standlone,ceros...],開啟三級排程 g.三級排程首先建立一個TaskRunner執行緒,線上程中執行run函式,在run函式中對reviveOffers進行解析,然後將結果傳遞給ShuffleMapTask h.ShuffleMapTask開始新建混洗寫入器,開始shuffle操作,ShuffleMapTask.new shuffleWriter().write() i.至此開始呼叫自定義的WorldCount.flatMap函式