1. 程式人生 > >大資料之Spark(三)--- Spark核心API,Spark術語,Spark三級排程流程原始碼分析

大資料之Spark(三)--- Spark核心API,Spark術語,Spark三級排程流程原始碼分析

一、Spark核心API
-----------------------------------------------
    [SparkContext]
        連線到spark叢集,入口點.

    [HadoopRDD] extends RDD
        讀取hadoop hdfs上的資料,hbase的資料,s3的資料

    [MapPartitionsRDD]
        分割槽RDD: 針對父RDD的每個分割槽,提供了函式,生成的新型別RDD.

    [PairRDDFunctions]
        對偶RDD函式類。
        可用於KV型別RDD的附加函式。可以通過隱式轉化得到.

    [ShuffleRDD]
        從Shuffle中計算結果的RDD.

    [RDD]
        是分割槽的集合.
        彈性分散式資料集.
        不可變的資料分割槽集合.
        基本操作(map , filter , persist等)
        分割槽列表               //資料
        應用給每個切片的計算函式   //行為
        到其他RDD的依賴列表          //依賴關係
        (可選)針對kv型別RDD的分割槽類
        (可選)首選位置列表

    [DAGScheduler]排程器
        1.排程器工作原理:
            高階排程器層面,實現按照shuffle(網路間分發)劃分階段(stage).
            a.對每個JOB的各階段,計算出有向無環圖(DAG),並且跟蹤RDD和每個階段的輸出。
            b.找出最小排程來合理的執行作業(優化)
            c.將Stage物件以TaskSet方式提交給底層的任務排程器TaskScheduler。
            d.底層排程器TaskScheduler,實現在cluster上執行job.
            e.TaskSet已經包含了全部的單獨的task,這些Task都能夠基於cluster的資料進行正確執行。

        2.Stage的建立過程
            在shuffle的邊界處,打碎RDD Graph, 從而劃分Stage

        3.RDD操作與Stage劃分
            a.具有'窄依賴'的RDD操作(不需要shuffle的操作,比如map /filter),會被管道化至一個taskset中.只有一個Stage
            b.而具有shuffle(網路分發,一端輸出,一端讀取)依賴的操作, 則對應多個Stage
            c.每個stage中,都有一個針對其他stage的shuffle依賴,可以計算多個操作。

        4.排程器排程流程
            a.Dag排程器檢測首選位置來執行rask
            b.基於當前的快取狀態,將首選位置傳遞給底層的task排程器來實現合理的排程

        5.故障處理
            a.shuffle過程中丟失檔案而引發的故障,由DAG排程器處理
            b.stage內因為丟失檔案引發的故障,由task排程器處理.在取消整個stage之前,task會進行少量次數的重試操作。

        6.容錯
            a.為了容錯,同一stage可能會執行多次,稱之為"attemp"

            b.如果task排程器報告了一個故障(該故障是由於上一個stage丟失輸出檔案而導致的,shuffle),DAG排程就會重新提交丟失的stage
            這個故障是 通過具有 FetchFailed的CompletionEvent物件或者ExecutorLost進行檢測的。

            c.DAG排程器會等待一段時間看其他節點或task是否失敗,然後對丟失的stage重新提交taskset,計算丟失的task。


