1. 程式人生 > >spark任務執行完成後在driver端的處理邏輯

spark任務執行完成後在driver端的處理邏輯

回顧

上一篇,我們分析了了任務在executor端的執行流程,任務執行結束後,在Executor.launchTask方法最後,通過呼叫execBackend.statusUpdate方法將任務結果以及任務狀態傳送給driver。回到driver端,我們在driver的rpc服務端DriverEndPoint的receive方法中尋找對StatusUpdate訊息的處理邏輯。

DriverEndpoint.receive

case StatusUpdate(executorId, taskId, state, data) =>
    // 通知TaskScheduler任務已完成
    scheduler.statusUpdate(taskId, state, data.value)
    // 如果任務已經執行結束了,包括FINISHED, FAILED, KILLED, LOST這幾種狀態
    // 那麼說明任務佔用的資源已經釋放了,此時就可以回收這部分資源並重新分配任務
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }   

所以重點是scheduler.statusUpdate呼叫

TaskSchedulerImpl.statusUpdate

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
synchronized {
  try {
    taskIdToTaskSetManager.get(tid) match {
      case Some(taskSet) =>
        // 這個狀態不明,沒看什麼地方會產生這個狀態
        if (state == TaskState.LOST) {
          // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
          // where each executor corresponds to a single task, so mark the executor as failed.
          val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
            "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
          if (executorIdToRunningTaskIds.contains(execId)) {
            reason = Some(
              SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
            removeExecutor(execId, reason.get)
            failedExecutor = Some(execId)
          }
        }
        // 任務執行結束,包括這幾種狀態FINISHED, FAILED, KILLED, LOST
        if (TaskState.isFinished(state)) {
          // 清除關於這個task的一些簿記量
          cleanupTaskState(tid)
          // 將這個task從正在執行的task集合中移除
          taskSet.removeRunningTask(tid)
          if (state == TaskState.FINISHED) {
            // 啟動一個執行緒,用來非同步地處理任務成功的情況
            taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
          } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
            taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
          }
        }
      case None =>
        logError(
          ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
            "likely the result of receiving duplicate task finished status updates) or its " +
            "executor has been marked as failed.")
            .format(state, tid))
    }
  } catch {
    case e: Exception => logError("Exception in statusUpdate", e)
  }
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
  assert(reason.isDefined)
  dagScheduler.executorLost(failedExecutor.get, reason.get)
  backend.reviveOffers()
}
}

這裡,啟動了一個非同步任務,用來處理任務成功的情況,所以我們分析一下非同步任務的處理邏輯。

TaskResultGetter.enqueueSuccessfulTask

def enqueueSuccessfulTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  serializedData: ByteBuffer): Unit = {
// 啟動一個非同步任務
getTaskResultExecutor.execute(new Runnable {
  override def run(): Unit = Utils.logUncaughtExceptions {
    try {
      // 對傳回的結果進行反序列化
      val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          // 如果是直接傳回的結果,那麼直接從反序列化的物件中取資料即可
        case directResult: DirectTaskResult[_] =>
          // 首先檢查結果大小是否超過閾值,預設是1g,
          // 也即最多能夠允許多大的結果放到driver端
          if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
            return
          }
          // deserialize "value" without holding any lock so that it won't block other threads.
          // We should call it here, so that when it's called again in
          // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
          // 對任務結果進行反序列化,呼叫該方法不會引起其他執行緒阻塞,
          directResult.value(taskResultSerializer.get())
          (directResult, serializedData.limit())
        case IndirectTaskResult(blockId, size) =>
          // 檢查結果大小是否超過限制
          if (!taskSetManager.canFetchMoreResults(size)) {
            // dropped by executor if size is larger than maxResultSize
            // 如果放棄了該任務,那麼需要將該任務在blockmanager中對應的block移除掉
            sparkEnv.blockManager.master.removeBlock(blockId)
            return
          }
          logDebug("Fetching indirect task result for TID %s".format(tid))
          // 這句話最終會通過DAGScheduler給事件匯流排投遞一條TaskGetting的事件
          scheduler.handleTaskGettingResult(taskSetManager, tid)
          // 通過blockManager遠端拉取結果資料
          // 而這個blockId對應的塊的位置資訊已經在之前由executor端傳回
          val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
          if (!serializedTaskResult.isDefined) {
            /* We won't be able to get the task result if the machine that ran the task failed
             * between when the task ended and when we tried to fetch the result, or if the
             * block manager had to flush the result. */
            // 這裡拉取資料失敗分為兩種情況:一種是由於任務序列化後體積太大主動丟棄
            // 另一種是executor節點網路異常,導致拉取失敗
            // 這兩種情況都算作任務失敗
            // 這個方法主要是對失敗的任務重新執行
            scheduler.handleFailedTask(
              taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
            return
          }
          // 將從blockManager拉取到的資料進行反序列化
          val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
            serializedTaskResult.get.toByteBuffer)
          // force deserialization of referenced value
          // 對任務結果進行反序列化
          deserializedResult.value(taskResultSerializer.get())
          // 將block移除,因為資料已經拉取到了
          sparkEnv.blockManager.master.removeBlock(blockId)
          (deserializedResult, size)
      }

      // Set the task result size in the accumulator updates received from the executors.
      // We need to do this here on the driver because if we did this on the executors then
      // we would have to serialize the result again after updating the size.
      // 處理累加器,主要是對任務結果大小的統計量需要做特殊處理
      result.accumUpdates = result.accumUpdates.map { a =>
        // 對於任務結果大小的統計量需要做特殊處理
        if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
          val acc = a.asInstanceOf[LongAccumulator]
          assert(acc.sum == 0L, "task result size should not have been set on the executors")
          acc.setValue(size.toLong)
          acc
        } else {
          a
        }
      }

      // 將反序列化好的結果資料告訴TaskSchedulerImpl做進一步處理
      scheduler.handleSuccessfulTask(taskSetManager, tid, result)
    } catch {
      case cnf: ClassNotFoundException =>
        val loader = Thread.currentThread.getContextClassLoader
        taskSetManager.abort("ClassNotFound with classloader: " + loader)
      // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
      case NonFatal(ex) =>
        logError("Exception while getting task result", ex)
        taskSetManager.abort("Exception while getting task result: %s".format(ex))
    }
  }
})
}

這裡會有好幾次反序列化,這時因為在executor端對任務結果資料處理時就是經過了好幾次序列化,

  • 首先會把任務執行的結果進行序列化,和累加器一起包裝成DirectTaskResult物件
  • 然後對DirectTaskResult物件進行序列化
  • 對於結果太大通過blockManager傳輸的情況,需要封裝一個IndirectTaskResult物件
  • 最後還有對IndirectTaskResult物件進行序列化

可以看到在結果傳回driver端後,是按照與上面相反的順序進行反序列化的。
最後拿到任務執行的結果資料以後,將結果資料交給TaskSchedulerImpl做進一步處理。

TaskSchedulerImpl.handleSuccessfulTask

def handleSuccessfulTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskResult: DirectTaskResult[_]): Unit = synchronized {
taskSetManager.handleSuccessfulTask(tid, taskResult)
}

TaskSetManager.handleSuccessfulTask

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
// 更新一些簿記量
val info = taskInfos(tid)
val index = info.index
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
if (speculationEnabled) {
  successfulTaskDurations.insert(info.duration)
}
removeRunningTask(tid)

