1. 程式人生 > >Spark2.1.0事件匯流排分析——LiveListenerBus詳解

Spark2.1.0事件匯流排分析——LiveListenerBus詳解

LiveListenerBus繼承了SparkListenerBus,並實現了將事件非同步投遞給監聽器,達到實時重新整理UI介面資料的效果。LiveListenerBus主要由以下部分組成:

  • eventQueue:是SparkListenerEvent事件的阻塞佇列,佇列大小可以通過Spark屬性spark.scheduler.listenerbus.eventqueue.size進行配置,預設為10000(Spark早期版本中屬於靜態屬性,固定為10000,這導致佇列堆滿時,只得移除一些最老的事件,最終導致各種問題與bug);
  • started:標記LiveListenerBus的啟動狀態的AtomicBoolean型別的變數;
  • stopped:標記LiveListenerBus的停止狀態的AtomicBoolean型別的變數;
  • droppedEventsCounter:使用AtomicLong型別對刪除的事件進行計數,每當日誌列印了droppedEventsCounter後,會將droppedEventsCounter重置為0;
  • lastReportTimestamp:用於記錄最後一次日誌列印droppedEventsCounter的時間戳;
  • processingEvent:用來標記當前正有事件被listenerThread執行緒處理;
  • logDroppedEvent:AtomicBoolean型別的變數,用於標記是否由於eventQueue已滿,導致新的事件被刪除;
  • eventLock:用於當有新的事件到來時釋放訊號量,當對事件進行處理時獲取訊號量;
  • listeners:繼承自LiveListenerBus的監聽器陣列;
  • listenerThread:處理事件的執行緒。

非同步事件處理執行緒

         listenerThread用於非同步處理eventQueue中的事件,為了便於說明,這裡將展示listenerThread及LiveListenerBus中的主要程式碼片段,見程式碼清單1。

程式碼清單1         LiveListenerBus主要邏輯的程式碼片段

  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

  private def validateAndGetQueueSize(): Int = {
    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    if (queueSize <= 0) {
      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
    }
    queueSize
  }

  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
  private val droppedEventsCounter = new AtomicLong(0L)
  @volatile private var lastReportTimestamp = 0L
  private var processingEvent = false
  private val logDroppedEvent = new AtomicBoolean(false)
  private val eventLock = new Semaphore(0)

  private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
      LiveListenerBus.withinListenerThread.withValue(true) {
        while (true) {
          eventLock.acquire() // 獲取訊號量
          self.synchronized {
            processingEvent = true
          }
          try {
            val event = eventQueue.poll //從eventQueue中獲取事件
            if (event == null) {
              // Get out of the while loop and shutdown the daemon thread
              if (!stopped.get) {
                throw new IllegalStateException("Polling `null` from eventQueue means" +
                  " the listener bus has been stopped. So `stopped` must be true")
              }
              return
            }
            postToAll(event) // 事件處理
          } finally {
            self.synchronized {
              processingEvent = false
            }
          }
        }
      }
    }
  }

通過分析程式碼清單1,listenerThread的工作步驟為:

  1. 不斷獲取訊號量(當可以獲取訊號量時,說明還有事件未處理);
  2. 通過同步控制,將processingEvent設定為true;
  3. 從eventQueue中獲取事件;
  4. 呼叫超類ListenerBus的postToAll方法(postToAll方法對監聽器進行遍歷,並呼叫SparkListenerBus的doPostEvent方法對事件進行匹配後執行監聽器的相應方法);
  5. 每次迴圈結束依然需要通過同步控制,將processingEvent設定為false;

值得一提的是,listenerThread的run方法中呼叫了Utils的tryOrStopSparkContext,tryOrStopSparkContext方法可以保證當listenerThread的內部迴圈丟擲異常後啟動一個新的執行緒停止SparkContext(SparkContext的內容將在第4章詳細介紹,tryOrStopSparkContext方法的具體實現請閱讀《附錄A Spark2.1核心工具類Utils》)。

LiveListenerBus的訊息投遞

         在解釋了非同步執行緒listenerThread的工作內容後,還有一個要點沒有解釋:eventQueue中的事件是如何放進去的呢?由於eventQueue定義在LiveListenerBus中,因此ListenerBus和SparkListenerBus中並沒有操縱eventQueue的方法,要將事件放入eventQueue只能依靠LiveListenerBus自己了,其post方法就是為此目的而生的,見程式碼清單2。

程式碼清單2        向LiveListenerBus投遞SparkListenerEvent事件

  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get) {
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    val eventAdded = eventQueue.offer(event) // 向eventQueue中新增事件
    if (eventAdded) {
      eventLock.release()
    } else {
      onDropEvent(event)
      droppedEventsCounter.incrementAndGet()
    }
    // 列印刪除事件數的日誌
    val droppedEvents = droppedEventsCounter.get
    if (droppedEvents > 0) {
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
            new java.util.Date(prevLastReportTimestamp))
        }
      }
    }
  }

從程式碼清單2看到post方法的處理步驟如下:

  1. 判斷LiveListenerBus是否已經處於停止狀態;
  2. 向eventQueue中新增事件。如果新增成功,則釋放訊號量進而催化listenerThread能夠有效工作。如果eventQueue已滿造成新增失敗,則移除事件,並對刪除事件計數器droppedEventsCounter進行自增;
  3. 如果有事件被刪除,並且當前系統時間距離上一次列印droppedEventsCounter超過了60秒則將droppedEventsCounter列印到日誌。

LiveListenerBus與監聽器

         與LiveListenerBus配合使用的監聽器,並非是父類SparkListenerBus的型別引數SparkListenerInterface,而是繼承自SparkListenerInterface的SparkListener及其子類。圖1列出了Spark中監聽器SparkListener以及它的6種最常用的實現[1]

圖1     SparkListener的類繼承體系

SparkListener雖然實現了SparkListenerInterface中的每個方法,但是其實都是空實現,具體的實現需要交給子類去完成。

         《Spark2.1.0之原始碼分析——事件匯流排》中首先對事件匯流排的介面定義進行了一些介紹,之後《Spark2.1.0事件匯流排分析——ListenerBus的繼承體系》一文展示了ListenerBus的繼承體系,然後《Spark2.1.0事件匯流排分析——SparkListenerBus詳解》選擇ListenerBus的子類SparkListenerBus進行分析,最後本文選擇LiveListenerBus作為具體的實現例子進行分析,這裡將通過圖2更加直觀的展示ListenerBus、SparkListenerBus及LiveListenerBus的工作原理。

圖2     LiveListenerBus的工作流程圖

最後對於圖2作一些補充說明:圖中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件來源,它們都是通過呼叫LiveListenerBus的post方法將訊息交給非同步執行緒listenerThread處理的。

[1] 除了本節列出的的六種SparkListener的子類外,還有很多其他的子類,這裡就不一一列出了,感興趣的讀者可以查閱Spark相關文件或閱讀原始碼知曉。

關於《Spark核心設計的藝術 架構設計與實現》

經過近一年的準備,《Spark核心設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:

紙質版售賣連結如下: