1. 程式人生 > >Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend

Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend

進行 text actor 類型 能夠 ext lang 運行 匯報

Spark Runtime裏的主要層次分析,梳理Runtime組件和運行流程,

技術分享

DAGScheduler

Job=多個stage,Stage=多個同種task, Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency

面向stage的切分,切分依據為寬依賴

維護waiting jobs和active jobs。維護waiting stages、active stages和failed stages,以及與jobs的映射關系

主要職能

  1. 接收提交Job的主入口。submitJob(rdd, ...)
    runJob(rdd, ...)。在SparkContext裏會調用這兩個方法。
    • 生成一個Stage並提交,接著推斷Stage是否有父Stage未完畢,若有。提交並等待父Stage。以此類推。結果是:DAGScheduler裏添加了一些waiting stage和一個running stage。
    • running stage提交後。分析stage裏Task的類型,生成一個Task描寫敘述,即TaskSet。

    • 調用TaskScheduler.submitTask(taskSet, ...)方法,把Task描寫敘述提交給TaskScheduler。TaskScheduler依據資源量和觸發分配條件,會為這個TaskSet分配資源並觸發運行。
    • DAGScheduler提交job後。異步返回JobWaiter對象。能夠返回job運行狀態,能夠cancel job,運行成功後會處理並返回結果
  2. 處理TaskCompletionEvent
    • 假設task運行成功,相應的stage裏減去這個task。做一些計數工作:
      • 假設task是ResultTask,計數器Accumulator加一。在job裏為該task置true,job finish總數加一。

        加完後假設finish數目與partition數目相等。說明這個stage完畢了,標記stage完畢。從running stages裏減去這個stage,做一些stage移除的清理工作

      • 假設task是ShuffleMapTask。計數器Accumulator
        加一,在stage裏加上一個output location。裏面是一個MapStatus類。MapStatusShuffleMapTask運行完畢的返回,包含location信息和block size(能夠選擇壓縮或未壓縮)。同一時候檢查該stage完畢,向MapOutputTracker註冊本stage裏的shuffleId和location信息。然後檢查stage的output location裏是否存在空。若存在空。說明一些task失敗了,整個stage又一次提交;否則,繼續從waiting stages裏提交下一個須要做的stage
    • 假設task是重提交,相應的stage裏添加這個task
    • 假設task是fetch失敗,立即標記相應的stage完畢。從running stages裏減去。

      假設不同意retry。abort整個stage;否則,又一次提交整個stage。

      另外,把這個fetch相關的location和map任務信息。從stage裏剔除,從MapOutputTracker註銷掉。最後,假設這次fetch的blockManagerId對象不為空,做一次ExecutorLost處理,下次shuffle會換在還有一個executor上去運行。

    • 其它task狀態會由TaskScheduler處理,如Exception, TaskResultLost, commitDenied等。

  3. 其它與job相關的操作還包含:cancel job, cancel stage, resubmit failed stage等

其它職能
1. cacheLocations 和 preferLocation

private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

TaskScheduler

維護task和executor相應關系,executor和物理資源相應關系。在排隊的task和正在跑的task。

內部維護一個任務隊列。依據FIFO或Fair策略,調度任務。

TaskScheduler本身是個接口,spark裏僅僅實現了一個TaskSchedulerImpl。理論上任務調度能夠定制。以下是TaskScheduler的主要接口:

def start(): Unit
def postStartHook() { }
def stop(): Unit
def submitTasks(taskSet: TaskSet): Unit
def cancelTasks(stageId: Int, interruptThread: Boolean)
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
    blockManagerId: BlockManagerId): Boolean

主要職能

  1. submitTasks(taskSet),接收DAGScheduler提交來的tasks
    • 為tasks創建一個TaskSetManager,加入到任務隊列裏。

      TaskSetManager跟蹤每一個task的運行狀況,維護了task的很多詳細信息。

    • 觸發一次資源的索要。
      • 首先。TaskScheduler對比手頭的可用資源和Task隊列。進行executor分配(考慮優先級、本地化等策略),符合條件的executor會被分配給TaskSetManager
      • 然後。得到的Task描寫敘述交給SchedulerBackend。調用launchTask(tasks)。觸發executor上task的運行。

        task描寫敘述被序列化後發給executor,executor提取task信息。調用task的run()方法運行計算。

  2. cancelTasks(stageId),取消一個stage的tasks
    • 調用SchedulerBackendkillTask(taskId, executorId, ...)方法。

      taskId和executorId在TaskScheduler裏一直維護著。

  3. resourceOffer(offers: Seq[Workers]),這是很重要的一個方法,調用者是SchedulerBacnend,用途是底層資源SchedulerBackend把空余的workers資源交給TaskScheduler。讓其依據調度策略為排隊的任務分配合理的cpu和內存資源。然後把任務描寫敘述列表傳回給SchedulerBackend
    • 從worker offers裏。搜集executor和host的相應關系、active executors、機架信息等等
    • worker offers資源列表進行隨機洗牌,任務隊列裏的任務列表依據調度策略進行一次排序
    • 遍歷每一個taskSet,依照進程本地化、worker本地化、機器本地化、機架本地化的優先級順序,為每一個taskSet提供可用的cpu核數,看是否滿足
      • 默認一個task須要一個cpu。設置參數為"spark.task.cpus=1"
      • 為taskSet分配資源,校驗是否滿足的邏輯,終於在TaskSetManagerresourceOffer(execId, host, maxLocality)方法裏
      • 滿足的話,會生成終於的任務描寫敘述。而且調用DAGSchedulertaskStarted(task, info)方法。通知DAGScheduler,這時候每次會觸發DAGScheduler做一次submitMissingStage的嘗試,即stage的tasks都分配到了資源的話,立即會被提交運行
  4. statusUpdate(taskId, taskState, data),還有一個很重要的方法,調用者是SchedulerBacnend,用途是SchedulerBacnend會將task運行的狀態匯報給TaskScheduler做一些決定
    • TaskLost,找到該task相應的executor。從active executor裏移除。避免這個executor被分配到其它task繼續失敗下去。

    • task finish包含四種狀態:finished, killed, failed, lost。僅僅有finished是成功運行完畢了。

      其它三種是失敗。

    • task成功運行完,調用TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data),否則調用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)TaskResultGetter內部維護了一個線程池,負責異步fetch task運行結果並反序列化。默認開四個線程做這件事,可配參數"spark.resultGetter.threads"=4

