1. 程式人生 > >Spark原始碼學習(4)——Scheduler

Spark原始碼學習(4)——Scheduler

本文要解決的問題:
從scheduler各個類的具體方法閱讀原始碼,進一步瞭解Spark的scheduler的工作原理和過程。

Scheduler的基本過程

使用者提交的Job到DAGScheduler後,會封裝成ActiveJob,同時啟動JobWaiter監聽作業的完成情況。同時依據job中RDD的dependency和dependency屬性(窄依賴NarrowDependency,寬依賴ShufflerDependecy),DAGScheduler會根據依賴關係的先後產生出不同的stage DAG(result stage, shuffle map stage)。在每一個stage內部,根據stage產生出相應的task,包括ResultTask或是ShuffleMapTask,這些task會根據RDD中partition的數量和分佈,產生出一組相應的task,並將其包裝為TaskSet提交到TaskScheduler上去。

DAGScheduler

DAGScheduler是高層級別的排程器。實現了stage-oriented排程。它計算一個DAG中stage的工作。並將這些stage輸出落地物化。

最終提交stage以taskSet方式提交給TaskScheduler。DAGScheduler需要接收上下層的訊息,它也是一個actor。這裡主要看看他的一些事件處理。以下是的所處理的事件。

private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_])
=> _,
partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties = null) extends DAGSchedulerEvent private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent private[scheduler] case class JobCancelled(jobId: Int) extends
DAGSchedulerEvent
private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent private[scheduler] case object AllJobsCancelled extends DAGSchedulerEvent private[scheduler] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent private[scheduler] case class GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent 還有很多,不一一羅列。

JobSubmitted

//處理提交的作業,完成Job到stage的 轉換,生成finalStage,並SubmitStage。
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

進入submitStage方法。submitStage提交stage,第一個提交的是沒有父依賴關係的。

/** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          //沒有stage依賴
          submitMissingTasks(stage, jobId.get)
        } else {
        //有父依賴遞迴處理父stage
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

如果計算中發現當前的stage沒有任何的依賴關係。則直接提交task。

原始碼中的getMissingParentStages是獲取父stage。原始碼如下:

//查詢所有的stage封裝成list集合
private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
            //如果是ShufflerDependecy,則新建一個ShuffleMapStage,且該stage是可用的話,則加入missing中
            //ShufflerDependecy表示Shuffle過程的依賴
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

繼續submitStage,進入submitMissingTasks方法。該方法將stage根據parition拆分成task。然後生成TaskSet,並提交到TaskScheduler。該方法在之前有貼出來過,這裡就不貼出來了。

DAGScheduler的主要功能:

1、接收使用者提交的job。

2、以stage的形式劃分job,並記錄物化的stage。在stage內產生的task以taskSet的方式提交給taskScheduler。

TaskScheduler

TaskScheduler低級別的任務排程程式的介面,目前由TaskSchedulerImpl完全實現。該介面允許插入不同的任務排程。TaskScheduler接收DAGScheduler提交的taskSet,並負責傳送任務到叢集上執行。

TaskScheduler會根據部署方式而選擇不同的SchedulerBackend來處理。針對不同部署方式會有不同的TaskScheduler與SchedulerBackend進行組合:

  • Local模式:TaskSchedulerImpl+ LocalBackend

  • Spark叢集模式:TaskSchedulerImpl+ SparkDepolySchedulerBackend

  • Yarn-Cluster模式:YarnClusterScheduler + CoarseGrainedSchedulerBackend

  • Yarn-Client模式:YarnClientClusterScheduler + YarnClientSchedulerBackend

TaskScheduler類負責任務排程資源的分配,SchedulerBackend負責與Master、Worker通訊收集Worker上分配給該應用使用的資源情況。

TaskSchedulerImpl
TaskSchedulerImpl類就是負責為Task分配資源的。在CoarseGrainedSchedulerBackend獲取到可用資源後就會通過makeOffers方法通知TaskSchedulerImpl對資源進行分配。

TaskSchedulerImpl的resourceOffers方法就是負責為Task分配計算資源的,在為Task分配好資源後又會通過lauchTasks方法傳送LaunchTask訊息通知Worker上的Executor執行Task。

下面看下TaskSchedulerImpl中的幾個方法。
1.initialize:

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

initialize方法主要就是初始化選擇排程模式,這個可以由使用者自己配置。

2.Start

override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

3.submitTasks

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }

TaskScheduler中實際執行task時會呼叫Backend.reviveOffers,在spark內有多個不同的backend。

Stage

一個stage是一組由相同函式計算出來的任務集合,它執行spark上的job。這裡所有的任務都有相同的shuffle依賴。每個stage都是map函式計算,shuffle隨機產生的,在這種情況下,它的任務的結果被輸給stage,或者其返回一個stage,在這種情況下,它的任務直接計算髮起的作業的動作(例如,count()),save()等)。都是ShuffleMapStage我們也可以跟蹤每個節點上的輸出分割槽。

Stage的構造如下:

private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)

Task

Task: 一個執行單元,在Spark有兩種實現:

org.apache.spark.scheduler.ShuffleMapTask

org.apache.spark.scheduler.ResultTask

一個Spark工作會包含一個或者多個stages。一個ResultTask執行任務,並將任務輸出driver應用。一個ShuffleMapTask執行的任務,並把任務輸出到多個buckets(基於任務的分割槽)

1.TaskSet
由TaskScheduler提交的一組Task集合

2.TaskSetManager
在TaskSchedulerImpl單內使用taskset排程任務.此類跟蹤每個任務,重試任務如果失敗(最多的有限次數),並經由延遲排程處理區域性性感知排程此使用taskset。其主要介面有它resourceOffer,它要求使用taskset是否願意在一個節點上執行一個任務,statusUpdate,它告訴它其任務之一狀態發生了改變

private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    clock: Clock = new SystemClock())
  extends Schedulable with Logging {

  val conf = sched.sc.conf

方法addPendingTask

新增一個任務的所有沒有被執行的任務列表,它是PendingTask。原始碼如下。

private def addPendingTask(index: Int) {
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation =>
          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
        case e: HDFSCacheTaskLocation =>
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) =>
              for (e <- set) {
                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
              logInfo(s"Pending task $index has a cached location at ${e.host} " +
                ", where there are executors " + set.mkString(","))
            case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
                ", but there are no executors alive there.")
          }
        case _ =>
      }
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }

    if (tasks(index).preferredLocations == Nil) {
      pendingTasksWithNoPrefs += index
    }

    allPendingTasks += index  // No point scanning this whole list to find the old task there
  }

resourceOffer

解決如何在taskset內部schedule一個task。原始碼如下:

 override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
    stateLock.synchronized {
      if (stopCalled) {
        logDebug("Ignoring offers during shutdown")
        // Driver should simply return a stopped status on race
        // condition between this.stop() and completing here
        offers.asScala.map(_.getId).foreach(d.declineOffer)
        return
      }

      logDebug(s"Received ${offers.size} resource offers.")

      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
        val offerAttributes = toAttributeMap(offer.getAttributesList)
        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
      }

      declineUnmatchedOffers(d, unmatchedOffers)
      handleMatchedOffers(d, matchedOffers)
    }
  }

Conf

Property Name Default Meaning
spark.task.cpus 1 Number of cores to allocate for each task.
spark.task.maxFailures 4 Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1.
spark.scheduler.mode FIFO he scheduling mode between jobs submitted to the same SparkContext. Can be set to FAIR to use fair sharing instead of queueing jobs one after another. Useful for multi-user services.
spark.cores.max (not set) application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark’s standalone cluster manager, or infinite (all available cores) on Mesos.
spark.mesos.coarse false If set to “true”, runs over Mesos clusters in “coarse-grained” sharing mode, where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use for the whole duration of the Spark job
spark.speculation false If set to “true”, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation.interval 100 How often Spark will check for tasks to speculate, in milliseconds.
spark.speculation.quantile 0.75 Percentage of tasks which must be complete before speculation is enabled for a particular stage.
spark.speculation.multiplier 1.5 How many times slower a task is than the median to be considered for speculation.
spark.locality.wait 3000 Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
spark.locality.wait.process spark.locality.wait Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.
spark.locality.wait.node spark.locality.wait Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).
spark.locality.wait.rack spark.locality.wait Customize the locality wait for rack locality.
spark.scheduler.revive.interval 1000 The interval length for the scheduler to revive the worker resource offers to run tasks (in milliseconds).
spark.scheduler.minRegisteredResourcesRatio 0.0 for Mesos and Standalone mode, 0.8 for YARN The minimum ratio of registered resources (registered resources / total expected resources) (resources are executors in yarn mode, CPU cores in standalone mode) to wait for before scheduling begins. Specified as a double between 0.0 and 1.0. Regardless of whether the minimum ratio of resources has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config spark.scheduler.maxRegisteredResourcesWaitingTime.
spark.scheduler.maxRegisteredResourcesWaitingTime 30000 Maximum amount of time to wait for resources to register before scheduling begins (in milliseconds).
spark.localExecution.enabled false Enables Spark to run certain jobs, such as first() or take() on the driver, without sending tasks to the cluster. This can make certain jobs execute very quickly, but may require shipping a whole partition of data to the driver.

相關推薦

Spark原始碼學習4——Scheduler

本文要解決的問題: 從scheduler各個類的具體方法閱讀原始碼,進一步瞭解Spark的scheduler的工作原理和過程。 Scheduler的基本過程 使用者提交的Job到DAGScheduler後,會封裝成ActiveJob,同時啟動Job

Vue原始碼學習4——資料響應系統

Vue原始碼學習(4)——資料響應系統:通過initData() 看資料響應系統     下面,根據理解我們寫一個簡略的原始碼:參考 治癒watcher在:vm.$mount(vm.$options.el)    Function de

Spark原始碼學習---Master和Worker的啟動以及Actor通訊流程

在《Spark原始碼學習(一)》中通過Spark的啟動指令碼,我們看到Spark啟動Master的時候實際上是啟動了org.apache.spark.deploy.master.Master,下面我們就從這2個類入手,通過閱讀Spark的原始碼,瞭解Spark的啟動流程。

Spark學習4——scala條件控制與迴圈

if表示式 If表示式的定義:在scala中,if表示式是有值得,就是if或者else中最後一行語句返回的值 例如:val age =30;if(age>18) 1 else 0 執行結果: 將if表示式賦值給一個變數:val  result = if(age&

java util包學習4 Arrays 原始碼分析

package java.util; import java.lang.reflect.*; public class Arrays { // Suppresses default constructor, ensuring non-instantiabil

HLS學習HLSDownloader原始碼分析4解析Master PlayList

解析Master PlayList     PlayList就是m3u8檔案或者索引檔案,Master PlayList也叫一級索引檔案。 解析Master PlayList的過程如下: 1、

構建之法學習4

控制 重要 protect 運算 包裝 二義性 lin c++ 基類 本周學習的內容是兩人合作 計算機只關心編譯生成的機器碼,你的程序采用哪種縮進風格,變量名有無統一的規範等,與機器碼的執行無關。但是,做一個有商業價值的項目,或者在團隊裏工作,代碼規範相當重要。“代碼規

基於Qt的OpenGL可編程管線學習4- 使用Subroutine繪制不同光照的模型

qt opengl shader subroutine 使用Subroutine在shader中封裝不同的函數,在CPU端選擇調用那個函數效果如下圖所示左側:環境光中間:環境光 + 漫反射右側:環境光 + 漫反射 + 高光1、Subroutine 在shader中的內容subroutine v

Java學習4:統計一個文件中的英文,中文,數字,其他字符以及字符總數

port let args str reader 文件路徑 要求 cnblogs pub 要求:統計一個文件中的英文,中文,數字,其他字符以及字符總數(此隨筆以txt文件為例) import java.io.BufferedReader; import java.io.F

Vue深度學習4-方法與事件處理器

() 一個 span 修飾 語句 特殊變量 方法 left stop 方法處理器 可以用 v-on 指令監聽 DOM 事件: <div id="app"> <button v-on:click = "greet">Greet<

Spark機器學習

控制 常用 nbsp 建立 判斷 測試數據 話題 with 分享圖片 1、機器學習概念 1.1 機器學習的定義 在維基百科上對機器學習提出以下幾種定義:l“機器學習是一門人工智能的科學,該領域的主要研究對象是人工智能,特別是如何在經驗學習中改善具體算法的性能”。l“機

maven--學習4--創建java項目

system key maven 要求 導入項目 一段 ring following start 1. 從 Maven 模板創建一個項目 在終端(* UNIX或Mac)或命令提示符(Windows)中,瀏覽到要創建 Java 項目的文件夾。鍵入以下命令: mvn arche

perl學習4正則表達式處理文本

接收 沒有 per tdi 自動 int con class 開頭 一:  進行本章學習前的一些知識儲備 1:  Perl最喜歡用的默認變量:$_(摘於perl語言入門78頁)    假如在foreach循環開頭省略空置變量,Perl就會用它最喜歡的默認變量$_。

Spring框架學習4spring整合hibernate

location host mage too 自動 exception 4.0 數據庫連接 find 內容源自:spring整合hibernate spring整合註解形式的hibernate 這裏和上一部分學習一樣用了模板模式, 將hibernate開發流程封裝在O

caffe的python接口學習4mnist實例手寫數字識別

數字 interval with lac EDA 變化 mode 指數 lB 以下主要是摘抄denny博文的內容,更多內容大家去看原作者吧   一 數據準備   準備訓練集和測試集圖片的列表清單;   二 導入caffe庫,設定文件路徑    # -*- coding: u

Spring學習4IOC容器配置bean:定義與實例化

dimp 工廠類 def 流程 行為 更多 多個 scrip 編譯報錯 一. IOC容器配置   1. 一些概念   (1)IOC容器:   定義:具有管理對象和管理對象之間的依賴關系的容器。   作用:應用程序無需自己創建對象,對象由IOC容器創建並組裝。BeanFac

python學習4

相等 append 求值 4.3 urn erro utf read pri 按位運算 按位 & | ^ ~ >> 3&33>> 3&11>> 2|13 異或^相同為0,不用為1 Is判斷兩個對象是否相等,兩個

Linux基礎學習4

在一起 src info 鏈接 常用 auto ali 分享圖片 ls -l 第四章——Linux常用命令 一.文件處理命令 1.命令格式與目錄處理命令ls: (1)命令格式:命令 [-選項] [參數] 例:ls -la /etc

區塊鏈學習4交易

圖靈 比特幣 生成 入棧 部分 pub png 組合 數字 上次大致講了比特幣的交易模式。接著講講比特幣的交易。比特幣的交易驗證引擎依賴於兩類腳本來驗證比特幣交易:一個鎖定腳本和一個解鎖腳本。 鎖定腳本鎖定了一個輸出值,同時它明確了今後花費這筆輸出的條件。鎖定腳本往往含有一

python學習4--字符串格式化之format()方法

light 網站 pytho com date 其中 格式化字符串 ont python 一、格式化字符串的函數 str.format()增強了字符串格式化的功能。通過 {} 和 : 來代替以前的 % 。 其中format 函數可以接受不限個參數,位置可以不按順序。 st