1. 程式人生 > >spark2原理分析-Stage的實現原理

spark2原理分析-Stage的實現原理

概述

本文介紹Spark任務執行框架中Stage的原理,並分析其實現機制。

Stage的基本概念

一個Stage是一個並行任務(Task實體)集,它們執行相同的計算邏輯,並作為Spark任務執行的一部分,所有的任務都具有相同的shuffle依賴。

排程器執行的每個任務DAG,在shuffle的邊界處(發生shuffling時)被分解成多個stage,然後DAGScheduler以拓撲順序執行這些階段(Stage)。

前面一篇介紹Job的文章中提到過,Job中的分割槽對應RDD的分割槽,而在Spark中RDD中的一個分割槽對應了Stage中的一個任務,它屬於一個RDD用於計算執行函式的部分結果,這些結果作為Spark Job的一部分。
在這裡插入圖片描述

DAGScheduler將一個Job分成Stage集。每個Stage包含一系列narrow transformations(窄轉換),這些轉換操作可以在不進行shuffling的情況下完成,這些階段在shuffle的邊界(例如:shuffle發生的地方)處被分離,因此,可以說Stage是RDD graph在shuffle階段分裂的結果。

在每個階段RDD的窄轉換(例如:map()或filter()等)操作被pipeline(多個任務形成流水線,中間結果在記憶體中,以便加快計算效能)成一個任務集,但是shuffle操作卻需要依賴多個Stage。

在Stage生命中的某個時間點,Stage的每個分割槽都會轉換為一個Tasks - 分別為ShuffleMapStage和ResultStage的ShuffleMapTask和ResultTask。

分割槽在Job中計算,並且結果階段可能並不總是需要計算其目標RDD中的所有分割槽,例如,對於first()和lookup()等操作。

在這裡插入圖片描述

Stage的實現合約

在Spark中,有兩種不同的Stage,實現這兩種型別的Stage需要遵循Stage的實現合約(實現Stage抽象類)。
Stage的抽象類宣告如下:

private[scheduler] abstract class Stage

為了能夠更好的理解後面的兩種具體的Stage,下面對Stage的抽象類的成員做一個說明:

成員名 型別 說明
id Int Stage的id,是Stage的唯一標識
rdd RDD[_] 此Stage依賴的Stage列表(通過shuffle依賴關係得到)
numTasks Int 此Stage的總任務數。
parents List[Stage] 此Stage依賴的Stage列表
firstJobId Int 對於FIFO排程來說,此變數是此Stage屬於的第一個Job的ID
callSite CallSite 使用者編寫的程式中與該Stage相關的部分
numPartitions Int RDD的分割槽數
jobIds 此Stage屬於的Job集
nextAttemptId 此Stage下一次嘗試的ID
_latestInfo 返回最近的Stage資訊結構:StageInfo
fetchFailedAttemptIds 記錄Stage嘗試失敗的次數
makeNewStageAttempt 為該Stage建立一個新的StageInfo和ID
latestInfo 返回該Stage最新的StageInfo資訊
findMissingPartitions 返回需要計算(missing)但還沒計算的分割槽id集合

對於這些成員,有幾個需要重點說明:

  • numTasks

    該成員代表的是Stage的總任務數。但對於result stages可能不需要計算所有分割槽,例如:first(),lookup(),take()等。

ResultStage

ResultStages階段在RDD的各個分割槽上執行一些功能函式,來處理RDD的Action轉換的執行結果。

該實體的定義如下:

private[spark] class ResultStage(
    id: Int,
    rdd: RDD[_],
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

ResultStage物件會捕獲函式func去執行,並把該函式應用引數’partitions’表示的分割槽ID集中的每一個分割槽。

注意:ResultStage是Job的最後一個Stage。
在提交Stage時會先遞迴找到該Stage依賴的父級Stage,並先提交父級Stage。

ShuffleMapStage

在執行Stage的DAG(有向無環圖)中,ShuffleMapStage是一箇中間階段,為其他階段生成資料。

它為一個shuffle過程產生map操作的輸出檔案。它也可能是自適應查詢規劃/自適應排程工作的最後階段。
在這裡插入圖片描述

Stage的提交過程

通過上一節的分析可知,在提交Job時會先建立一個ResultStage,再根據RDD的血緣關係(lineage)查詢與ResultStage相關聯的RDD的分割槽,再根據這些分割槽來建立新的Stage。
下圖是Job提交的總體流程:
在這裡插入圖片描述

下圖是提交Stage的流程:
在這裡插入圖片描述
在提交Stage時,也是通過遞迴提交最先依賴的Stage,最後提交ResultStage。實現過程如下:

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    
    ...
      // 建立一個階段:最後的一個階段
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    ...
    
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    
    // 向事件處理匯流排發起SparkListenerJobStart事件(會在後面的文章中講到)
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      
    // 提交最後一個階段
    submitStage(finalStage)  

我們看一下階段的提交的實現,階段提交在submitStage函式中實現:

/** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    
    if (jobId.isDefined) {
      ...
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // 查詢並獲取依賴的父Stage
        val missing = getMissingParentStages(stage).sortBy(_.id)
        ...
        
        if (missing.isEmpty) {
           ...
          // 已經找到全部的依賴Stage並已提交,最後提交最後一個Stage
          submitMissingTasks(stage, jobId.get)
        } else {
          // 先提交依賴的父Stage
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

從提交Stage的程式碼實現中可以看出,先建立最後一個Stage,而在提交時,提交Stage時會先提交依賴的父Stage。

總結

本文說明了Stage的實現原理,並對Stage的提交過程進行了分析。