二、Spark術語介紹
-------------------------------------------------------
    [job]
        提交給排程器的頂層的工作專案,由ActiveJob表示。
        是Stage集合,一個job分為很多個階段。

    [Stage]
        是task的集合,計算job中的中間結果。每個階段可以有很多個任務
        同一RDD的每個分割槽都會應用相同的計算函式。
        Stage在shuffle的邊界處進行隔離(因此引入了隔斷,需要上一個stage完成後,才能得到output結果)
        如果這些job共用了同一個rdd的話,Rdd的stage通常可以在這些job中共享。

        有兩種型別的stage:
            1)ResultStage,用於執行action動作的最終stage。
            2)ShuffleMapStage,對shuffle進行輸出檔案的寫操作的。

        Stage是並行任務的集合,都會計算同一函式。
        內部所有task都有同樣的shuffle依賴,
        每一個task都由排程器來執行,並且在shuffle邊界處劃分成不同階段。
        排程器是以拓撲順序執行的.

        每個stage可以shuffleMapStage,該階段下輸出是下一個stage的輸入。對於shuffleMapStage,需要跟蹤每個輸出分割槽所在的節點。
        也可以是resultStage,該階段task直接執行spark action。

        每個stage都有FirstJobId,區分於首次提交的id

    [ShuffleMapStage]
        產生資料輸出,在每次shuffle之前發生。
        內部含有shuffleDep欄位,和記錄產生多少輸出以及多少輸出可用的欄位
        DAGScheduler.submitMapStage()方法可以單獨提交submitMapStage()

    [ResultStage]
        該階段在RDD的一些分割槽中應用函式來計算Action的結果
        有些stage並不會在所有分割槽上執行。例如first(),lookup();

    [Task]
        單獨的工作單元,每個傳送給一臺主機。

    [Cache tracking]
        Dag排程器找出哪些RDD被快取,避免不必要的重複計算,同時,也會記住哪些shuffleMap已經輸出了
        結果,避免map端shuffle的重複處理。

    [Preferred locations]
        dag排程器根據rdd的中首選位置屬性計算task在哪裡執行。

    [Cleanup]
        執行的job如果完成就會清楚資料結構避免記憶體洩漏,主要是針對耗時應用。

    [ActiveJob]
        在Dag排程器中執行job。

        作業分為兩種型別,主要使用finalStage欄位進行型別劃分。
            1)result job,計算ResultStage來執行action.
            2)map-state job,為shuffleMapState結算計算輸出結果以供下游stage使用。

        job只跟蹤客戶端提交的"leaf" stage,通過呼叫Dag排程器的submitjob或者submitMapStage()方法實現.
        job型別引發之前stage的執行,而且多個job可以共享之前的stage。這些依賴關係由DAG排程器內部管理。

    [LiveListenerBus]
        非同步傳輸spark監聽事件到監聽器事件集合中。

    [EventLoop]
        從caller接受事件,在單獨的事件執行緒中處理所有事件,該類的唯一子類是DAGSchedulerEventProcessLoop。

    [LiveListenerBus]
        監聽器匯流排,存放Spark監聽器事件的佇列。用於監控。

    [OutputCommitCoordinator]
        輸出提交協調器.決定提交的輸出是否進入hdfs。


    [TaskScheduler]
        底層的排程器,唯一實現TaskSchedulerImpl。可插拔,同Dag排程器接受task,傳送給cluster,
        執行任務,失敗重試,返回事件給DAG排程器。

    [TaskSchedulerImpl]
        TaskScheduler排程器的唯一實現,通過BackendScheduler(後臺排程器)實現各種型別叢集的任務排程。


    [SchedulerBackend]
        可插拔的後臺排程系統,本地排程,mesos排程,。。。
        在任務排程器之後啟動,
        實現有三種
        1.LocalSchedulerBackend
            本地後臺排程器
            啟動task.

        2.StandaloneSchedulerBackend
            獨立後臺排程器

        3.CoarseGrainedSchedulerBackend
            粗粒度後臺排程器

    [Executor]
        spark程式執行者,通過執行緒池執行任務。