TaskResultGetter取task result的邏輯

  • 對於success task。假設taskResult裏的數據是直接結果數據。直接把data反序列出來得到結果。假設不是。會調用blockManager.getRemoteBytes(blockId)從遠程獲取。

    假設遠程取回的數據是空的,那麽會調用TaskScheduler.handleFailedTask,告訴它這個任務是完畢了的可是數據是丟失的。

    否則。取到數據之後會通知BlockManagerMaster移除這個block信息,調用TaskScheduler.handleSuccessfulTask。告訴它這個任務是運行成功的。而且把result data傳回去。

  • 對於failed task。從data裏解析出fail的理由,調用TaskScheduler.handleFailedTask。告訴它這個任務失敗了,理由是什麽。

SchedulerBackend

TaskScheduler下層。用於對接不同的資源管理系統,SchedulerBackend是個接口。須要實現的主要方法例如以下:

def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把自己手頭上的可用資源交給TaskScheduler。TaskScheduler依據調度策略分配給排隊的任務嗎,返回一批可運行的任務描寫敘述,SchedulerBackend負責launchTask,即終於把task塞到了executor模型上,executor裏的線程池會運行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException

粗粒度:進程常駐的模式。典型代表是standalone模式,mesos粗粒度模式,yarn

細粒度:mesos細粒度模式

這裏討論粗粒度模式,更好理解:CoarseGrainedSchedulerBackend

維護executor相關信息(包含executor的地址、通信端口、host、總核數。剩余核數),手頭上executor有多少被註冊使用了。有多少剩余,總共還有多少核是空的等等。

主要職能

  1. Driver端主要通過actor監聽和處理以下這些事件:
    • RegisterExecutor(executorId, hostPort, cores, logUrls)。這是executor加入的來源,通常worker拉起、重新啟動會觸發executor的註冊。CoarseGrainedSchedulerBackend把這些executor維護起來,更新內部的資源信息。比方總核數添加。最後調用一次makeOffer(),即把手頭資源丟給TaskScheduler去分配一次。返回任務描寫敘述回來。把任務launch起來。

      這個makeOffer()的調用會出如今不論什麽與資源變化相關的事件中,以下會看到。

    • StatusUpdate(executorId, taskId, state, data)。task的狀態回調。首先,調用TaskScheduler.statusUpdate上報上去。然後。推斷這個task是否運行結束了。結束了的話把executor上的freeCore加回去,調用一次makeOffer()
    • ReviveOffers

      這個事件就是別人直接向SchedulerBackend請求資源,直接調用makeOffer()

    • KillTask(taskId, executorId, interruptThread)。這個killTask的事件。會被發送給executor的actor,executor會處理KillTask這個事件。
    • StopExecutors。通知每一個executor,處理StopExecutor事件。

    • RemoveExecutor(executorId, reason)。從維護信息中,那這堆executor涉及的資源數減掉。然後調用TaskScheduler.executorLost()方法,通知上層我這邊有一批資源不能用了,你處理下吧。

      TaskScheduler會繼續把executorLost的事件上報給DAGScheduler,原因是DAGScheduler關心shuffle任務的output location。

      DAGScheduler會告訴BlockManager這個executor不可用了,移走它,然後把全部的stage的shuffleOutput信息都遍歷一遍,移走這個executor,而且把更新後的shuffleOutput信息註冊到MapOutputTracker上,最後清理下本地的CachedLocationsMap。

  2. reviveOffers()方法的實現。

    直接調用了makeOffers()方法,得到一批可運行的任務描寫敘述。調用launchTasks

  3. launchTasks(tasks: Seq[Seq[TaskDescription]])方法。
    • 遍歷每一個task描寫敘述。序列化成二進制。然後發送給每一個相應的executor這個任務信息
      • 假設這個二進制信息太大,超過了9.2M(默認的akkaFrameSize 10M 減去 默認 為akka留空的200K)。會出錯,abort整個taskSet。並打印提醒增大akka frame size
      • 假設二進制數據大小可接受,發送給executor的actor。處理LaunchTask(serializedTask)事件。

Executor

Executor是spark裏的進程模型。能夠套用到不同的資源管理系統上。與SchedulerBackend配合使用。

內部有個線程池,有個running tasks map,有個actor,接收上面提到的由SchedulerBackend發來的事件。

事件處理

  1. launchTask

    依據task描寫敘述。生成一個TaskRunner線程,丟盡running tasks map裏。用線程池運行這個TaskRunner

  2. killTask。從running tasks map裏拿出線程對象,調它的kill方法。

全文完 :)

Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend