1. 程式人生 > >Spark原始碼分析之三:Stage劃分

Spark原始碼分析之三:Stage劃分

        Stage劃分的大體流程如下圖所示:


        前面提到,對於JobSubmitted事件,我們通過呼叫DAGScheduler的handleJobSubmitted()方法來處理。那麼我們先來看下程式碼:

// 處理Job提交的函式
  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    
    // 利用最後一個RDD(finalRDD),建立最後的stage物件:finalStage
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      // 根據最後一個RDD獲取最後的stage
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    // 建立一個ActiveJob物件
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    
    // 清除RDD分割槽位置快取
    // private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
    clearCacheLocs()
    
    // 呼叫logInfo()方法記錄日誌資訊
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    
    // 將jobId-->ActiveJob的對應關係新增到HashMap型別的資料結構jobIdToActiveJob中去
    jobIdToActiveJob(jobId) = job
    
    // 將ActiveJob新增到HashSet型別的資料結構activeJobs中去
    activeJobs += job
    
    finalStage.setActiveJob(job)
    
    //2 獲取stageIds列表
    // jobIdToStageIds儲存的是jobId--stageIds的對應關係
    // stageIds為HashSet[Int]型別的
    // jobIdToStageIds在上面newResultStage過程中已被處理
    val stageIds = jobIdToStageIds(jobId).toArray
    // stageIdToStage儲存的是stageId-->Stage的對應關係
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    
    // 提交最後一個stage
    submitStage(finalStage)

    // 提交其他正在等待的stage
    submitWaitingStages()
  }
        這個handleJobSubmitted()方法一共做了這麼幾件事:

        第一,呼叫newResultStage()方法,生成Stage,包括最後一個Stage:ResultStage和前面的Parent Stage:ShuffleMapStage;

        第二,建立一個ActiveJob物件job;

        第三,清除RDD分割槽位置快取;

        第四,呼叫logInfo()方法記錄日誌資訊;

        第五,維護各種資料對應關係涉及到的資料結構:

        (1)將jobId-->ActiveJob的對應關係新增到HashMap型別的資料結構jobIdToActiveJob中去;

        (2)將ActiveJob新增到HashSet型別的資料結構activeJobs中去;

        第六,提交Stage;

        下面,除了提交Stage留在第三階段外,我們挨個分析第二階段的每一步。

        首先是呼叫newResultStage()方法,生成Stage,包括最後一個Stage:ResultStage和前面的Parent Stage:ShuffleMapStage。程式碼如下:

/**
   * Create a ResultStage associated with the provided jobId.
   * 用提供的jobId建立一個ResultStage
   */
  private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    
    // 根據fianl RDD獲取parent stage及id,這個id為ResultStage的stageId
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    
    // 建立一個ResultStage,即為整個Job的finalStage
    // 引數:id為stage的id,rdd為stage中最後一個rdd,func為在分割槽上執行的函式操作,
    // partitions為rdd中可以執行操作的分割槽,parentStages為該stage的父stages,jobId為該stage
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
    
    // 將stage加入到stageIdToStage中
    stageIdToStage(id) = stage
    
    // 更新資料結構jobIdToStageIds
    updateJobIdStageIdMaps(jobId, stage)
    
    // 返回stage
    stage
  }
        首先,根據fianl RDD獲取parent stages及id,這個id為ResultStage的stageId;

        其次,建立一個ResultStage,即為整個Job的finalStage;

        然後,將stage加入到資料結構stageIdToStage中;

        接著,更新資料結構jobIdToStageIds;

        最後,返回這個ResultStage。

        我們一步步來看。首先呼叫getParentStagesAndId()方法,根據fianl RDD獲取parent stages及id,這個id為ResultStage的stageId。程式碼如下:

/**
   * Helper function to eliminate some code re-use when creating new stages.
   */
  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    // 獲取parent stages
    val parentStages = getParentStages(rdd, firstJobId)
    
    // 獲取下一個stageId,為AtomicInteger型別,getAndIncrement()能保證原子操作
    val id = nextStageId.getAndIncrement()
    
    // 返回parentStages和id
    (parentStages, id)
  }
        這個id即為下一個stageId,通過AtomicInteger型別的getAndIncrement()獲得,能夠保證原子性。繼續分析getParentStages()方法,通過它來獲取final RDD的parent stage。程式碼如下:
