1. 程式人生 > >Spark-1.6.0之Application執行資訊記錄器JobProgressListener

Spark-1.6.0之Application執行資訊記錄器JobProgressListener

  JobProgressListener類是Spark的ListenerBus中一個很重要的監聽器,可以用於記錄Spark任務的Job和Stage等資訊,比如在Spark UI頁面上Job和Stage執行狀況以及執行進度的顯示等資料,就是從JobProgressListener中獲得的。另外,SparkStatusTracker也會從JobProgressListener中獲取Spark執行資訊。外部應用也可以通過Spark提供的status相關API比如AllJobResource, AllStagesResource, OneJobResource, OneStageResource獲取到Spark程式的執行資訊。
  JobProgressListener類的繼承關係,以及該類中重要的屬性和方法,見下圖
  JobProgressListener類圖


  
  在Spark-1.6.0中,JobProgressListener物件生成後,會被add到一個LiveListenerBus型別的ListenerBus中。LiveListenerBus類的基礎關係,以及該類中重要方法和屬性見下圖
  LiveListenerBus類圖
  文章接下來分析在一個Spark Application中JobProgressListener的生命週期,以及其資料接收和傳遞的過程。

一、JobProgressListener生成和資料寫入

1、JobProgressListener生成

  在原始碼中,JobProgressListener在SparkContext物件建立時就生成了,

private[spark] val listenerBus = new LiveListenerBus //listenerBus
private var _jobProgressListener: JobProgressListener = _ //定義
...
_jobProgressListener = new JobProgressListener(_conf) //生成
private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener //使用
listenerBus.addListener(jobProgressListener) //使用

  從上面的程式碼中看到,jobProgressListener在生成後,spark將其存入了LiveListenerBus物件中,其他任何接收到listenerBus的地方都能從中獲取到這個jobProgressListener物件。另外在建立SparkUI物件時,使用到了_jobProgressListener物件,使得Spark UI頁面能夠從該物件中獲取Spark應用程式的執行時資料。或者也可以像SparkStatusTracker物件那樣,直接從SparkContext物件中獲取jobProgressListener。
  最後,在SparkContext中呼叫setupAndStartListenerBus()方法,啟動和初始化listenerBus。我們可以看到,在該方法中最後呼叫了listenerBus.start(this)方法真正啟動listenerBus。
  

2、JobProgressListener接收事件

