1. 程式人生 > >Akka源碼分析-Cluster-Distributed Publish Subscribe in Cluster

Akka源碼分析-Cluster-Distributed Publish Subscribe in Cluster

lin doc elong ror con div 隨機 ali 進行

  在ClusterClient源碼分析中,我們知道,他是依托於“Distributed Publish Subscribe in Cluster”來實現消息的轉發的,那本文就來分析一下Pub/Sub是如何實現的。

  還記得之前分析Cluster源碼的文章嗎?其實Cluster只是把集群內各個節點的信息通過gossip協議公布出來,並把節點的信息分發出來。但各個actor的地址還是需要開發者自行獲取或設計的,比如我要跟worker通信,那就需要知道這個actor在哪個節點,通過actorPath或actorRef通信。

  “Distributed Publish Subscribe”就是用來屏蔽Actor位置的一個組件,通過它你可以給actor發消息而不需要知道actor的網咯位置。其實就是提供了一個類似kafka的消息發布、訂閱的機制,其實吧,如果這個功能讓你實現,你準備怎麽做?肯定是在集群層面提供一個proxy,來屏蔽目標actor的網絡位置啊。簡單來說,就是提供一個通用的actor,來對消息進行轉發,發送者只需要提供目標actor的路徑就好了(比如/user/serviceA)。不過還是那句話,akka的都是對的,akka的都是好的。akka幫你實現這個事兒,就不用你自己考慮通用、穩定的問題啦。

  消息訂閱發布模式提供了一個中繼actor:akka.cluster.pubsub.DistributedPubSubMediator。它管理actor的註冊引用、分發實例引用給端actor,而且必須在所有的節點或一組節點內啟動。它可以通過DistributedPubSub擴展啟動,也可以像普通actor那樣啟動。

  服務actor的註冊是最終一致的,也就是說服務信息在變化時並不能立即通知給其他節點,過一段時間參會分發給所有節點。當然了每次都是以增量的信息分發這些信息。

  消息的發送有兩種模式:Send和Publish。簡單來說就是點對點、廣播。

  Publish模式下,只有註冊到命名的topic的actor才會收到消息,topic是啥?。其實這才是真正的訂閱、發布模式。

    class Subscriber extends Actor with ActorLogging {
      import DistributedPubSubMediator.{ Subscribe, SubscribeAck }
      val mediator = DistributedPubSub(context.system).mediator
      // subscribe to the topic named "content"
      mediator ! Subscribe("content", self)

      def receive = {
        case s: String ?
          log.info("Got {}", s)
        case SubscribeAck(Subscribe("content", None, `self`)) ?
          log.info("subscribing")
      }
    }

    class Publisher extends Actor {
      import DistributedPubSubMediator.Publish
      // activate the extension
      val mediator = DistributedPubSub(context.system).mediator

      def receive = {
        case in: String ?
          val out = in.toUpperCase
          mediator ! Publish("content", out)
      }
    }

  上面是官方的demo,可以看出,訂閱者actor訂閱了名為“content”的topic,在發布者actor發送指定topic的消息時,會自動收到對應的消息。怎麽樣,是不是很簡單。其實吧,mediator只需要維護一個topic到訂閱者的映射列表就好了,當收到對應topic的消息時,取出對應的訂閱者(也就是ActorRef或actorSelection)把消息轉發給他就好了。

  Send模式就是一個點對點模式,每個消息被發送給一個目標,而不用知道這個目標actors的位置。既然之前我們說了,這是通過ActorPath發送的,那如果集群中同時有多個節點命中了這個ActorPath怎麽辦呢?那就路由唄,提供一個RoutingLogic 路由策略。默認策略是隨機發送,當然了我們是可以修改這個策略的。與Publish模式不同,這裏註冊服務actor是通過Put消息實現的。不過實現原理都差不多,反正都要維護列表。

    class Destination extends Actor with ActorLogging {
      import DistributedPubSubMediator.Put
      val mediator = DistributedPubSub(context.system).mediator
      // register to the path
      mediator ! Put(self)

      def receive = {
        case s: String ?
          log.info("Got {}", s)
      }
    }

    class Sender extends Actor {
      import DistributedPubSubMediator.Send
      // activate the extension
      val mediator = DistributedPubSub(context.system).mediator

      def receive = {
        case in: String ?
          val out = in.toUpperCase
          mediator ! Send(path = "/user/destination", msg = out, localAffinity = true)
      }
    }

  當然了,我們還是可以通過SendToAll把消息發送給所有命中指定path的actor的。

  這裏需要註意的是,官方的訂閱發布組件只能保證至少一次投遞,想想都是這樣的,哈哈。廢話不多說了,上代碼。

object DistributedPubSub extends ExtensionId[DistributedPubSub] with ExtensionIdProvider {
  override def get(system: ActorSystem): DistributedPubSub = super.get(system)

  override def lookup = DistributedPubSub