/**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    // 用HashSet儲存parents stage
    val parents = new HashSet[Stage]
    
    // 用HashSet儲存已經被訪問過的RDD
    val visited = new HashSet[RDD[_]]
    
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    // 儲存需要被處理的RDD。Stack中得RDD都需要被處理
    val waitingForVisit = new Stack[RDD[_]]
    
    // 定義一個visit函式,根據傳入的RDD,如果之前沒有處理過,標記為已處理,迴圈此RDD的依賴關係dependencies
    // 如果是ShuffleDependency,獲取其parents;如果不是,則說明為同一stage,並壓入Stack:waitingForVisit頂部
    def visit(r: RDD[_]) {
      if (!visited(r)) {// visited中沒有的話
        // 將RDD r加入到visited,表示已經處理過了
        visited += r
        
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        // 迴圈Rdd r的依賴關係
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              // 如果是ShuffleDependency,獲取其parents,新增到parents中去
              parents += getShuffleMapStage(shufDep, firstJobId)
            case _ =>
              // 否則,屬於同一個stage,壓入Stack頂部,後續再遞迴處理
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    
    // 將rdd壓入Stack頂部
    waitingForVisit.push(rdd)
    
    // 迴圈waitingForVisit,彈出每個rdd
    while (waitingForVisit.nonEmpty) {
      // 呼叫visit()方法,處理每個rdd
      visit(waitingForVisit.pop())
    }
    
    // 返回得到的parents列表
    parents.toList
  }

        getParentStages()方法在其內部定義瞭如下資料結構:

        parents:用HashSet儲存parents stages,即finalRDD的所有parent stages,也就是ShuffleMapStage;

        visited:用HashSet儲存已經被訪問過的RDD,在RDD被處理前先存入該HashSet,保證儲存在裡面的RDD將不會被重複處理;

        waitingForVisit:儲存需要被處理的RDD。Stack中得RDD都需要被處理。

        getParentStages()方法在其內部還定義了一個visit()方法,傳入一個RDD,如果之前沒有處理過,標記為已處理,並迴圈此RDD的依賴關係dependencies,如果是ShuffleDependency,呼叫getShuffleMapStage()方法獲取其parent stage;如果不是,則說明為同一stage,並壓入Stack:waitingForVisit頂部,等待後續通過visit()方法處理。所以,getParentStages()方法從finalRDD開始,逐漸往上查詢,如果是窄依賴,證明在同一個Stage中,繼續往上查詢,如果是寬依賴,通過getShuffleMapStage()方法獲取其parent stage,就能得到整個Job中所有的parent stages,也就是ShuffleMapStage。

        接下來,我們看下getShuffleMapStage()方法的實現。程式碼如下:

/**
   * Get or create a shuffle map stage for the given shuffle dependency's map side.
   * 針對給定的shuffle dependency的map端,獲取或者建立一個ShuffleMapStage
   */
  private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    
    // 從資料結構shuffleToMapStage中根據shuffleId獲取,如果有直接返回,否則
    // 獲取ShuffleDependency中的rdd,呼叫getAncestorShuffleDependencies()方法,
    // 迴圈每個parent,呼叫newOrUsedShuffleStage()方法,建立一個新的ShuffleMapStage,
    // 並加入到資料結構shuffleToMapStage中去
    // 
    // 它的定義為:private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage // 有則直接返回
      case None => // 沒有
        // We are going to register ancestor shuffle dependencies
        // 呼叫getAncestorShuffleDependencies()方法,傳入ShuffleDependency中的rdd
        
        // 發現還沒有在shuffleToMapStage中註冊的祖先shuffle dependencies
        getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // 並迴圈返回的parents,呼叫newOrUsedShuffleStage()方法,建立一個新的ShuffleMapStage,
          // 並加入到資料結構shuffleToMapStage中去
          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
        }
        
        // Then register current shuffleDep
        // 最後註冊當前shuffleDep,並加入到資料結構shuffleToMapStage中,返回stage
        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
        shuffleToMapStage(shuffleDep.shuffleId) = stage
        stage
    }
  }

        從getShuffleMapStage()方法的註釋就能看出,這個方法的主要作用就是針對給定的shuffle dependency的map端,獲取或者建立一個ShuffleMapStage。為何是Get or create呢?通過原始碼得知,getShuffleMapStage()方法首先會根據shuffleDep.shuffleId從資料結構shuffleToMapStage中查詢哦是否存在對應的stage,如果存在則直接返回,如果不存在,則呼叫newOrUsedShuffleStage()方法建立一個Stage並新增到資料結構shuffleToMapStage中,方便後續需要使用此Stage者直接使用。在此之前,會根據入參ShuffleDependency的rdd發現還沒有在shuffleToMapStage中註冊的祖先shuffle dependencies,然後遍歷每個ShuffleDependency,呼叫newOrUsedShuffleStage()方法為每個ShuffleDependency產生Stage並新增到資料結構shuffleToMapStage中。

        下面,我們看下這個getAncestorShuffleDependencies()方法的實現,程式碼如下:

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
  // 根據傳入的RDD,發現還沒有在shuffleToMapStage中未註冊過的祖先shuffle dependencies
  private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    
    // 存放parents的棧:Stack
    val parents = new Stack[ShuffleDependency[_, _, _]]
    
    // 存放已經處理過的RDD的雜湊表:HashSet
    val visited = new HashSet[RDD[_]]
    
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    // 存放等待呼叫visit的RDD的棧:Stack
    val waitingForVisit = new Stack[RDD[_]]
    
    // 定義方法visit()
    def visit(r: RDD[_]) {
      if (!visited(r)) {// 如果之前沒有處理過
        visited += r // 標記為已處理
        
        // 迴圈RDD的所有依賴
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => // 如果是ShuffleDependency
              // 如果shuffleToMapStage中沒有,新增到parents中
              if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
                parents.push(shufDep)
              }
            case _ =>
          }
          
          // 將該dependence的rdd壓入waitingForVisit棧頂部
          waitingForVisit.push(dep.rdd)
        }
      }
    }

    // 將RDD壓入到waitingForVisit頂部
    waitingForVisit.push(rdd)
    // 迴圈waitingForVisit,針對每個RDD呼叫visit()方法
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    
    // 返回parents
    parents
  }
        通過程式碼我們可以發現,它和getParentStages()方法的程式碼風格非常相似。在其內部也定義了三個資料結構:

        parents:存放parents的棧,即Stack,用於存放入參RDD的在shuffleToMapStage中未註冊過的祖先shuffle dependencies;

        visited:存放已經處理過的RDD的雜湊表,即HashSet;

        waitingForVisit:存放等待被處理的RDD的棧,即Stack;

        定義了一個visit()方法,入參為RDD,針對傳入的RDD,如果之前沒有處理過則標記為已處理,並迴圈RDD的所有依賴,如果是如果是ShuffleDependency,並且其依賴的shuffleId在shuffleToMapStage中沒有,新增到parents中,否則直接跳過,最後無論為何種Dependency,都將該dependence的rdd壓入waitingForVisit棧頂部,等待後續處理。

        接下來,我們再看下newOrUsedShuffleStage()方法,其程式碼如下:

/**
   * Create a shuffle map Stage for the given RDD.  The stage will also be associated with the
   * provided firstJobId.  If a stage for the shuffleId existed previously so that the shuffleId is
   * present in the MapOutputTracker, then the number and location of available outputs are
   * recovered from the MapOutputTracker
   *
   * 為給定的RDD建立一個ShuffleStage
   */
  private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    
    // 從shuffleDep中獲取RDD 
    val rdd = shuffleDep.rdd
    
    // 獲取RDD的分割槽個數,即未來的task數目
    val numTasks = rdd.partitions.length
    
    // 構造一個ShuffleMapStage例項
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
    
    
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // 如果mapOutputTracker中存在
    
      // 根據shuffleId從mapOutputTracker中獲取序列化的多個MapOutputStatus物件
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      
      // 反序列化
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      
      // 迴圈
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          // 將
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else {
      // 如果mapOutputTracker中不存在,註冊一個
    
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      // 註冊的內容為
      // 1、根據shuffleDep獲取的shuffleId;
      // 2、rdd中分割槽的個數
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }
        這個方法的主要完成了以下兩件事:

        1、構造一個ShuffleMapStage例項stage;

        2、判斷是否在mapOutputTracker中存在:

           (1)如果不存在,呼叫mapOutputTracker的registerShuffle()方法註冊一個,註冊的內容為根據shuffleDep獲取的shuffleId和rdd中分割槽的個數;

           (2)如果存在,根據shuffleId從mapOutputTracker中獲取序列化的多個MapOutputStatus物件,反序列化後迴圈,逐個新增到stage中。

        緊接著,看下newShuffleMapStage()方法,其程式碼如下:

/**
   * Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
   * newOrUsedShuffleStage.  The stage will be associated with the provided firstJobId.
   * Production of shuffle map stages should always use newOrUsedShuffleStage, not
   * newShuffleMapStage directly.
   */
  private def newShuffleMapStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int,
      callSite: CallSite): ShuffleMapStage = {
    
    // 獲得parentStages和下一個stageId
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
    
    // 建立一個ShuffleMapStage
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
      firstJobId, callSite, shuffleDep)

    // 將stage加入到資料結構stageIdToStage
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(firstJobId, stage)
    stage
  }
        可以發現,這個方法也呼叫了getParentStagesAndId()方法,這樣,就形成了一個遞迴,按照RDD的依賴關係,由後往前,逐漸生成Stage。程式碼剩餘的部分就是建立一個ShuffleMapStage,並將stage加入到資料結構stageIdToStage,以及呼叫updateJobIdStageIdMaps()方法更新相關資料結構。這個updateJobIdStageIdMaps()方法留待下面分析。

        下面,簡單看下mapOutputTracker註冊的程式碼。

// 註冊shuffle
  def registerShuffle(shuffleId: Int, numMaps: Int) {
    // 將shuffleId、numMaps大小和MapStatus型別的Array陣列的對映關係,放入mapStatuses中
    // mapStatuses為TimeStampedHashMap[Int, Array[MapStatus]]型別的資料結構
    if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
      throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
    }
  }
        很簡單,將shuffleId、numMaps大小和MapStatus型別的Array陣列的對映關係,放入mapStatuses中,mapStatuses為TimeStampedHashMap[Int, Array[MapStatus]]型別的資料結構。
       經歷了這多又長又大篇幅的敘述,現在返回newResultStage()方法,在通過getParentStagesAndId()方法獲取parent stages及其result stage的id後,緊接著建立一個ResultStage,並將stage加入到stageIdToStage中,最後在呼叫updateJobIdStageIdMaps()更新資料結構jobIdToStageIds後,返回stage。

        下面,簡單看下updateJobIdStageIdMaps()方法。程式碼如下:

/**
   * Registers the given jobId among the jobs that need the given stage and
   * all of that stage's ancestors.
   */
  private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
    // 定義一個函式updateJobIdStageIdMapsList()
    def updateJobIdStageIdMapsList(stages: List[Stage]) {
      
      if (stages.nonEmpty) {
        
        // 獲取列表頭元素
        val s = stages.head
        
        // 將jobId新增到Stage的jobIds中
        s.jobIds += jobId
        
        // 更新jobIdToStageIds,將jobId與stageIds的對應關係新增進去
        jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
        
        val parents: List[Stage] = getParentStages(s.rdd, jobId)
        
        val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
        updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
      }
    }
    // 呼叫函式updateJobIdStageIdMapsList()
    updateJobIdStageIdMapsList(List(stage))
  }

        這個方法的實現比較簡單,在其內部定義了一個函式updateJobIdStageIdMapsList(),首選傳入result stage,將jobId新增到stage的jobIds中,更新jobIdToStageIds,將jobId與stageIds的對應關係新增進去,然後根據給定stage的RDD獲取其parent stages,過濾出不包含此JobId的parents stages,再遞迴呼叫updateJobIdStageIdMapsList()方法,直到全部stage都處理完。

        至此,第二階段Stage劃分大體流程已分析完畢,有遺漏或不清楚的地方,以後再查缺補漏以及細化及更正錯誤。