(1)事件進入LiveListenerBus
  LiveListenerBus繼承自AsynchronousListenerBus,可以看到這裡是多執行緒的方式。裡面維持了一個大小為10000的eventQueue,LinkedBlockingDeque型別。這個可以和DAGScheduler中提到的EventLoop類中的eventQueue對比分析。
  eventQueue接收事件呼叫的是post方法,這裡呼叫的是LinkedBlockingDeque.offer方法,而EventLoop中呼叫的是LinkedBlockingDeque.put,可以比較這兩者的區別。

  def post(event: E) {
    if (stopped.get) {
      // Drop further events to make `listenerThread` exit ASAP
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    val eventAdded = eventQueue.offer(event) // 向eventQueue提交event
    if (eventAdded) { 
      eventLock.release() // 如果提交成功則釋放鎖
    } else {
      onDropEvent(event) // 否則丟棄該事件
    }
  }

  所以說,各類事件都是呼叫AsynchronousListenerBus.post方法傳入eventQueue中的。比如,在DAGScheduler類中,可以看到總共有14個呼叫的地方,下面列舉出其中12個不同的。

DAGScheduler方法 SparkListenerEvent事件 描述
executorHeartbeatReceived SparkListenerExecutorMetricsUpdate executor向master傳送心跳錶示BlockManager仍然存活
handleBeginEvent SparkListenerTaskStart task開始執行事件
cleanUpAfterSchedulerStop SparkListenerJobEnd Job結束事件
handleGetTaskResult SparkListenerTaskGettingResult task獲取結果事件
handleJobSubmitted SparkListenerJobStart Job開始事件
handleMapStageSubmitted SparkListenerJobStart Job開始事件
submitMissingTasks SparkListenerStageSubmitted Stage提交事件
handleTaskCompletion SparkListenerTaskEnd Task結束事件
handleTaskCompletion SparkListenerJobEnd Job結束事件
markStageAsFinished SparkListenerStageCompleted Stage結束事件
failJobAndIndependentStages SparkListenerJobEnd Job結束事件
markMapStageJobAsFinished SparkListenerJobEnd Job結束事件

  分析到這裡,各種SparkListenerEvent事件傳遞到了eventQueue中,那麼如何進一步傳遞到JobProgessListener中呢?接下來JobProgressListener作為消費者,從eventQueue中消費這些SparkListenerEvent。
  
(2)事件進入到JobProgressListener

  從SparkContext中啟動LiveListenerBus執行緒開始,LiveListenerBus繼承自AsynchronousListenerBus的run方法便一直在多執行緒執行。在run中有一段主要邏輯,

val event = eventQueue.poll
if (event == null) {
  // Get out of the while loop and shutdown the daemon thread
  if (!stopped.get) {
    throw new IllegalStateException("Polling `null` from eventQueue means" +
      " the listener bus has been stopped. So `stopped` must be true")
  }
  return
}
postToAll(event)

  從eventQueue取出事件後,呼叫LiveListenerBus的postToAll方法,將事件分發到各Listener中。
  具體看一下LiveListenerBus的postToAll方法,這個方法從ListenerBus繼承。

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  // 維持一個Array來儲存add到該bus中的所有listener
  private[spark] val listeners = new CopyOnWriteArrayList[L]

  /**
   * 呼叫addListener方法會把傳入的listener物件存入listeners中
   */
  final def addListener(listener: L) {
    listeners.add(listener)
  }

  /**
   * spark通過呼叫這個方法,spark的各種事件都會觸發listenerBus中所有listener對該事件作出響應
   */
  final def postToAll(event: E): Unit = {
    val iter = listeners.iterator
    while (iter.hasNext) {
      val listener = iter.next()
      try {
      /**
      * onPostEvent方法在SparkListenerBus類中具體實現,針對不同的事件採取不同的方法
      * 比如stageSubmitted, stageCompleted, jobStart, jobEnd, taskStart, 
      * applicationStart, blockManagerAdded,executorAdded等事件
      * 分別呼叫SparkListener中不同方法進行處理
      */
        onPostEvent(listener, event) 
      } catch {
        case NonFatal(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      }
    }
  }
}

2、JobProgressListener對各種事件的響應

  那麼接下來,從JobProgressListener對各種事件的響應方法出發,對其狀態變更邏輯作一個簡要梳理,很多方法從其命名上就能看出其主要功能,有需要的可以進入具體方法中做進一步的研究。JobProgressListener能做出響應的所有SparkListenerEvent事件,基本上都列在前面的表格中了。各類事件基本上都是從DAGScheduler中傳入的,可以參考Spark Scheduler模組原始碼分析之DAGScheduler
(1)Job級別資訊
  這裡主要涉及到Job開始和結束的兩個方法

  • onJobStart(SparkListenerJobStart)
      在Job開始時,獲取job的一些基本資訊,比如引數spark.jobGroup.id 確定的JobGroup。然後生成一個JobUIData物件,用於在Spark UI頁面上顯示Job的ID,提交時間,執行狀態,這個Job包含的Stage個數,完成、跳過、失敗的Stage個數。以及總的Task個數,以及完成、失敗、跳過、正在執行的Task個數。該Job中包含的所有Stage都存入pendingStages中。
  • onJobEnd(SparkListenerJobEnd)
      在Job完成時,根據該Job的最終狀態是成功還是失敗,分別把該job的相關資訊存入completedJob物件和failedJobs物件中,同時把成功或者失敗的job數加一。然後迴圈處理該Job的每一個Stage,將該Stage對應的當前Job移除,如果移除後發現該Stage再沒有其他Job使用了,就把該Stage從activeStage列表中移除。接下來,如果這個Stage的提交時間為空,則表示該Stage被跳過執行,更新一下skipped的Stage個數,以及skipped的Task個數。(成功和失敗的Stage的邏輯在下面一小節中)

(2)Stage級別資訊
  有關Stage的狀態變更處理邏輯,這裡也有Stage的submit和complete方法

  • onStageSubmitted(SparkListenerStageSubmitted)
      在Stage提交後,將該Stage存入activeStages中,並且從pendingStages中移除該Stage。首先獲得當前的排程池名稱,如果是FIFO模式,則是default(實際上不起任何作用),然後根據該排程池,將這個Stage放入排程池中。然後把所屬job的numActiveStages加一,
  • onStageCompleted(SparkListenerStageCompleted)
      在Stage完成後,從排程池中將該Stage移除,同時也從activeStages中移除。根據該Stage是成功還是失敗,繼續更新completedStages或failedStages,並更新這類Stage的統計數。然後把對應Job中activeStages值減一,如果這個Stage是成功的(判斷依據是failureReason為空),則把對應job的成功Stage數加一,否則把對應Job的失敗Stage數加一。
      
    (3)Task級別資訊
      有關Task的方法有task開始,結束兩個方法

  • onTaskStart(SparkListenerTaskStart)
      當一個Task開始執行時,會把對應Stage中active狀態的Task計數加一,並且把這個Task相關的資訊記入對應Stage中,同時更新該Task所屬Job中Active狀態Task的個數。

  • onTaskEnd(SparkListenerTaskEnd)
      當一個Task執行完成時,獲取該Task對應Stage的executorSummary資訊,這個executorSummary中記錄了每個Executor對應的ExecutorSummary資訊,其中包括task開始時間,失敗task個數,成功task個數,輸入輸出位元組數,shuffle read/write位元組數等。然後根據這個Task所屬的executorId,找到當前Task的執行統計資訊execSummary。如果這個Task執行成功,就將成功task個數加一,否則就將失敗task個數加一。然後根據Task執行狀態,更新對應Stage中失敗或成功Task個數。進一步,更新對應Job中失敗或成功的Task個數。

二、SparkUI頁面從JobProgressListener讀取資料

  JobProgressListener主要用在向Spark UI頁面傳遞資料上。