1. 程式人生 > >Akka源碼分析-Event Bus

Akka源碼分析-Event Bus

接口 ble mea times sta ive 保存 refs 知識

  akka中的EventBus其實是不常用,也最容易被忽略的一個組件。

  但如果你深入Cluster的實現就會發現,這個東西其實還挺有用的,而且它是ActorSystem系統中所有事件消息的一個橫切面,通過它你可以訂閱特定類型的消息,然後做出相應的動作。那讀者可能會問了,這個訂閱消息也很簡單的啊,我自己實現不就好了。嗯,其實你這個想法是對的,akka所有的功能都是基於actor和Actor模型的,所有復雜的功能實現起來都不是特別麻煩,至少實現的模型不會很復雜。不過你可能用不好這個EventBus,因為你並不一定會用,或者說不知道什麽時候用。

  對於Event Bus,也就是事件總線,普通場景下個人建議不要使用。Event Bus會使本來就復雜的消息通信更加復雜, 如果不用,開發過程中你明確知道跟某個actor通信的都有哪些actor,也就是說他們之間的通信協議是明確的。僅僅做到這一點,就會使actor系統很復雜了,再用個Event Bus把事件發送出去,會導致消息更加分散,某種意義上也是一種耦合。比如你把消息A發布出去,但卻不知道誰在訂閱它,如果某個版本升級你不消息忘了發布這個消息,那其他actor還能正常工作嗎?這明顯是給自己找麻煩。

  那什麽時候用呢?或者說使用的時候都有哪些限制呢?大概有兩種情況吧:1.發布的都是系統消息,跟業務無關;2.為了考慮系統後期的擴展和升級(當然了需要滿足第一個條件)。第一個規則是啥意思呢?就是你發布的消息不會變化或者不會有大的變化,比如只是發布了某個特定actor啟動、停止、退出的系統消息,這些消息無論格式還是內容都是固定的。如果後期系統功能升級,需要監控這些消息,由於消息固定,所以不會給版本帶來很大的問題。再加上不是業務消息,所以也不會給業務造成什麽影響。

  廢話不多說,來看看它的實現。當然EventBus實現比較復雜,簡單起見,我們只分析Event Stream。

// this provides basic logging (to stdout) until .start() is called below
  val eventStream = new EventStream(this, DebugEventStream)
  eventStream.startStdoutLogger(settings)

  在ActorSystemImpl中有上面兩行代碼,創建了一個eventStream,官方文檔說,提供了一個基本的日誌功能。其實這句話我覺得不應該說,容易給大家造成誤解。大家肯定想,既然這個是用來做日誌的,就沒啥用了唄。如果有這個認識的話,再對akka做擴展的時候會走很大的彎路。其實akka系統通過eventStream發布了很多重要的系統消息,比如actor生命周期狀態、remote模式下網絡生命周期事件,如果能夠合理的使用好這些系統消息,會給我們帶來極大的方便,偷偷的告訴你,cluster就是訂閱了一些網絡狀態事件實現了許多重要的功能。

/**
 * An Akka EventStream is a pub-sub stream of events both system and user generated,
 * where subscribers are ActorRefs and the channels are Classes and Events are any java.lang.Object.
 * EventStreams employ SubchannelClassification, which means that if you listen to a Class,
 * you‘ll receive any message that is of that type or a subtype.
 *
 * The debug flag in the constructor toggles if operations on this EventStream should also be published
 * as Debug-Events
 */
class EventStream(sys: ActorSystem, private val debug: Boolean) extends LoggingBus with SubchannelClassification 

  Akka EventStream是一個發布-訂閱事件流,包括系統和用戶產生的數據。訂閱某個特定類型的消息,不一定會收到對應的消息,前提是你自己或系統調用EventStream的發布接口把消息發布了出去。

/**
 * Classification which respects relationships between channels: subscribing
 * to one channel automatically and idempotently subscribes to all sub-channels.
 */
trait SubchannelClassification { this: EventBus ?

  SubchannelClassification,子頻道分類器,根據官方描述大概知道,它會自動的訂閱所有子頻道的消息。大概是會自動訂閱某個父類所有子類的消息吧。頻道是啥?當然是一個類或者接口了啊。

  LoggingBus具體做啥的就不分析了,反正是跟記日誌有關的。不過從它的繼承關系來看,它直接決定了EventStream是一個EventBus的某個子類。這個繼承關系我覺得官方實現的不夠合理,畢竟記日誌只是EventStream一個功能。EventStream首先應該是一個EventBus,只不過混入了Logging的功能而已,現在直接繼承LoggingBus從而繼承EventBus,顯得不夠優化!

class DeadLetterListener extends Actor {
  def receive = {
    case d: DeadLetter ? println(d)
  }
}

val listener = system.actorOf(Props[DeadLetterListener])
system.eventStream.subscribe(listener, classOf[DeadLetter])

  這是官方的一個例子,非常簡單,就是調用subscribe方法,訂閱了DeadLetter類型的消息,把消息發送給DeadLetterListener這個actor。那麽來看看subscribe如何實現,不過在這之前還需要看看它是如何初始化的。在ActorSystem的start方法中調用了eventStream.startUnsubscriber(),對eventStream實現了初始化。

  /**
   * ‘‘Must‘‘ be called after actor system is "ready".
   * Starts system actor that takes care of unsubscribing subscribers that have terminated.
   */
  def startUnsubscriber(): Unit =
    // sys may be null for backwards compatibility reasons
    if (sys ne null) EventStreamUnsubscriber.start(sys, this)

  其中sys就是我們傳入的ActorSystem實例。

/**
 * INTERNAL API
 *
 * Provides factory for [[akka.event.EventStreamUnsubscriber]] actors with **unique names**.
 * This is needed if someone spins up more [[EventStream]]s using the same [[akka.actor.ActorSystem]],
 * each stream gets it‘s own unsubscriber.
 */
private[akka] object EventStreamUnsubscriber {

  private val unsubscribersCount = new AtomicInteger(0)

  final case class Register(actor: ActorRef)

  final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef)

  private def props(eventStream: EventStream, debug: Boolean) =
    Props(classOf[EventStreamUnsubscriber], eventStream, debug)

  def start(system: ActorSystem, stream: EventStream) = {
    val debug = system.settings.config.getBoolean("akka.actor.debug.event-stream")
    system.asInstanceOf[ExtendedActorSystem]
      .systemActorOf(props(stream, debug), "eventStreamUnsubscriber-" + unsubscribersCount.incrementAndGet())
  }

}

  官方說EventStreamUnsubscriber是個工廠類,用來給EventStreamUnsubscriber提供一個唯一的名字,如果開發者啟動了多個EventStream不至於會出現沖突。其實吧,個人覺得完全沒必要,多創建一個EventStream,這都屬於高級用法了,akka還沒普及,遠到不了這個地步。

/**
 * INTERNAL API
 *
 * Watches all actors which subscribe on the given eventStream, and unsubscribes them from it when they are Terminated.
 *
 * Assumptions note:
 * We do not guarantee happens-before in the EventStream when 2 threads subscribe(a) / unsubscribe(a) on the same actor,
 * thus the messages sent to this actor may appear to be reordered - this is fine, because the worst-case is starting to
 * needlessly watch the actor which will not cause trouble for the stream. This is a trade-off between slowing down
 * subscribe calls * because of the need of linearizing the history message sequence and the possibility of sometimes
 * watching a few actors too much - we opt for the 2nd choice here.
 */
protected[akka] class EventStreamUnsubscriber(eventStream: EventStream, debug: Boolean = false) extends Actor 

  從官方註釋來看,EventStreamUnsubscriber是所有訂閱eventStream的監督者,當訂閱者(也就是某個actor)stop的時候,把對應的訂閱消息移除,以便發送不必要的消息。那EventStreamUnsubscriber和EventStream的關系是怎麽樣的呢?其實吧,這裏又做了一個分層,EventStreamUnsubscriber負責監控對應的actor,把消息發送個它,而EventStream負責訂閱相關的狀態維護。

  初始化完成後,下面來看subscribe的實現。

override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
    if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
    if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel))
    registerWithUnsubscriber(subscriber)
    super.subscribe(subscriber, channel)
  }

@tailrec
  private def registerWithUnsubscriber(subscriber: ActorRef): Unit = {
    // sys may be null for backwards compatibility reasons
    if (sys ne null) initiallySubscribedOrUnsubscriber.get match {
      case value @ Left(subscribers) ?
        if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers + subscriber)))
          registerWithUnsubscriber(subscriber)

      case Right(unsubscriber) ?
        unsubscriber ! EventStreamUnsubscriber.Register(subscriber)
    }
  }

/** Either the list of subscribed actors, or a ref to an [[akka.event.EventStreamUnsubscriber]] */
  private val initiallySubscribedOrUnsubscriber = new AtomicReference[Either[Set[ActorRef], ActorRef]](Left(Set.empty))

  initiallySubscribedOrUnsubscriber的定義還是很奇怪的,不過根據上下文來分析,registerWithUnsubscriber應該就是給EventStreamUnsubscriber發送EventStreamUnsubscriber.Register(subscriber)消息,然後調用super.subscribe

def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized {
    val diff = subscriptions.addValue(to, subscriber)
    addToCache(diff)
    diff.nonEmpty
  }

  super.subscribe是在SubchannelClassification中實現的。

  // must be lazy to avoid initialization order problem with subclassification
  private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]()

  第一行的addVelue,應該就是把類型和對應的Subscriber做索引,當然了同一個Classifier是可以有多個訂閱者的。Subscriber是啥?當然是一個ActorRef了。這個在EventStream繼承的ActorEventBus中定義。

@volatile
  private var cache = Map.empty[Classifier, Set[Subscriber]]

  cache其實就是一個map,保存類型與訂閱者集合的映射。邏輯是不是也很清晰呢?簡單來說,訂閱某個消息,就是把消息的類型和對應的actorRef做一個綁定,然後在某個對應類型的消息產生時,調用actorRef的tell函數就行了。

def publish(event: Event): Unit = {
    val c = classify(event)
    val recv =
      if (cache contains c) cache(c) // c will never be removed from cache
      else subscriptions.synchronized {
        if (cache contains c) cache(c)
        else {
          addToCache(subscriptions.addKey(c))
          cache(c)
        }
      }
    recv foreach (publish(event, _))
  }

  那我們來看看publish的具體實現,EventStream中定義了Event就是一個AnyRef,其實就是可以發布任意引用類型的消息。這段代碼也比較容易理解,在分析classify之前可以猜一猜,其實就是找出傳入的AnyRef具體類型,然後從cache中找到對應的訂閱者,在調用publish發布消息。

  protected def classify(event: AnyRef): Class[_] = event.getClass

  EventStream重寫了classify函數,很簡單,就是getClass。

protected def publish(event: AnyRef, subscriber: ActorRef) = {
    if (sys == null && subscriber.isTerminated) unsubscribe(subscriber)
    else subscriber ! event
  }

  publish呢?就是調用subscriber的! 方法,把消息發送出去。

  其實分析到這裏,基本就結束了,特別簡單。訂閱消息就是把對應的類型和actor關聯起來,publish的時候通過消息的類型找到對應的訂閱者(也就是actor),把消息發給訂閱者就結束了,自己實現也特別簡單。不過為了通用和穩定,akka還是做了很多工作的。比如某個actor被Terminat的時候,可以自動取消訂閱,畢竟actor還可能意外終止,沒有來得及調用unsubscribe方法取消訂閱。

  EventStream就分析到這裏了,不過介紹這個知識點有兩個出發點。首先這個EventStream作為所有消息的截面,特殊情況下,還是很有用的。另外就是在分析cluster的時候,這個點還是比較重要的,畢竟cluster用eventStream實現了某些特殊功能,雖然這點我不太喜歡。

Akka源碼分析-Event Bus