  override def createExtension(system: ExtendedActorSystem): DistributedPubSub =
    new DistributedPubSub(system)
}

  很顯然DistributedPubSub這個擴展也是可以通過配置直接實例化的,不需要我們自行寫代碼實例化。由於其源碼非常簡單就是定義並創建了mediator這個actor(DistributedPubSubMediator),下面直接轉到DistributedPubSubMediator源碼的分析。

/**
 * This actor manages a registry of actor references and replicates
 * the entries to peer actors among all cluster nodes or a group of nodes
 * tagged with a specific role.
 *
 * The `DistributedPubSubMediator` actor is supposed to be started on all nodes,
 * or all nodes with specified role, in the cluster. The mediator can be
 * started with the [[DistributedPubSub]] extension or as an ordinary actor.
 *
 * Changes are only performed in the own part of the registry and those changes
 * are versioned. Deltas are disseminated in a scalable way to other nodes with
 * a gossip protocol. The registry is eventually consistent, i.e. changes are not
 * immediately visible at other nodes, but typically they will be fully replicated
 * to all other nodes after a few seconds.
 *
 * You can send messages via the mediator on any node to registered actors on
 * any other node. There is three modes of message delivery.
 *
 * You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or
 * [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and
 * `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same
 * local actor system as the mediator. `Subscribe` is used together with `Publish`.
 * Actors are automatically removed from the registry when they are terminated, or you
 * can explicitly remove entries with [[DistributedPubSubMediator.Remove]] or
 * [[DistributedPubSubMediator.Unsubscribe]].
 *
 * Successful `Subscribe` and `Unsubscribe` is acknowledged with
 * [[DistributedPubSubMediator.SubscribeAck]] and [[DistributedPubSubMediator.UnsubscribeAck]]
 * replies.
 *
 * Not intended for subclassing by user code.
 */
@DoNotInherit
class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Actor with ActorLogging with PerGroupingBuffer

  PerGroupingBuffer這個trait不再分析源碼,從代碼和命名來看,就是給每個group提供一個消息緩存的列表。其實這個actor最重要的功能是要能夠感知集群節點的變化和對應服務actor的變化,並及時的把這些信息分發給其他DistributedPubSubMediator,還有就是能夠把消息路由給指定的訂閱者。為了簡化分析,我們忽略第一個功能點,只分析是如何路由消息的。分析這點需要關註幾個消息的處理邏輯:PutSubscribePublishSendSendToAll

  先來看Subscribe

    case msg @ Subscribe(topic, _, _) ?
      // each topic is managed by a child actor with the same name as the topic

      val encTopic = encName(topic)

      bufferOr(mkKey(self.path / encTopic), msg, sender()) {
        context.child(encTopic) match {
          case Some(t) ? t forward msg
          case None    ? newTopicActor(encTopic) forward msg
        }
      }

  Subscribe消息表明某個actor需要訂閱某個topic的消息,簡單來說就是先判斷是否需要緩存,不需要的話就執行{}代碼塊。很顯然,剛開始的時候是不需要緩存的。上面的邏輯就是從當前的children中查找encTopic的一個actor,然後把消息轉發給它;不存在則創建之後再轉發給它。那猜一下這個子actor的功能?其實吧,它應該是一個actor負責維護某個topic與所有訂閱者的關系,所有發給這個topic的消息都會轉發給所有的訂閱者。

def newTopicActor(encTopic: String): ActorRef = {
    val t = context.actorOf(Props(classOf[Topic], removedTimeToLive, routingLogic), name = encTopic)
    registerTopic(t)
    t
  }

  很顯然newTopicActor創建了Topic這個actor,名字就是topic的值,並傳入了兩個參數:removedTimeToLive、routingLogic。第二個是路由策略。

def registerTopic(ref: ActorRef): Unit = {
    put(mkKey(ref), Some(ref))
    context.watch(ref)
  }

  put這個函數的功能我們先略過,其功能大概是把這個actor註冊到系統內,把它與當前地址、版本號做關聯並保存,在適當的時機分發出去。

技術分享圖片

  Topic這個actor只有兩個方法,所以還需要去看下TopicLike的代碼。

技術分享圖片

  可以看到TopicLike中有一個subscribers列表,這也是預期之中的。這個actor的消息會被business和defaultReceive處理,business在Topic中重新實現了,且會優先處理。

case msg @ Subscribe(_, Some(group), _) ?
          val encGroup = encName(group)
          bufferOr(mkKey(self.path / encGroup), msg, sender()) {
            context.child(encGroup) match {
              case Some(g) ? g forward msg
              case None    ? newGroupActor(encGroup) forward msg
            }
          }
          pruneDeadline = None

  收到Subscribe消息後,做了跟DistributedPubSubMediator類似的邏輯,又創建了一個子actor(Group),並把消息轉發給了它。其實這一點在官方也有說過,也就是說,topic也是可以分組的,一個消息並不一定會發給所有訂閱者,可以發給一組訂閱者,其實吧,這一點我不太喜歡,感覺功能有點過了,如果要對topic劃分子topic,用戶自定義實現好了啊,搞得現在源碼這麽復雜。

