How Actor Message Passing Is Implemented in Akka (Source Code Walkthrough)

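In Akka, a message is sent to an actor with one of the following methods:

  • ! (tell) means "fire-and-forget": the message is sent asynchronously and the sender does not wait for a result.
  • ? (ask) sends the message asynchronously and returns a Future representing a possible reply.

    Message ordering is preserved on a per-sender basis.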
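The difference between the two send styles can be illustrated without Akka itself. The following is only a minimal sketch in plain Scala: `MiniActor`, its `handler` function, and the single worker thread are hypothetical stand-ins, not Akka classes. `tell` discards whatever the handler produces, while `ask` returns a `Future` that is completed with it.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Hypothetical stand-in for an actor: a single worker thread handles messages
// one at a time, in the order they were submitted.
final class MiniActor(handler: Any => Any) {
  private val worker: ExecutorService = Executors.newSingleThreadExecutor()

  // tell: hand the message over and return immediately; the handler's result is discarded.
  def tell(message: Any): Unit =
    worker.execute(new Runnable {
      def run(): Unit = { handler(message); () }
    })

  // ask: hand the message over and return a Future that is later completed
  // with the handler's result (or with the exception it threw).
  def ask(message: Any): Future[Any] = {
    val reply = Promise[Any]()
    worker.execute(new Runnable {
      def run(): Unit = { reply.complete(Try(handler(message))); () }
    })
    reply.future
  }

  def shutdown(): Unit = worker.shutdown()
}
```

In this sketch, messages sent from one thread are handled in the order they were sent, because they enter a FIFO queue in program order. That mirrors the per-sender ordering described above, but nothing here says anything about how messages from different senders interleave.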

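Mailbox

An Akka mailbox holds the messages destined for an Actor. Normally each Actor has its own mailbox, but there are exceptions: with a BalancingPool, for example, all routees share a single mailbox instance.

MessageQueue (akka.dispatch.MessageQueue) is one of the core components that make up an Akka mailbox.
Ordinary messages sent to an Actor are enqueued there and later dequeued, so the queue must be thread-safe for at least N producers and 1 consumer. It declares enqueue, dequeue, and a few related methods: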

  def enqueue(receiver: ActorRef, handle: Envelope): Unit
  def dequeue(): Envelope
  def numberOfMessages: Int
  def hasMessages: Boolean
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
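To make the "N producers, 1 consumer" requirement concrete, here is a small sketch of a queue with a similar shape, backed by `java.util.concurrent.ConcurrentLinkedQueue`. The names `SimpleEnvelope`, `SimpleMessageQueue`, and `UnboundedSimpleQueue` are invented for illustration, and the sender is modelled as a plain `String` rather than an `ActorRef`; this is not Akka's own implementation.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Simplified stand-in for Envelope: the sender is a String here, not an ActorRef.
final case class SimpleEnvelope(message: Any, sender: String)

// Reduced version of the queue interface (no receiver argument, no cleanUp).
trait SimpleMessageQueue {
  def enqueue(handle: SimpleEnvelope): Unit
  def dequeue(): SimpleEnvelope // returns null when the queue is empty
  def numberOfMessages: Int
  def hasMessages: Boolean
}

// ConcurrentLinkedQueue is a lock-free, thread-safe FIFO queue, so any number of
// producer threads can call enqueue while one consumer calls dequeue.
final class UnboundedSimpleQueue extends SimpleMessageQueue {
  private val queue = new ConcurrentLinkedQueue[SimpleEnvelope]()

  def enqueue(handle: SimpleEnvelope): Unit = { queue.offer(handle); () }
  def dequeue(): SimpleEnvelope = queue.poll()
  def numberOfMessages: Int = queue.size()
  def hasMessages: Boolean = !queue.isEmpty
}
```

Because the underlying queue is FIFO, envelopes enqueued by one producer thread come out in the order that thread enqueued them, even when several producers run concurrently.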

Envelope wraps two members, message: Any and sender: ActorRef:

final case class Envelope private (val message: Any, val sender: ActorRef)

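SystemMessageQueue provides systemEnqueue (enqueue one system message) and systemDrain (dequeue all of them). Mailbox extends ForkJoinTask, mixes in the system message queue SystemMessageQueue, and implements the Runnable interface. It also holds an ActorCell member and a MessageQueue member: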

private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
  extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {
  var actor: ActorCell = _
}

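The fork/join framework that ForkJoinTask belongs to is well suited to running a very large number of independent tasks on a small number of threads (independent means that the tasks share no data; otherwise concurrent access becomes a problem).
Mailbox delegates all MessageQueue methods to its messageQueue member. The concrete MessageQueue type depends on the MailboxType in use.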

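The tell operation

When an ActorSystem is created, the default dispatcher is initialized; by default it is backed by a ForkJoinPool (an ExecutorService).
When a message is sent with actorRef ! Message, the sendMessage method of the corresponding ActorCell is called, and that method in turn calls dispatcher.dispatch.

The ! method is declared on ActorRef: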

    def ! (message: Any)(implicit sender: ActorRef = Actor.noSender): Unit

In ActorCell.scala:

    final def sendMessage(message: Any, sender: ActorRef): Unit =
        sendMessage(Envelope(message, sender, system))

From there the call can be traced to Dispatch.scala in the dungeon package:

  def sendMessage(msg: Envelope): Unit =
    try {
      val msgToDispatch =
        if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg)
        else msg

      dispatcher.dispatch(this, msgToDispatch)
    } catch handleException

The dispatcher.dispatch call in that code can be found in akka.dispatch.Dispatcher:

     /**
      * INTERNAL API
      */
     protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
       val mbox = receiver.mailbox
       mbox.enqueue(receiver.self, invocation)
       registerForExecution(mbox, true, false)
     }

     protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
       if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
         if (mbox.setAsScheduled()) {
           try {
             executorService execute mbox
             true
           } catch {
             case e: RejectedExecutionException ⇒
               try {
                 executorService execute mbox
                 true
               } catch { //Retry once
                 case e: RejectedExecutionException ⇒
                   mbox.setAsIdle()
                   eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
                   throw e
               }
           }
         } else false
       } else false
     }

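The dispatch method does two things.
First, it puts the message into the ActorCell's message queue (mailbox is a member of ActorCell).
Second, through registerForExecution it submits the mailbox to the dispatcher's underlying thread pool with executorService execute mbox, which eventually runs mbox.run(). Mailbox implements the Runnable interface, so it can be handed to an ExecutorService: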

  override final def run(): Unit = {
    try {
      if (!isClosed) { //Volatile read, needed here
        processAllSystemMessages() //First, deal with any system messages
        processMailbox() //Then deal with messages
      }
    } finally {
      setAsIdle() //Volatile write, needed here
      dispatcher.registerForExecution(this, false, false)
    }
  }

  /**
     * Process the messages in the mailbox
     */
    @tailrec private final def processMailbox(
      left:       Int  = java.lang.Math.max(dispatcher.throughput, 1),
      deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
      if (shouldProcessMessage) {
        val next = dequeue()
        if (next ne null) {
          if (Mailbox.debug) println(actor.self + " processing message " + next)
          actor invoke next
          if (Thread.interrupted())
            throw new InterruptedException("Interrupted while processing actor messages")
          processAllSystemMessages()
          if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
            processMailbox(left - 1, deadlineNs)
        }
      }

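When mbox.run() executes, it first processes system messages from the SystemMessage linked list
and then processes user messages from the MessageQueue member.
User messages are handled by processMailbox, a tail-recursive function (run itself is not recursive) that processes one message per call
by invoking the ActorCell's invoke method. The dispatcher's throughput setting
bounds how many messages are processed in one run,
and the dispatcher's throughputDeadlineTime bounds how long one run may take.
Both limits are checked once after each message has been processed.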
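The interplay of enqueueing, scheduling, and throughput-bounded processing can be reduced to a small model. The sketch below is an assumption-laden miniature, not Akka code: `MiniMailbox`, its `handle` callback, and the single `AtomicBoolean` standing in for the mailbox status are all invented for illustration, and the deadline check and system messages are left out.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical miniature of the mailbox/dispatcher pair described above.
final class MiniMailbox(executor: Executor, throughput: Int, handle: Any => Unit)
    extends Runnable {
  private val queue = new ConcurrentLinkedQueue[Any]()
  private val scheduled = new AtomicBoolean(false)

  // Counterpart of dispatch: enqueue the message, then try to schedule the mailbox.
  def dispatch(message: Any): Unit = {
    queue.offer(message)
    registerForExecution()
  }

  // Only the caller that wins the compare-and-set submits the mailbox, so at most
  // one run() is pending or in flight at any time.
  private def registerForExecution(): Unit =
    if (!queue.isEmpty && scheduled.compareAndSet(false, true)) executor.execute(this)

  // Handle one message per call and recurse while the throughput budget allows.
  @scala.annotation.tailrec
  private def processMailbox(left: Int): Unit = {
    val next = queue.poll()
    if (next != null) {
      handle(next)
      if (left > 1) processMailbox(left - 1)
    }
  }

  override def run(): Unit =
    try processMailbox(math.max(throughput, 1))
    finally {
      scheduled.set(false)   // back to idle
      registerForExecution() // schedule again if messages remain
    }
}
```

With a throughput of 2 and five queued messages, this model needs three runs to drain the queue. Between runs the mailbox gives its thread back to the executor, which is the fairness effect the throughput setting is meant to provide.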

  final def invoke(messageHandle: Envelope): Unit = {
    val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
    try {
      currentMessage = messageHandle
      if (influenceReceiveTimeout)
        cancelReceiveTimeout()
      messageHandle.message match {
        case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
        case msg                      ⇒ receiveMessage(msg)
      }
      currentMessage = null // reset current message after successful invocation
    } catch handleNonFatalOrInterruptedException { e ⇒
      handleInvokeFailure(Nil, e)
    } finally {
      if (influenceReceiveTimeout)
        checkReceiveTimeout // Reschedule receive timeout
    }
  }

 final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)

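Auto-received messages such as PoisonPill and Terminated are handled in autoReceiveMessage,
while ordinary messages are handled in receiveMessage, which passes them to the head of behaviorStack: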

private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack

As the declaration shows, behaviorStack is a List[Actor.Receive], where Receive is defined as:

type Receive = PartialFunction[Any, Unit]

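The Receive function (a PartialFunction[Any, Unit]) is the message-handling logic that we write ourselves.
Because an Actor can switch behavior through become/unbecome,
behaviorStack.head is always the Receive logic that is currently in effect.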
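How a stack of partial functions can support become/unbecome is easy to show in isolation. The class below is a simplified sketch, not Akka's ActorCell: `BehaviorStack` and the Boolean return value of `receiveMessage` are illustrative assumptions, and unhandled messages are merely reported rather than routed anywhere.

```scala
// Hypothetical holder that mimics a behavior stack backing become/unbecome.
final class BehaviorStack(initial: PartialFunction[Any, Unit]) {
  private var behaviorStack: List[PartialFunction[Any, Unit]] = List(initial)

  // discardOld = true replaces the current behavior; false pushes on top of it.
  def become(behavior: PartialFunction[Any, Unit], discardOld: Boolean = true): Unit =
    behaviorStack =
      behavior :: (if (discardOld) behaviorStack.tail else behaviorStack)

  // Pop the current behavior, but never remove the last remaining one.
  def unbecome(): Unit =
    if (behaviorStack.tail.nonEmpty) behaviorStack = behaviorStack.tail

  // The head of the stack handles the message; returns false when the
  // current behavior is not defined for it.
  def receiveMessage(msg: Any): Boolean = {
    val current = behaviorStack.head
    if (current.isDefinedAt(msg)) { current(msg); true } else false
  }
}
```

Calling `become(other, discardOld = false)` followed by `unbecome()` returns to the previous behavior, whereas the default `discardOld = true` replaces the head, so the stack does not grow.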

With an executor such as ForkJoinPool, each call to execute(mbox)
first creates a MailboxExecutionTask, which extends ForkJoinTask
and whose exec method calls mbox.run. Every execution therefore creates a new ForkJoinTask object.

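One more point: messages are always queued in the mailbox of the target actor (each wrapped in an Envelope that carries the message itself and the sender),
whereas the task objects to be executed are placed in the work queues that belong to the Executor's individual threads. Tasks and messages therefore live in different queues.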