
A TaskSetManager Conflict That Shut Down the SparkContext

Background

While I was leisurely writing code, colleagues on the business side reported a flood of alert SMS messages for a Spark Streaming job running in production. The application's web UI showed that the Spark application had already exited. We restarted the production job first, then set about finding the root cause. The code discussed in this post is based on Spark 1.6.1.

Problem Diagnosis

After logging in to the production machine and checking the error logs, we found the system kept reporting "Cannot call methods on a stopped SparkContext." The full log entry is as follows:

[ERROR][JobScheduler][2017-03-08+15:56:00.067][org.apache.spark.streaming.scheduler.JobScheduler]Error running job streaming job 1488959760000 ms.0
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874)
org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81)
com.xxxx.xxxx.MainApp$.createStreamingContext(MainApp.scala:46)
com.xxxx.xxxx.MainApp$$anonfun$15.apply(MainApp.scala:126)
com.xxxx.xxxx.MainApp$$anonfun$15.apply(MainApp.scala:126)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
com.xxxx.xxxx.MainApp$.main(MainApp.scala:125)
com.xxxx.xxxx.MainApp.main(MainApp.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

At this point it is clear that the SparkContext has been stopped. Next, let's figure out what caused it to stop, starting by locating the shutdown log entry. From the SparkContext source we know that a successful shutdown ends by logging:

logInfo("Successfully stopped SparkContext")

Using grep to locate the corresponding log entry, we find:

[INFO][dag-scheduler-event-loop][2017-03-03+22:16:30.841][org.apache.spark.SparkContext]Successfully stopped SparkContext

The log shows that it was the dag-scheduler-event-loop thread that stopped the SparkContext. Checking that thread's log output, we see:

java.lang.IllegalStateException: more than one active taskSet for stage 4571114: 4571114.2,4571114.1
    at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:173)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1052)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

This shows that a single stage had two active TaskSetManagers at the same time (a taskset id has the form stageId.stageAttemptId, so attempts 1 and 2 of stage 4571114 were both active). The code of TaskSchedulerImpl.submitTasks is as follows:

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    .........
}

Seeing this was quite a shock: how could there be two? Going back through the earlier logs, we found that stage 4571114 had been resubmitted:

[INFO][dag-scheduler-event-loop][2017-03-03+22:16:29.547][org.apache.spark.scheduler.DAGScheduler]Resubmitting ShuffleMapStage 4571114 (map at MainApp.scala:73) because some of its tasks had failed: 0
[INFO][dag-scheduler-event-loop][2017-03-03+22:16:29.547][org.apache.spark.scheduler.DAGScheduler]Submitting ShuffleMapStage 4571114 (MapPartitionsRDD[3719544] at map at MainApp.scala:73), which has no missing parents

Let's look at the code that resubmits a stage. The following snippet is taken from the DAGScheduler.handleTaskCompletion method:

case smt: ShuffleMapTask =>
  val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
  updateAccumulators(event)
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  logDebug("ShuffleMapTask finished on " + execId)
  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
  } else {
    shuffleStage.addOutputLoc(smt.partitionId, status)
  }

  if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    markStageAsFinished(shuffleStage)
    logInfo("looking for newly runnable stages")
    logInfo("running: " + runningStages)
    logInfo("waiting: " + waitingStages)
    logInfo("failed: " + failedStages)

    // We supply true to increment the epoch number here in case this is a
    // recomputation of the map outputs. In that case, some nodes may have cached
    // locations with holes (from when we detected the error) and will need the
    // epoch incremented to refetch them.
    // TODO: Only increment the epoch number if this is not the first time
    // we registered these map outputs.
    mapOutputTracker.registerMapOutputs(
      shuffleStage.shuffleDep.shuffleId,
      shuffleStage.outputLocInMapOutputTrackerFormat(),
      changeEpoch = true)

    clearCacheLocs()

    if (!shuffleStage.isAvailable) {
      // Some tasks had failed; let's resubmit this shuffleStage
      // TODO: Lower-level scheduler should also deal with this
      logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
        ") because some of its tasks had failed: " +
        shuffleStage.findMissingPartitions().mkString(", "))
      submitStage(shuffleStage)
    } else {
      // Mark any map-stage jobs waiting on this stage as finished
      if (shuffleStage.mapStageJobs.nonEmpty) {
        val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
        for (job <- shuffleStage.mapStageJobs) {
          markMapStageJobAsFinished(job, stats)
        }
      }
    }

As the code shows, a resubmit is triggered only when shuffleStage.pendingPartitions is empty and shuffleStage.isAvailable is false, so let's look at when these two values change. pendingPartitions is the set of partitions still being processed; a partition is removed from it once its task finishes:

val stage = stageIdToStage(task.stageId)
event.reason match {
  case Success =>
    listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
      event.reason, event.taskInfo, event.taskMetrics))
    // remove the partition from the set of partitions being processed
    stage.pendingPartitions -= task.partitionId

isAvailable checks whether the number of partitions whose shuffle output locations have been reported to the driver equals the total number of partitions:
def isAvailable: Boolean = _numAvailableOutputs == numPartitions
This counter is likewise updated when a ShuffleMapTask finishes, but note that the update happens only if the executor holding the shuffle data is still considered alive. If the executor that ran the ShuffleMapTask has died, its on-disk shuffle data can no longer be fetched through it anyway:

case smt: ShuffleMapTask =>
  val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
  updateAccumulators(event)
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  logDebug("ShuffleMapTask finished on " + execId)
  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
  } else {
    shuffleStage.addOutputLoc(smt.partitionId, status)
  }
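_numAvailableOutputs, the counter behind isAvailable, is only incremented when addOutputLoc registers a new output location, and the branch above skips that call for a completion it considers bogus. For reference, ShuffleMapStage.addOutputLoc in the 1.6 code base looks roughly like the following (paraphrased from memory, so treat it as a sketch rather than an exact quote):

def addOutputLoc(partition: Int, status: MapStatus): Unit = {
  val prevList = outputLocs(partition)
  outputLocs(partition) = status :: prevList
  if (prevList == Nil) {
    // count a partition only the first time an output location is registered for it
    _numAvailableOutputs += 1
  }
}

So when the completion from a dead executor is ignored, that partition never gets an output location, isAvailable stays false, and the stage becomes a candidate for resubmission.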

That branch is the only place that could have caused the rescheduling. Searching the logs for its key message confirms it:

[INFO][dag-scheduler-event-loop][2017-03-03+22:16:27.427][org.apache.spark.scheduler.DAGScheduler]Ignoring possibly bogus ShuffleMapTask(4571114, 0) completion from executor 4

But even if the executor that had just finished the ShuffleMapTask died at that moment and caused the stage to be rescheduled, that alone should not lead to a TaskSetManager conflict, because taskSet.isZombie should already have been set to true by then, given that TaskSetManager.handleSuccessfulTask is supposed to run before DAGScheduler.handleTaskCompletion:
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
  ts.taskSet != taskSet && !ts.isZombie
}

TaskSetManager.handleSuccessfulTask:

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  // This eventually posts a CompletionEvent to the DAGScheduler's event queue for processing.
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
      info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  failedExecutors.remove(index)
  maybeFinishTaskSet()
}

Some readers may already have spotted the problem. To make it clearer, here is a sequence diagram of a successful task run.

(Figure: sequence diagram of a successful task run)


Combining the sequence diagram with the code, we can see that DAGScheduler.handleTaskCompletion ran before isZombie was set to true inside TaskSetManager.handleSuccessfulTask. handleSuccessfulTask executes on a task-result-getter thread, so isZombie was still false when the DAGScheduler triggered the stage resubmit, which ultimately produced the TaskSetManager conflict.
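To see the race in isolation, here is a minimal standalone sketch (not Spark code; the object and variable names are made up for illustration). One thread posts an event to another thread's queue before it sets its zombie flag, so the consumer can handle the event while the flag is still false, just as the dag-scheduler-event-loop does with the CompletionEvent:

import java.util.concurrent.LinkedBlockingQueue

object ZombieRaceSketch {
  @volatile var isZombie = false                   // set by the "result getter" thread
  val events = new LinkedBlockingQueue[String]()   // plays the role of the DAG event queue

  def main(args: Array[String]): Unit = {
    val eventLoop = new Thread(new Runnable {      // plays the role of dag-scheduler-event-loop
      def run(): Unit = {
        val e = events.take()
        if (!isZombie) {
          println(s"$e handled while isZombie is still false -> a second TaskSetManager gets created")
        }
      }
    })
    eventLoop.start()

    events.put("CompletionEvent")                  // like sched.dagScheduler.taskEnded(...)
    Thread.sleep(10)                               // the window in which the conflict can occur
    isZombie = true                                // set too late, as in handleSuccessfulTask
    eventLoop.join()
  }
}

Because the flag is only set after the event has already been consumed, the message is printed on essentially every run; the same ordering is what allows the resubmit to see a non-zombie task set.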
The following logs show the time of the resubmit and the time handleSuccessfulTask finished. Since setting isZombie to true does not log a timestamp of its own, this is only indirect evidence, but it still shows that the resubmit happened before handleSuccessfulTask completed.

End time of handleSuccessfulTask:
[INFO][task-result-getter-2][2017-03-03+22:16:29.999][org.apache.spark.scheduler.TaskSchedulerImpl]Removed TaskSet 4571114.1, whose tasks have all completed, from pool

Time of the stage resubmit:
[INFO][dag-scheduler-event-loop][2017-03-03+22:16:29.549][org.apache.spark.scheduler.TaskSchedulerImpl]Adding task set 4571114.2 with 1 tasks

The timeline of the events:

(Figure: event timeline)

The Fix

The fix is actually quite simple: modify TaskSetManager.handleSuccessfulTask so that the CompletionEvent is sent only after isZombie has been set to true. The fixed code is as follows:

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
      info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  // Post the CompletionEvent only after isZombie has (possibly) been set to true,
  // so the DAGScheduler cannot resubmit the stage while this task set is still non-zombie.
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
  failedExecutors.remove(index)
  maybeFinishTaskSet()
}