class Group(val emptyTimeToLive: FiniteDuration, routingLogic: RoutingLogic) extends TopicLike {
      def business = {
        case SendToOneSubscriber(msg) ?
          if (subscribers.nonEmpty)
            Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(wrapIfNeeded(msg), sender())
      }
    }

  Group的代碼還這麽簡單,它又把Subscribe發給了TopicLike的defaultReceive

def defaultReceive: Receive = {
        case msg @ Subscribe(_, _, ref) ?
          context watch ref
          subscribers += ref
          pruneDeadline = None
          context.parent ! Subscribed(SubscribeAck(msg), sender())

  上面是defaultReceive對Subscribe消息的處理,就是watch,然後把訂閱者添加到subscribers列表中,再告訴父actor(就是Topic這個actor)訂閱成功了。

  聰明的讀者可能會問了,為啥topic還需要弄個消息緩存呢?其實吧,如果是我實現,肯定不搞這麽麻煩啊。消息丟了就丟了啊,沒有訂閱者的時候,消息緩存起來等有訂閱者的時候再發送出去?哈哈,有點浪費內存啊。不過為了穩定性、功能性、完善性,akka還是做了很多額外努力的。不過吧,建議還是把這個隊列的大小調小一點,要不然太浪費內存了。不過很不幸的告訴你,目前沒有這個開關。

  既然訂閱topic的邏輯跟我們的猜測差不多,那麽發布消息的邏輯就應該也符合我們的猜測嘍。其實就是獲取某個topic對應的訂閱者,然後foreach把消息發出去。

case Publish(topic, msg, sendOneMessageToEachGroup) ?
      if (sendOneMessageToEachGroup)
        publishToEachGroup(mkKey(self.path / encName(topic)), msg)
      else
        publish(mkKey(self.path / encName(topic)), msg)

  簡單起見,我們只分析消息不分組的情況

def publish(path: String, msg: Any, allButSelf: Boolean = false): Unit = {
    val refs = for {
      (address, bucket) ← registry
      if !(allButSelf && address == selfAddress) // if we should skip sender() node and current address == self address => skip
      valueHolder ← bucket.content.get(path)
      ref ← valueHolder.ref
    } yield ref
    if (refs.isEmpty) ignoreOrSendToDeadLetters(msg)
    else refs.foreach(_.forward(msg))
  }

  還記得registry什麽時候賦值的嘛?如果忘了,可以翻翻registerTopic的代碼,因為我沒有分析,哈哈。不過不重要了,其實就是獲取當前的Topic的Group的ActorRef,然後把消息轉發給它。

        case msg ?
          subscribers foreach { _ forward msg }

  Group繼承的TopicLike中的defaultReceive方法處理了消息,其實就是把消息轉發給所有的subscribers。

  pub/sub的邏輯就分析到這裏了,其實這裏面的邏輯還是有點復雜的,當然了有一部分是因為topic分組帶來的,其他的都是gossip協議分發訂閱者、發布者的相關信息帶來的。

  下面分析Send模式。從Put消息的處理入手。

case Put(ref: ActorRef) ?
      if (ref.path.address.hasGlobalScope)
        log.warning("Registered actor must be local: [{}]", ref)
      else {
        put(mkKey(ref), Some(ref))
        context.watch(ref)
      }

  這就有點簡單了,就是把ref註冊一下,然後watch。這個ref的key是ActorRef值,其實就是ActorPath.toString

    case Send(path, msg, localAffinity) ?
      val routees = registry(selfAddress).content.get(path) match {
        case Some(valueHolder) if localAffinity ?
          (for {
            routee ← valueHolder.routee
          } yield routee).toVector
        case _ ?
          (for {
            (_, bucket) ← registry
            valueHolder ← bucket.content.get(path)
            routee ← valueHolder.routee
          } yield routee).toVector
      }

      if (routees.isEmpty) ignoreOrSendToDeadLetters(msg)
      else Router(routingLogic, routees).route(wrapIfNeeded(msg), sender())

  其實就是從registry中優先找當前節點的訂閱者,然後通過Router和指定的策略把消息發送出去,這個比pub/sub模式稍微簡單點。wrapIfNeeded的功能不再分析,其實就是為了防止與用戶本身的路由消息發生沖突。

  關於節點信息同步,感興趣的讀者可以自行閱讀源碼,不過我看下來還是有幾個問題的。比如當前註冊信息的版本是通過時間戳來標誌的,如果節點間時間不同步,會發生意外的結果啊;另外所謂的gossip協議,其實就是隨機把註冊信息發送給其他節點,也就是說集群內的節點都會把消息按照心跳時間,把註冊信息隨機發送給本身節點以外的節點,達到最終註冊信息的同步。如果是我來實現,直接就是粗暴的廣播註冊信息,哈哈,不過這在集群規模比較大的時候比較耗時,啊哈哈。

Distributed Publish Subscribe in Cluster

Akka源碼分析-Cluster-Distributed Publish Subscribe in Cluster