1. 程式人生 > >Akka筆記之訊息傳遞

Akka筆記之訊息傳遞

英文原文連結譯文連結,原文作者:Arun Manivannan ,譯者:有孚

在Akka筆記第一篇的介紹中,我們大致介紹了下Akka工具箱中的Actor。在第二篇當中,我們來看一下Actor訊息傳遞的功能。這裡還是延用之前使用的那個學生-老師的例子。

在Actor訊息的第一部分中,我們會建立一個老師的Actor,但學生Actor則先不建立,而是使用一個叫做StudentSimulatorApp的主程式。

仔細回顧下學生-老師模型

我們現在只考慮StudentSimulatorApp傳送給TeacherActor的訊息。這裡我所說的StudentSimulatorApp指的是一個正常的主程式。

從圖中可以看到:
(如果有陌生的術語,沒關係,後面我們會詳細解釋的)

1. 學生建立了一個叫ActorSystem的東西。
2. 他通過ActorSystem來建立了一個叫ActorRef的物件。QuoteRequest訊息就是傳送給ActorRef的(它是TeacherActor的一個代理)
3. ActorRef將訊息傳送給Dispatcher
4. Dispatcher將訊息投遞到目標Actor的郵箱中。
5. 隨後Dispatcher將Mailbox扔給一個執行緒去執行(這點下節會重點講到)
6. MailBox將訊息出隊並最終將其委託給真實的Teacher Actor的接收方法去處理。

正如我所說的,看不懂也別擔心。現在我們來一步步地詳細地分析下。全部講完後你可以再回過頭來看下這五個步驟。

STUDENTSIMULATORAPP程式

我們用這個STUDENTSIMULATORAPP來啟動JVM並初始化ActorSystem。

從圖中可以看到,StudentSimulatorApp

1. 建立了一個ActorSystem
2. 通過ActorSystem建立了一個Teacher Actor的代理(ActorRef)
3. 將QuoteRequest訊息傳送給代理

我們現在只關注這三點。

1. 建立ActorSystem

ActorSystem是進入到Actor的世界的一扇大門。通過它你可以建立或中止Actor。甚至還可以把整個Actor環境給關閉掉。

另一方面來說,Actor是一個分層的結構,ActorSystem之於Actor有點類似於java.lang.Object或者scala.Any的角色——也就是說,它是所有Actor的根物件。當你通過ActorSystem的actorOf方法建立了一個Actor時,你其實建立的是ActorSystem下面的一個Actor。

初始化ActorSystem的程式碼是這樣的:

val system=ActorSystem("UniversityMessageSystem")





UniversityMessageSystem只是你給ActorSystem起的一個可愛的名字而已。

2. 建立一個TeacherActor的代理?

我們來看下下面這段程式碼:

val teacherActorRef:ActorRef=actorSystem.actorOf(Props[TeacherActor])

actorOf是ActorSystem中建立Actor的方法。但是正如你所看到的,它並不會返回我們所需要的TeacherActor。它返回的是一個ActorRef。

這個ActorRef扮演了真實的Actor的一個代理的角色。客戶端並不會直接和Actor通訊。這也正是Actor模型中避免直接訪問TeacherActor中任何的自定義/私有方法或者變數的一種方式。

再重複一遍,訊息只會傳送給ActorRef,最終才會到達真正的Actor。你是絕對無法直接和Actor進行通訊的。如果你真的找到了什麼拙劣的方式來直接通訊,大家會恨你入骨的。

image

將訊息傳送給代理

還是隻有一行程式碼。你只需告訴說把QuoteRequest訊息傳送到ActorRef就好了。Actor中的這個告訴的方式就是一個!號。(ActorRef中確實也有一個tell方法,不過它只是把這個呼叫委託給了!號)

//send a message to the Teacher Actor
teacherActorRef!QuoteRequest


這就可以了!

如果你認為我在騙你的話,看一下下面StudentSimulatorApp的完整程式碼:

STUDENTSIMULATORAPP.SCALA
package me.rerun.akkanotes.messaging.actormsg1

import akka.actor.ActorSystem  
import akka.actor.Props  
import akka.actor.actorRef2Scala  
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._


object StudentSimulatorApp extends App{

  //Initialize the ActorSystem
  val actorSystem=ActorSystem("UniversityMessageSystem")