// Kill any other attempts for the same task (since those are unnecessary now that one
// attempt completed successfully).
// 對於這個任務的其他執行中的副本,全部都要殺掉,主要是推測執行機制會對同一個任務同時執行多個副本
for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
  logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
    s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
    s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
  killedByOtherAttempt(index) = true
  // 通過排程後端傳送殺死任務的資訊
  sched.backend.killTask(
    attemptInfo.taskId,
    attemptInfo.executorId,
    interruptThread = true,
    reason = "another attempt succeeded")
}
// 檢查是不是第一次,如果是第一次才會更新這些簿記量
// 這麼做主要是為了防止多個任務副本多次更新造成不一致
if (!successful(index)) {
  tasksSuccessful += 1
  logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
    s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
    s" ($tasksSuccessful/$numTasks)")
  // Mark successful and stop if all the tasks have succeeded.
  successful(index) = true
  // 如果全部的任務都完成了,就說明這個任務集(stage)完成了
  if (tasksSuccessful == numTasks) {
    isZombie = true
  }
} else {
  logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
    " because task " + index + " has already completed successfully")
}
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
// 進一步通知DAG排程器做進一步處理,
// 這裡可見在任務提交執行是的處理順序是從DAGScheduler -> TaskScheduler -> SchedulerBackend -> executor
// 而任務執行結束後結果返回處理的順序則與上面的順正好反過來。
// 此外,也能看出TaskScheduler也充當了DAGScheduler和SchedulerBackend中間人的角色,傳遞訊息
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
// 更新一些簿記量
maybeFinishTaskSet()
}

這個方法的主要工作是更新一些簿記量;殺掉其他的任務副本;
然後通知DAGScheduler做進一步處理。

DAGScheduler.handleTaskCompletion

