
Spark: TaskScheduler Principle and Source Code Analysis

TaskScheduler is an interface: when DAGScheduler submits a TaskSet to the underlying scheduler, it programs against the TaskScheduler interface rather than a concrete implementation. The core job of TaskScheduler is to submit TaskSets to the cluster for execution and to report the results back.
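
A trimmed-down sketch of that contract is shown below. It uses simplified placeholder types and is not the real trait (which lives in org.apache.spark.scheduler.TaskScheduler); the method names follow the real interface, but the signatures are reduced for illustration.

  // Simplified sketch of the DAGScheduler-facing contract (placeholder types,
  // not Spark's actual trait; the real one has more methods and richer types).
  final case class SimpleTaskSet(id: String, stageId: Int, stageAttemptId: Int, tasks: Seq[String])

  trait SimpleTaskScheduler {
    // DAGScheduler hands over a whole TaskSet; the scheduler runs it on the
    // cluster and reports results/status changes back.
    def submitTasks(taskSet: SimpleTaskSet): Unit
    // Used by DAGScheduler when a stage is cancelled.
    def cancelTasks(stageId: Int): Unit
    // Hint used when deciding how many partitions to create by default.
    def defaultParallelism(): Int
  }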

Source Code Analysis
Step 1: submitTasks, the entry point through which TaskScheduler accepts tasks
Source: org.apache.spark.scheduler.TaskSchedulerImpl.scala

  /**
   * Entry point through which TaskScheduler accepts a submitted TaskSet
   */
  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      /**
       * Create one TaskSetManager per TaskSet.
       * The TaskSetManager monitors and manages the execution of its TaskSet,
       * retrying failed tasks until the retry limit is exceeded.
       */
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      // Build a <stageId, HashMap<stageAttemptId, TaskSetManager>> mapping
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      // Put the newly created TaskSetManager into the mapping
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists {
        case (_, ts) =>
          ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map { _._2.taskSet.id }.mkString(",")}")
      }
      /**
       * Register the TaskSetManager with the scheduling pool. Scheduling uses either
       * the FIFO or the FAIR policy; tasks are later assigned to executors based on
       * the executors' free resources and the locality policy.
       *
       * The scheduling data structure is wrapped in the Pool class:
       *  - for FIFO, the Pool is simply a queue of TaskSetManagers
       *  - for FAIR, it is a tree made up of TaskSetManagers
       *
       * The Pool maintains the priority of TaskSets; when an executor accepts a
       * resource offer (resourceOffer), a TaskSet is dequeued and submitted to the
       * executor for computation.
       */
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      // If not running locally and no task has been received yet, start a timer that
      // periodically logs a warning until a task is received
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }

    /**
     * When the TaskScheduler is created, one very important step is that TaskSchedulerImpl
     * creates a StandaloneSchedulerBackend. The backend here is that StandaloneSchedulerBackend
     * (executors register back to it after they start), and it is also responsible for creating
     * the AppClient and registering the Application with the Master.
     */
    backend.reviveOffers()
  }
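
Which scheduling policy backs the Pool mentioned above is controlled by configuration. As a small illustration (the property keys are real Spark configuration keys; the allocation-file path is a placeholder), FAIR scheduling can be enabled like this:

  import org.apache.spark.SparkConf

  // Illustration: choosing the scheduling policy behind schedulableBuilder.
  // FIFO is the default; FAIR arranges TaskSetManagers into a tree of pools,
  // optionally described by an allocation file (the path below is a placeholder).
  val conf = new SparkConf()
    .setAppName("scheduling-mode-demo")
    .set("spark.scheduler.mode", "FAIR") // or "FIFO" (the default)
    .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")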

The role of TaskSetManager: inside TaskSchedulerImpl, it schedules the tasks of a single TaskSet. The class tracks every task, retries failed tasks until the retry limit is exceeded, and handles locality-aware scheduling for the TaskSet via delay scheduling. Its main interface is resourceOffer, through which the TaskSet asks to run a task on a given node; it also receives status-change messages so that it knows when the state of one of its tasks has changed.

Step 2: The backend.reviveOffers() method (defined in CoarseGrainedSchedulerBackend, the parent class of StandaloneSchedulerBackend)
Source: org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.scala

  override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }
  var driverEndpoint: RpcEndpointRef = null

  protected def minRegisteredRatio: Double = _minRegisteredRatio

  override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // TODO (prashant) send conf instead of properties
    driverEndpoint = createDriverEndpointRef(properties)
  }

  protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }

  protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
    new DriverEndpoint(rpcEnv, properties)
  }
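
reviveOffers() itself does no scheduling work: it only sends a ReviveOffers message to the driver endpoint, and the actual offer/launch logic runs later when that message is received. The following is a minimal, self-contained sketch of that fire-and-forget pattern in plain Scala (a toy queue, not Spark's RpcEnv):

  // Toy sketch of the send/receive pattern (plain Scala, not Spark's RpcEnv).
  // The caller only enqueues a message; the endpoint processes it later.
  sealed trait SchedulerMessage
  case object ToyReviveOffers extends SchedulerMessage
  final case class ToyStatusUpdate(executorId: String, taskId: Long, finished: Boolean)
    extends SchedulerMessage

  class ToyDriverEndpoint {
    private val inbox = scala.collection.mutable.Queue.empty[SchedulerMessage]

    def send(msg: SchedulerMessage): Unit = inbox.enqueue(msg) // non-blocking "send"

    def processOne(): Unit = if (inbox.nonEmpty) inbox.dequeue() match {
      case ToyReviveOffers =>
        println("make offers on all alive executors and launch tasks")
      case ToyStatusUpdate(execId, taskId, true) =>
        println(s"task $taskId finished on $execId: free its cores, then make offers again")
      case ToyStatusUpdate(_, _, false) =>
        () // non-terminal state: nothing to free yet
    }
  }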

Step 3: The receive() method of the DriverEndpoint class

    override def receive: PartialFunction[Any, Unit] = {
      // StatusUpdate message: a task's status has changed
      case StatusUpdate(executorId, taskId, state, data) =>
        // Delegate the update to TaskSchedulerImpl#statusUpdate
        scheduler.statusUpdate(taskId, state, data.value)
        // If the task has reached a terminal state
        if (TaskState.isFinished(state)) {
          // Look up the ExecutorData by executor id
          executorDataMap.get(executorId) match {
            // If we know about this executor
            case Some(executorInfo) =>
              // Give the executor its cpu cores back
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              // Make offers on this executor so more tasks can be launched
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

      // ReviveOffers message: make offers on all alive executors and launch tasks
      case ReviveOffers =>
        makeOffers()

      // KillTask message: kill the given task
      case KillTask(taskId, executorId, interruptThread, reason) =>
        executorDataMap.get(executorId) match {
          // Forward the KillTask message to the executor
          case Some(executorInfo) =>
            executorInfo.executorEndpoint.send(
              KillTask(taskId, executorId, interruptThread, reason))
          case None =>
            // Ignoring the task kill since the executor is not registered.
            logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
        }

......

}

Step 4: The makeOffers() method

    // makeOffers: pick the usable executors and start launching tasks
    private def makeOffers() {
      // Make sure no executor is killed while some task is launching on it
      /**
       * 1. Filter out executors that are not alive.
       * 2. Build a WorkerOffer per alive executor and call scheduler.resourceOffers()
       *    to assign tasks onto the executors.
       * 3. Once tasks have been assigned, call launchTasks() to send LaunchTask messages.
       */
      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
        // Filter out executors under killing: only alive executors receive offers
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        // Wrap every usable executor of the application in a WorkerOffer (one per executor, describing its free CPU cores)
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort))
        }.toIndexedSeq
        // Call scheduler.resourceOffers() to assign tasks onto the executors
        scheduler.resourceOffers(workOffers)
      }
      if (!taskDescs.isEmpty) {
        // Launch the tasks
        launchTasks(taskDescs)
      }
    }

Step 5: The task assignment algorithm, resourceOffers
For each TaskSetManager, compute its locality levels (locality_level); then, for each task set, start from the smallest (best) locality level and try to launch its tasks on executors. If tasks cannot be launched at that level, relax the locality level and try again, and so on until some level succeeds.

Locality levels:

  • PROCESS_LOCAL: process-local. Code and data are in the same process, i.e. the same executor; the task that computes the data runs in the executor whose BlockManager holds the data. Best performance.
  • NODE_LOCAL: node-local. Code and data are on the same node; for example the data is an HDFS block on the node while the task runs in some executor on that node, or the data and the task are in different executors on the same node, so the data has to be transferred between processes.
  • NO_PREF: the task does not care where the data comes from; no location is better than another, e.g. Spark SQL reading from MySQL.
  • RACK_LOCAL: rack-local. The data and the task are on two different nodes within the same rack, so the data has to travel over the network between nodes.
  • ANY: the data and the task may be anywhere in the cluster, not even in the same rack. Worst performance.

  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false

    // Iterate over the executors that have resources to offer
    for (o <- offers) {
      // If we have not seen this executor's host yet, initialize a set for the executors on that host
      if (!hostToExecutors.contains(o.host)) {
        hostToExecutors(o.host) = new HashSet[String]()
      }
      // If the <executorId, running task ids> map does not contain this executorId
      if (!executorIdToRunningTaskIds.contains(o.executorId)) {
        // Record the executor under its host and notify DAGScheduler that a new executor was added
        hostToExecutors(o.host) += o.executorId
        executorAdded(o.executorId, o.host)
        executorIdToHost(o.executorId) = o.host
        executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
        newExecAvail = true
      }
      // Iterate over the rack(s) the host belongs to
      for (rack <- getRackForHost(o.host)) {
        // Update the rack-to-hosts mapping
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
    // this here to avoid a separate thread and added synchronization overhead, and also because
    // updating the blacklist is only relevant when task offers are being made.
    blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

    val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
      offers.filter { offer =>
        !blacklistTracker.isNodeBlacklisted(offer.host) &&
          !blacklistTracker.isExecutorBlacklisted(offer.executorId)
      }
    }.getOrElse(offers)

    // First, shuffle the available executors to spread load as evenly as possible
    val shuffledOffers = shuffleOffers(filteredOffers)
    // Build a list of tasks to assign to each worker.
    /**
     * Build the list of tasks to assign to each worker.
     * tasks can be thought of as a two-dimensional array: an ArrayBuffer whose elements
     * are themselves ArrayBuffers, each sized to the number of task slots the executor
     * offers (its cores divided by CPUS_PER_TASK).
     */
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
    // CPU cores actually available on each offer
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
    /**
     * Take the sorted TaskSets out of the rootPool.
     * When the TaskScheduler is initialized, right after TaskSchedulerImpl is created,
     * initialize() builds a scheduling pool. Submitted TaskSets are first placed into
     * that pool, and the task assignment algorithm later takes them out of the pool in
     * sorted order.
     */
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    // If a new executor joined, the data locality of every TaskSet needs to be recomputed
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    /**
     * The core of the task assignment algorithm:
     * a double loop over all TaskSets and over every locality level.
     *
     * Locality levels:
     *  1. PROCESS_LOCAL: process-local; the RDD partition and the task are in the same executor, fastest
     *  2. NODE_LOCAL: node-local; the RDD partition and the task are not in the same executor (not the same process) but on the same worker
     *  3. NO_PREF: no locality preference
     *  4. RACK_LOCAL: rack-local; the RDD partition and the task are at least in the same rack
     *  5. ANY: any locality, slowest
     */
    for (taskSet <- sortedTaskSets) {
      // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
      if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
        // Skip the launch process.
        // TODO SPARK-24819 If the job requires more slots than available (both busy and free
        // slots), fail the job on submit.
        logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
          s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
          s"number of available slots is $availableSlots.")
      } else {
        var launchedAnyTask = false
        // Record all the executor IDs assigned barrier tasks on.
        val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
        // For each TaskSet, iterate starting from its best locality level
        for (currentMaxLocality <- taskSet.myLocalityLevels) {
          var launchedTaskAtCurrentMaxLocality = false
          do {
            /**
             * For the current TaskSet, try to launch its tasks on executors using the
             * smallest (best) locality level first. If no task can be launched, break
             * out of the do-while loop and move on to the next (more relaxed) locality
             * level, and so on, until the TaskSet's tasks have all been launched at
             * some locality level.
             */
            launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
              taskSet,
              currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
        // If no task could be launched at any locality level, the TaskSet may be completely blacklisted
        if (!launchedAnyTask) {
          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
        }
        if (launchedAnyTask && taskSet.isBarrier) {
          // Check whether the barrier tasks are partially launched.
          // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
          // requirements are not fulfilled, and we should revert the launched tasks).
          require(
            addressesWithDescs.size == taskSet.numTasks,
            s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
              s"because only ${addressesWithDescs.size} out of a total number of " +
              s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
              "been blacklisted or cannot fulfill task locality requirements.")

          // materialize the barrier coordinator.
          maybeInitBarrierCoordinator()

          // Update the taskInfos into all the barrier task properties.
          val addressesStr = addressesWithDescs
            // Addresses ordered by partitionId
            .sortBy(_._2.partitionId)
            .map(_._1)
            .mkString(",")
          addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

          logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
            s"stage ${taskSet.stageId}.")
        }
      }
    }

    // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
    // launched within a configured time.
    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

The main strategy is as follows (see the sketch after this list):

  1. Randomly shuffle the executors so that tasks are not all assigned to the same worker.
  2. Sort the TaskSets.
  3. Assign resources to each TaskSet, starting from its best (smallest) locality level.
  4. Take the first TaskSet, take its current best locality level, and call resourceOfferSingleTaskSet.
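
The overall shape of the algorithm is easier to see stripped of the bookkeeping. The sketch below uses toy types (it is not the real implementation): every TaskSetManager is visited in pool order, and for each one the scheduler walks its locality levels from best to worst, launching as many tasks as it can before relaxing the level.

  // Schematic of the double loop in resourceOffers (toy types, not the real code).
  object LocalityLoopSketch {
    final class ToyTaskSet(val localityLevels: Seq[String], var remaining: Int) {
      // Placeholder: pretend one task can be launched per call while any remain.
      def tryLaunchOneTaskAt(level: String): Boolean =
        if (remaining > 0) { remaining -= 1; true } else false
    }

    def schedule(sortedTaskSets: Seq[ToyTaskSet]): Unit =
      for (taskSet <- sortedTaskSets) {            // scheduling order comes from the Pool
        for (level <- taskSet.localityLevels) {    // best (smallest) locality level first
          var launched = true
          while (launched) {                       // keep launching at this level...
            launched = taskSet.tryLaunchOneTaskAt(level) // ...until nothing fits, then relax
          }
        }
      }

    def main(args: Array[String]): Unit =
      schedule(Seq(new ToyTaskSet(Seq("PROCESS_LOCAL", "NODE_LOCAL", "ANY"), remaining = 3)))
  }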

Step 6: The resourceOfferSingleTaskSet() method

  private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
    addressesWithDescs: ArrayBuffer[(String, TaskDescription)]): Boolean = {
    var launchedTask = false
    // nodes and executors that are blacklisted for the entire application have already been
    // filtered out by this point
    // Iterate over all executors
    for (i <- 0 until shuffledOffers.size) {
      // Get the executorId and host of this offer
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      // If this executor has at least as many free cpus as one task needs (CPUS_PER_TASK, default 1)
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          // Call TaskSetManager.resourceOffer to find a task of this TaskSet that can be
          // launched on this executor at the current locality level, and iterate over the result
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            // If there is such a task, add it to this executor's task list (tasks is indexed by offer), i.e. the tasks this executor will run
            tasks(i) += task
            // Record the assignment in the in-memory bookkeeping structures
            val tid = task.taskId
            taskIdToTaskSetManager.put(tid, taskSet)
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            // Only update hosts for a barrier task.
            if (taskSet.isBarrier) {
              // The executor address is expected to be non empty.
              addressesWithDescs += (shuffledOffers(i).address.get -> task)
            }
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }
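
Whether a task fits on an offer boils down to a core count check: every task consumes CPUS_PER_TASK cores, which comes from spark.task.cpus (default 1). A small worked example with assumed numbers, in spark-shell/REPL style:

  // Worked example (assumed numbers): how many concurrent task slots one offer provides.
  val coresOnExecutor = 8                        // WorkerOffer.cores for this executor
  val cpusPerTask     = 2                        // spark.task.cpus (default is 1)
  val slots           = coresOnExecutor / cpusPerTask
  assert(slots == 4)                             // at most 4 tasks run concurrently on this executor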

Step 7: The resourceOffer() method

  /**
   * Check how long this executor has been waiting at this locality level.
   * If the wait time for the locality level is still within the allowed range, the task
   * is considered launchable on this executor at that locality level.
   */
  @throws[TaskNotSerializableException]
  def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
      blacklist.isNodeBlacklistedForTaskSet(host) ||
        blacklist.isExecutorBlacklistedForTaskSet(execId)
    }
    if (!isZombie && !offerBlacklisted) {
      val curTime = clock.getTimeMillis()

      var allowedLocality = maxLocality  // record the locality being offered, e.g. PROCESS_LOCAL

      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime) // find the locality level currently allowed by delay scheduling
        if (allowedLocality > maxLocality) {
          // If the allowed locality is more relaxed (worse) than the offered one, keep the offered one.
          // E.g. if getAllowedLocalityLevel returns NODE_LOCAL but the offer is PROCESS_LOCAL, stay at PROCESS_LOCAL.
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

  ......
  
  }
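
The comparison allowedLocality > maxLocality reads a little backwards at first: TaskLocality is an ordered enumeration in which PROCESS_LOCAL is the smallest value and ANY the largest, so "greater" means "farther away". A REPL-style sketch with an analogous enumeration (not Spark's own class) shows that ordering:

  // Analogous enumeration (not Spark's TaskLocality) showing why ">" means "worse locality".
  object ToyLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value // ordered by declaration
  }

  assert(ToyLocality.PROCESS_LOCAL < ToyLocality.NODE_LOCAL)
  assert(ToyLocality.RACK_LOCAL < ToyLocality.ANY)
  // In resourceOffer: if delay scheduling currently allows e.g. NODE_LOCAL but the offer
  // being processed is PROCESS_LOCAL (a smaller value), allowedLocality is capped back to
  // PROCESS_LOCAL, because this round must not hand out farther-away tasks.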

Step 8: The getAllowedLocalityLevel() method

  /**
   * Recompute the highest locality level usable at the current time. Because of delay
   * scheduling, the current locality has to be derived from the wait-time based delay
   * scheduling algorithm.
   */
  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    // Remove the scheduled or finished tasks lazily
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = pendingTaskIds(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // Walk through the list of tasks that can be scheduled at each location and returns true
    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
    // already been scheduled.
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }

    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
      }
      if (!moreTasks) {
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        lastLaunchTime += localityWaits(currentLocalityIndex)
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
        currentLocalityIndex += 1
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    myLocalityLevels(currentLocalityIndex)
  }
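
The per-level wait times in localityWaits(...) come from configuration: by default each level gets a short grace period before the scheduler degrades to the next, farther-away level. The keys below are real Spark configuration properties; the values are just illustrative.

  import org.apache.spark.SparkConf

  // Delay-scheduling waits consulted by getAllowedLocalityLevel (illustrative values).
  val conf = new SparkConf()
    .set("spark.locality.wait", "3s")         // base wait applied to each locality level
    .set("spark.locality.wait.process", "3s") // per-level overrides, if finer control is needed
    .set("spark.locality.wait.node", "3s")
    .set("spark.locality.wait.rack", "3s")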

Step 9: The dequeueTask() method

  /**
   * Dequeue a task according to the different task locality levels.
   */
  private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    // dequeueTaskFromList(): removes a pending task from the given list and returns its index; returns None if the list is empty.
    // PROCESS_LOCAL: the data is in the same JVM, i.e. the same executor. Best data locality.
    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    // NODE_LOCAL: the data is on the same node, e.g. in another executor on the node, or in an
    // HDFS block that happens to live on this node. Slightly slower than PROCESS_LOCAL because
    // the data has to cross process boundaries or be read from a file.
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }

    // NO_PREF: the data is equally fast to access from anywhere; no preferred location
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
      for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL, false))
      }
    }

    // RACK_LOCAL: the data is on a different node in the same rack; requires network transfer and file IO, slower than NODE_LOCAL
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, false))
      }
    }

    // ANY: the data is somewhere on the network outside this rack; slowest
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
        return Some((index, TaskLocality.ANY, false))
      }
    }
    // find a speculative task if all other tasks have been scheduled
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  }

Step 10: launchTasks, submitting tasks to the chosen executors

  • First, serialize the information of every task to be launched.
  • Find the corresponding executor.
  • Subtract the CPU cores the task uses from the executor's free cores.
  • Send a LaunchTask message to the executor so that it starts the task.

    /**
     * Launch the assigned tasks on their executors according to the assignment computed above.
     */
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        // First, serialize the information of the task to be launched
        val serializedTask = TaskDescription.encode(task)
        if (serializedTask.limit() >= maxRpcMessageSize) {
          Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        } else {
          // Find the corresponding executor
          val executorData = executorDataMap(task.executorId)
          // Subtract the CPU cores used by this task from the executor's free cores
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")
          // Send a LaunchTask message to the executor to start the task there
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

The task is serialized first. If the serialized size exceeds the maximum RPC message size (described here as 128 MB minus about 200 KB of reserved overhead), the task is skipped and its TaskSetManager is aborted (put into zombie mode); if the size is within the limit, a LaunchTask message is sent to the executor and the task starts running there.
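
A back-of-the-envelope sketch of that size guard, using the figures quoted above (128 MB is the default of spark.rpc.message.maxSize; the ~200 KB is the reserved envelope overhead mentioned in this description):

  // Sketch of the size guard in launchTasks, with assumed constants taken from the text above.
  object RpcSizeGuardSketch {
    val maxSizeMb: Long         = 128                 // spark.rpc.message.maxSize default, in MB
    val reservedBytes: Long     = 200L * 1024         // overhead reserved for the RPC envelope
    val maxRpcMessageSize: Long = maxSizeMb * 1024 * 1024 - reservedBytes

    // Returns true if the serialized task is small enough to be sent;
    // otherwise the real code aborts the TaskSetManager (marking it zombie).
    def canSend(serializedTaskBytes: Long): Boolean = serializedTaskBytes < maxRpcMessageSize

    def main(args: Array[String]): Unit = {
      println(canSend(64L * 1024 * 1024))   // true:  a 64 MB serialized task fits
      println(canSend(200L * 1024 * 1024))  // false: would exceed the limit, TaskSet aborted
    }
  }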