三、Spark三級排程流程原始碼分析
-------------------------------------------
    1.sc.textFile:
        val rdd1 = textFile(); --> new HadoopRDD -->  val rdd1 = new MapParttionsRDD1

    2.rdd1.flatMap:
        val rdd2 = rdd1.flatMap() --> val rdd2 = new MapParttionsRDD2

    3.rdd1.map:
        val rdd3 = rdd2.map() --> val rdd3 = new MapParttionsRDD3

    4.rdd1.reduceByKey:
        val rdd4 = rdd3.reduceByKey() --> 隱式函式rddToPairRDDFunction --> PairRDDFunctions物件
         --> val rdd4 = new RDD

    5.三級排程流程原始碼分析
        [主執行緒]a. cilent ---建立--->  sc = new SparkContext()  -----> HadoopRDD hd = sc.textFile() --> 進行一系列[上述1-4]的變換
            -----> 返回一個RDD物件給sc ---->返回RDD給Client

        [主執行緒]b. client拿到rdd -----> [client]rdd.collect() ----> [rdd.collect]sc.runJob() ----> sc.runJob.runJob(rdd,func,分割槽序列0...n)
            ----> ... ----> DAGSchedule.runJob

        [主執行緒]c. DAGSchedule.runJob ---->  DAGSchedule.submit():返回一個JobSubmitted[Event]物件:對DAG排程器事件資訊的封裝
            ----> DAGSchedule.DAGSchedulerEventProcessLoop.post():將JobSubmitted[Event]傳遞給事件輪詢器[DAGSchedulerEventProcessLoop]的EventQuene

        [EventLoopThread]d. DAGSchedulerEventProcessLoop:開啟分執行緒 -----> EventLoopThread
            ----> EventLoop.EventQuene.take():開始輪詢,取得event,並傳遞給DAGSchedulerEventProcessLoop
            ----> 一直輪詢,無休止迴圈執行緒

        [EventLoopThread]e. DAGSchedulerEventProcessLoop.onReceive(event):收到事件event,並傳遞給DAGScheduler

        [EventLoopThread]f. DAGScheduler.doOnReceive(event):模式匹配,看事件型別,然後抽取event中封裝的資訊
            ----> DAGScheduler.handelJobSubmitted(從event中抽取的資訊ed: func,rdd,partions...)
            ----> DAGScheduler.createResultStage(func,rdd,partions...):生成finalState
            ----> DAGScheduler.new ActiveJob(finalState,...): 返回job物件
            ----> DAGScheduler.SparkListenerJobStart(job,...):spark監聽器事件
            ----> DAGScheduler.SparkListenerBus.post():DAGScheduler通過post的方式將監聽器事件,提交給SparkListenerBus.eventQueue(活躍監聽匯流排的事件佇列),目的是事件的型別轉換,將排程器事件轉換成監聽器事件
            ----> 開啟分執行緒LiveListenerBusThread,進行狀態監控
            ----> 同時繼續執行緒h

        [LiveListenerBusThread]g. SparkListenerBus.eventQueue.poll():無休止迴圈輪詢,取得事件event ----> SpakeListenerBus.postToAll(event)
            ----> 開始監控整個event的進度和狀態

        [分執行緒a]h. DAGScheduler.submitStage(finalStage): 遞迴呼叫,逐次提交final之前的state, 直至finalState之前的state(mapStates)都已經提交完畢
            ----> DAGScheduler.submitMissingTasks(state,jobId):對state進行模式匹配,shuffleStage/resultState進行不同的一系列的處理,但是最終都將state轉換成taskSet任務集
            ----> DAGScheduler.TaskScheduler.submitTasks(taskSet):DAGScheduler呼叫自身變數TaskScheduler的提交任務方法,將任務集提交給任務排程器TaskScheduler
            ----> 至此,開啟二級Task排程

        [分執行緒a]i. TaskSchedulerImpl.submitTasks(taskSet) ----> TaskSchedulerImpl.submitTasks.createTaskManager():得到任務集管理器manager
            ----> TaskSchedulerImpl.submitTasks.schedulableBuilder.addTaskSetManger(manager) ----> ....
            ----> TaskSchedulerImpl.submitTasks.SchedulerBackend.reviveOffers():最終呼叫後臺排程器SchedulerBackend的reviveOffers方法
            ----> 至此,開啟三級後臺排程

        [分執行緒b]j. SchedulerBackend.reviveOffers() : 開啟程序間通訊,序列化tasks,得到訊息reviveOffers,並且根據程序型別開啟相應的後臺排程器程序[local,standlone,Coarse],並將訊息[reviveOffers]傳送給已經開啟的後臺排程器程序
            ----> [以本地後臺排程器為例] ----> LocalSchedulerBackend.localEndPoint.send(reviveOffers) ----> RPCEndPoint通過Netty框架,開始傳送reviveOffers給對應的RPC終端localEndPoint[local,stand...]

        [分執行緒b]k. LocalSchedulerBackend.localEndPoint.receive() : 本地後臺排程器RPC終端,接收程序間的訊息 ----> LocalSchedulerBackend.reviveOffers():反序列化reviveOffers,取得排程器tasks
            ----> 遍歷tasks任務集,並將每個任務提交給Executor[執行程式]

        [分執行緒c]l. Executor.launchTask() :開啟任務,開始執行任務 ----> Executor.new TaskRunner():得到任務執行器tr ----> Executor.runningTasks.put(tr): 將tr put到一個任務列表中
            ----> Executor.threadPool.executor(tr) : 在未來的某個時刻,執行tr.run(),在run函式中呼叫ShuffleMapTask.run()方法

        [分執行緒c--主執行緒]m. ShuffleMapTask.run() ----> ShuffleMapTask.runTask() -----> ShuffleMapTask.new shuffleWriter().write()
            ----> 至此,終於開始呼叫在WorldCount client中定義的flatMap(x => x.spilt(" "))函式

    6.總結:
        a.Cilent觸發rdd的Action行為,通過SpackContext提交作業,開啟DAGScheduler一級排程

        b.DAGScheduler首先將SpackContent的作業資訊封裝成一個作業提交事件JobSubmitted[Event],然後新增到事件佇列中,然後開啟分執行緒開始輪詢該事件佇列

        c.DAGScheduler.OnReceive取得輪詢取出的事件並檢視事件型別,根據事件型別,抽取事件中的資訊,然後進行一系列的封裝,最終封裝成一個State物件

        d.DAGScheduler.submitStage,DAG排程器開始提交State給TaskScheduler,開啟二級排程。

        e.一級排程總結就是:將SpackCotent提交的job資訊先封裝成Event,然後封裝成State,然後將State提交給二級排程

        f.TaskScheduler,二級排程提交任務集,並且將任務集封裝成reviveOffers, 通過程序間通訊[Netty],將reviveOffers傳遞給相應的三級底層排程器[local,standlone,ceros...],開啟三級排程

        g.三級排程首先建立一個TaskRunner執行緒,線上程中執行run函式,在run函式中對reviveOffers進行解析,然後將結果傳遞給ShuffleMapTask

        h.ShuffleMapTask開始新建混洗寫入器,開始shuffle操作,ShuffleMapTask.new shuffleWriter().write()

        i.至此開始呼叫自定義的WorldCount.flatMap函式