這個方法很長,所以我們把這個方法的主要邏輯做一個總結:

  • 處理累加器。對於ResultTask型別的任務不會進行重複累加,而對於ShuffleMapTask型別的任務則會進行重複累加(推測執行)
  • 首先,向事件匯流排中投遞一個任務結束的事件
  • 針對任務執行成功的情況做處理。如果是ResultTask型別的任務,需要更新一些簿記量,並在整個stage的所有任務完成時將stage標記為完成,並且通知作業監聽器;對於ShuffleMapTask型別的任務處理要複雜一些,同樣要更新一些簿記量,並且在mapOutputTracker元件中註冊這個任務的輸出block資訊,如果所有的分割槽全部完成,那麼還要將這個stage標記為完成。
  • 處理拉取資料失敗的情況。除了更新一些簿記量,主要做的事就是判斷是否要再次提交stage,如果不能再次提交(衝提交次數超過閾值)那麼就需要將關聯的job取消掉,否則再次提交這個stage。這裡需要注意的是,再次提交stage並不會把所有的任務全部再重新執行一遍,只會把那些因失敗而導致沒有完成的任務重新提交,通過mapOutputTrackerMaster元件追蹤mShuffleMap任務的輸出情況。

     private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
     val taskId = event.taskInfo.id
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
    
     // 通知outputCommitCoordinator元件對任務完成的事件做一些處理
     // outputCommitCoordinator元件需要對失敗的任務
     outputCommitCoordinator.taskCompleted(
       stageId,
       task.partitionId,
       event.taskInfo.attemptNumber, // this is a task attempt number
       event.reason)
    
     if (!stageIdToStage.contains(task.stageId)) {
       // The stage may have already finished when we get this event -- eg. maybe it was a
       // speculative task. It is important that we send the TaskEnd event in any case, so listeners
       // are properly notified and can chose to handle it. For instance, some listeners are
       // doing their own accounting and if they don't get the task end event they think
       // tasks are still running when they really aren't.
       // 在獲取這個事件時對應的stage可能已經完成了。比如,當前完成的task可能是一個推測執行的task。
       // 但是,無論如何,我們都有必要向事件匯流排中投遞一個任務結束的事件,
       // 這樣才能正確第通知監聽器,以使得監聽器能夠做出正確的處理。
       // 例如有的監聽器會對所有完成的任務(包括推測執行)進行計數,如果監聽器獲取不到任務完成的事件
       // 他們就會認為任務還在執行。
       postTaskEnd(event)
    
       // Skip all the actions if the stage has been cancelled.
       // 由於stage在之前已經被處理過了,所以這裡直接返回
       return
     }
    
     val stage = stageIdToStage(task.stageId)
    
     // Make sure the task's accumulators are updated before any other processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The accumulators are
     // only updated in certain cases.
     // 這裡應該思考一個問題:既然任務的多個副本可能會同時完成,
     // 那麼也就有可能會同時傳送任務結束事件,
     // 也就說這個方法可能因為任務的多個副本在同一段時間內完成而被同時執行
     // 那麼這裡沒有加鎖,也沒有CAS或其他的一些同步措施,這樣不會嘗試執行緒不安全問題嗎??
     // 答案在於EventLoop類中,這個類處理事件的執行緒只有一個,
     // 所以實際上所有的事件都是序列執行的,自然也就不會有執行緒不安全的問題了
    
    
     // 這一步主要是處理累加器
     event.reason match {

    case Success =>
    task match {
    case rt: ResultTask[, ] =>
    val resultStage = stage.asInstanceOf[ResultStage]
    resultStage.activeJob match {
    case Some(job) =>
    // Only update the accumulator once for each result task.
    // 對於ResultTask的累加器只計算一次,不會重複計算
    if (!job.finished(rt.outputId)) {
    updateAccumulators(event)
    }
    case None => // Ignore update if task's job has finished.
    }
    case _ =>
    // 對於ShuffleMapTask則不會考慮累加器的重複計數,
    // 也就意味著ShufleMapTask中執行的累加器會重複計數
    updateAccumulators(event)
    }
    case : ExceptionFailure => updateAccumulators(event)
    case
    =>
    }
    // 向事件匯流排投遞一個任務完成的事件
    postTaskEnd(event)

        // 這一步主要是對作業的一些簿記量的更新維護
        // 如果作業的全部分割槽都已完成,那麼移除掉這個作業
        // 並移除作業內不被其他作業依賴的stage的資訊
          event.reason match {
          case Success =>
         task match {
           case rt: ResultTask[_, _] =>
            // Cast to ResultStage here because it's part of the ResultTask
            // TODO Refactor this out to a function that accepts a ResultStage
             val resultStage = stage.asInstanceOf[ResultStage]
             resultStage.activeJob match {
              case Some(job) =>
               if (!job.finished(rt.outputId)) {
               job.finished(rt.outputId) = true
               job.numFinished += 1
               // If the whole job has finished, remove it
               // 如果作業的全部分割槽都已完成,那麼移除掉這個作業
               // 並移除作業內不被其他作業依賴的stage的資訊
               if (job.numFinished == job.numPartitions) {
                 // 把這個stage標記為已完成
                 markStageAsFinished(resultStage)
                 // 移除作業內不被其他作業依賴的stage的資訊
                 cleanupStateForJobAndIndependentStages(job)
                 // 向事件匯流排追蹤投遞一個作業結束的事件
                 listenerBus.post(
                   SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
               }
    
               // taskSucceeded runs some user code that might throw an exception. Make sure
               // we are resilient against that.
               // 最後,需要呼叫作業監聽器的回撥函式,以通知作業監聽器
               try {
                 job.listener.taskSucceeded(rt.outputId, event.result)
               } catch {
                 case e: Exception =>
                   // TODO: Perhaps we want to mark the resultStage as failed?
                   job.listener.jobFailed(new SparkDriverExecutionException(e))
               }
             }
           case None =>
             logInfo("Ignoring result from " + rt + " because its job has finished")
         }
    
         // 處理shuffleMapTask的情況
       case smt: ShuffleMapTask =>
         val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
         val status = event.result.asInstanceOf[MapStatus]
         val execId = status.location.executorId
         logDebug("ShuffleMapTask finished on " + execId)
         if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
           // This task was for the currently running attempt of the stage. Since the task
           // completed successfully from the perspective of the TaskSetManager, mark it as
           // no longer pending (the TaskSetManager may consider the task complete even
           // when the output needs to be ignored because the task's epoch is too small below.
           // In this case, when pending partitions is empty, there will still be missing
           // output locations, which will cause the DAGScheduler to resubmit the stage below.)
           // 如果如果task的stageAttemptId與當前最新的stage資訊相同,
           // 說明該任務已經完成
           shuffleStage.pendingPartitions -= task.partitionId
         }
         // 如果這個任務的epoch比被標記為失敗的epoch要早,那麼忽略這次執行結果
         if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
           logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
         } else {
           // The epoch of the task is acceptable (i.e., the task was launched after the most
           // recent failure we're aware of for the executor), so mark the task's output as
           // available.
           // 這個任務的epoch被接收,那麼在mapOutputTracker元件中將這個任務標記為成功
           // 然後就能通過mapOutputTracker元件獲取到這個分割槽的結果狀態了
           mapOutputTracker.registerMapOutput(
             shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
           // Remove the task's partition from pending partitions. This may have already been
           // done above, but will not have been done yet in cases where the task attempt was
           // from an earlier attempt of the stage (i.e., not the attempt that's currently
           // running).  This allows the DAGScheduler to mark the stage as complete when one
           // copy of each task has finished successfully, even if the currently active stage
           // still has tasks running.
           // 同樣將這個分割槽標記為已完成
           shuffleStage.pendingPartitions -= task.partitionId
         }
    
         // 如果stage的所有分割槽都已完成,那麼將這個stage標記為已完成
         if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
           markStageAsFinished(shuffleStage)
           logInfo("looking for newly runnable stages")
           logInfo("running: " + runningStages)
           logInfo("waiting: " + waitingStages)
           logInfo("failed: " + failedStages)
    
           // This call to increment the epoch may not be strictly necessary, but it is retained
           // for now in order to minimize the changes in behavior from an earlier version of the
           // code. This existing behavior of always incrementing the epoch following any
           // successful shuffle map stage completion may have benefits by causing unneeded
           // cached map outputs to be cleaned up earlier on executors. In the future we can
           // consider removing this call, but this will require some extra investigation.
           // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
           mapOutputTracker.incrementEpoch()
    
           // 清除RDD的分割槽結果位置快取
           // 以便在訪問快取是重新從blockManager中或rdd分割槽結果的位置資訊
           clearCacheLocs()
    
           if (!shuffleStage.isAvailable) {
             // Some tasks had failed; let's resubmit this shuffleStage.
             // 如果有部分任務失敗,那麼需要重新提交這個stage
             // TODO: Lower-level scheduler should also deal with this
             logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
               ") because some of its tasks had failed: " +
               shuffleStage.findMissingPartitions().mkString(", "))
             submitStage(shuffleStage)
           } else {
             // Mark any map-stage jobs waiting on this stage as finished
             // 將所有依賴於這個stage的job標記為執行結束
             if (shuffleStage.mapStageJobs.nonEmpty) {
               val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
               for (job <- shuffleStage.mapStageJobs) {
                 markMapStageJobAsFinished(job, stats)
               }
             }
             // 提價下游的子stage
             submitWaitingChildStages(shuffleStage)
           }
         }
     }
    
     //處理重複提交的情況
     case Resubmitted =>
     logInfo("Resubmitted " + task + ", so marking it as still running")
     stage match {
       case sms: ShuffleMapStage =>
         sms.pendingPartitions += task.partitionId
    
       case _ =>
         assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
           "tasks in ShuffleMapStages.")
     }
    
     // 處理拉取資料失敗的情況
     case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
     val failedStage = stageIdToStage(task.stageId)
     val mapStage = shuffleIdToMapStage(shuffleId)
    
     // 如果這個任務的attempId與stage最近一次的attemptId不同,
     // 那麼忽略這個異常,因為又一次更新的stage的嘗試正在執行中
     if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
       logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
         s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
         s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
     } else {
       // It is likely that we receive multiple FetchFailed for a single stage (because we have
       // multiple tasks running concurrently on different executors). In that case, it is
       // possible the fetch failure has already been handled by the scheduler.
       // 將這個stage標記為已結束
       if (runningStages.contains(failedStage)) {
         logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
           s"due to a fetch failure from $mapStage (${mapStage.name})")
         markStageAsFinished(failedStage, Some(failureMessage))
       } else {
         logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
           s"longer running")
       }
    
       // 把拉取失敗的stage的attemptId記錄下來
       failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
       // 如果stage的嘗試次數已經超過最大允許值,那麼將直接將取消該stage
       val shouldAbortStage =
         failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
         disallowStageRetryForTest
    
       if (shouldAbortStage) {
         val abortMessage = if (disallowStageRetryForTest) {
           "Fetch failure will not retry stage due to testing config"
         } else {
           s"""$failedStage (${failedStage.name})
              |has failed the maximum allowable number of
              |times: $maxConsecutiveStageAttempts.
              |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
         }
         // 取消這個stage, 做一些處理
         abortStage(failedStage, abortMessage, None)
       } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
         // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
         val noResubmitEnqueued = !failedStages.contains(failedStage)
         // 將這個stage新增到失敗的stage佇列中,
         // 這個佇列是等待重新提交的stage佇列
         failedStages += failedStage
         failedStages += mapStage
         if (noResubmitEnqueued) {
           // We expect one executor failure to trigger many FetchFailures in rapid succession,
           // but all of those task failures can typically be handled by a single resubmission of
           // the failed stage.  We avoid flooding the scheduler's event queue with resubmit
           // messages by checking whether a resubmit is already in the event queue for the
           // failed stage.  If there is already a resubmit enqueued for a different failed
           // stage, that event would also be sufficient to handle the current failed stage, but
           // producing a resubmit for each failed stage makes debugging and logging a little
           // simpler while not producing an overwhelming number of scheduler events.
           logInfo(
             s"Resubmitting $mapStage (${mapStage.name}) and " +
             s"$failedStage (${failedStage.name}) due to fetch failure"
           )
           // 200毫秒之後給內部的事件處理執行緒傳送一個重新提交stage的事件
           // 以通知DAGSchedduler重新提交失敗的stage
           messageScheduler.schedule(
             new Runnable {
               override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
             },
             DAGScheduler.RESUBMIT_TIMEOUT,
             TimeUnit.MILLISECONDS
           )
         }
       }
       // Mark the map whose fetch failed as broken in the map stage
       // 從mapOutputTracker中將這個任務的map輸出資訊移除掉
       if (mapId != -1) {
         mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
       }
    
       // TODO: mark the executor as failed only if there were lots of fetch failures on it
       // 將拉取失敗的block所在的executor移除掉,通知DriverEndpoint移除
       // 並且在blockManagerMaster中將對應的executor上的所有block資訊全部移除
       if (bmAddress != null) {
         val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
           unRegisterOutputOnHostOnFetchFailure) {
           // We had a fetch failure with the external shuffle service, so we
           // assume all shuffle data on the node is bad.
           Some(bmAddress.host)
         } else {
           // Unregister shuffle data just for one executor (we don't have any
           // reason to believe shuffle data has been lost for the entire host).
           None
         }
         removeExecutorAndUnregisterOutputs(
           execId = bmAddress.executorId,
           fileLost = true,
           hostToUnregisterOutputs = hostToUnregisterOutputs,
           maybeEpoch = Some(task.epoch))
       }
     }
     case commitDenied: TaskCommitDenied =>
     // Do nothing here, left up to the TaskScheduler to decide how to handle denied commit
     case exceptionFailure: ExceptionFailure =>
     // Nothing left to do, already handled above for accumulator updates.
     case TaskResultLost =>
     // Do nothing here; the TaskScheduler handles these failures and resubmits the task.
    
     case _: ExecutorLostFailure | _: TaskKilled | UnknownReason =>
     // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
     // will abort the job.
     }
     }