  //construct the Teacher Actor Ref
  val teacherActorRef=actorSystem.actorOf(Props[TeacherActor])

  //send a message to the Teacher Actor
  teacherActorRef!QuoteRequest

  //Let's wait for a couple of seconds before we shut down the system
  Thread.sleep (2000) 

  //Shut down the ActorSystem.
  actorSystem.shutdown()

} 


好吧,我承認我撒了點小謊。你還得關掉ActorSystem,不然JVM會一直執行下去的。我還讓主執行緒睡眠了一小會兒,以便給點時間讓TeacherActor去完成它的任務。我知道這聽起來很愚蠢。別擔心。後面我們會通過些優雅的測試用例來替換掉這種取巧的方式。

訊息

我們剛傳送了一個QuoteMessage給ActorRef,但是,還壓根兒沒看著過這個訊息類呢!

說曹操,曹操到:

(實踐中推薦你把訊息封裝成一個好點的物件,這樣維護起來容易些)

TeacherProtocol
package me.rerun.akkanotes.messaging.protocols

object TeacherProtocol{

  case class QuoteRequest()
  case class QuoteResponse(quoteString:String)

}


正如你所想的那樣,QuoteRequest就是傳送給TeacherActor的那個訊息。Actor會回覆一個QuoteResponse。

分發器及郵箱

ActorRef把訊息處理功能委託給了Dispatcher。實際上,當我們建立ActorSystem和ActorRef的時候,就已經建立了一個Dispatcher和MailBox了。我們來看下它們是幹什麼的。

郵箱

每個Actor都有一個MailBox(後面會介紹一種特殊的情況)。在我們這個比喻當中,每個老師也有一個郵箱。老師得去檢查郵箱並處理訊息。在Actor的世界中,則是另一種形式——郵箱一有機會就會要求Actor去完成自己的任務。

同樣的,郵箱裡也有一個佇列來以FIFO的方式來儲存並處理訊息——它和實際的郵箱還有點不同,真實的郵箱新的信總是在最上面的。

現在講到分發器了

Dispatcher會完成一些很酷的事。從它的角度來看,它只是從ActorRef中取出一條訊息然後將它傳給了MailBox。但是,在這後面發生了一件不可意義的事情:

Dispatcher會封裝一個ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去執行。

看下Dispatcher裡面的一段程式碼:

protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {  
    ...
    try {
        executorService execute mbox
    ...
}


什麼,你是說要執行一下郵箱?

是的。我們看到MailBox中包含了佇列裡面的訊息。由於Executor得去執行MailBox,所以它得是一個Thread型別。是的沒錯。MailBox的宣告及構造器就是這樣的。

下面是MailBox的簽名信息。

private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable


TeacherActor

當MailBox的run方法執行的時候,它會從佇列中取出一條訊息,然後將它傳給Actor去處理。

當你把訊息傳給ActorRef的時候,最終呼叫的實際是目標Actor裡面的一個receive方法。

TeacherActor只是一個很簡單的類,它有一個名言的列表,而receive方法很明顯就是用來處理訊息的。

來看下程式碼:

TeacherActor.scala

package me.rerun.akkanotes.messaging.actormsg1

import scala.util.Random

import akka.actor.Actor  
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._

/*
 * Your Teacher Actor class. 
 * 
 * The class could use refinement by way of  
 * using ActorLogging which uses the EventBus of the Actor framework
 * instead of the plain old System out
 * 
 */

class TeacherActor extends Actor {

  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  def receive = {

    case QuoteRequest => {

      import util.Random

      //Get a random Quote from the list and construct a response
      val quoteResponse=QuoteResponse(quotes(Random.nextInt(quotes.size)))

      println (quoteResponse)

    }

  }

}

TeacherActor的receive方法的模式匹配只會匹配一種訊息——QuoteRequest (事實上,模式匹配中最好匹配下預設的情況,不過這個就說來話長了)

receive方法做的就是

1. 匹配QuoteRequest的模式
2. 從名言列表中隨機選取一條
3. 構造出一個QuoteResponse
4. 將QuoteResponse列印到控制檯上

程式碼

整個專案的程式碼可以從Github中下載到。

本文最早釋出於我的個人部落格: Java譯站