1. 程式人生 > >Akka併發程式設計——第二節:Actor模型(一)

Akka併發程式設計——第二節:Actor模型(一)

本節主要內容

  1. 定義Actor
  2. 建立Actor

1. 定義Actor

通過擴充套件akka.actor.Actor 特質並實現receive方法來定義Actor,程式碼示例如下

//通過擴充套件Actor並實現receive方法來定義Actor
class MyActor extends Actor {
    //獲取LoggingAdapter,用於日誌輸出
    val log = Logging(context.system, this)

    //實現receive方法,定義Actor的行為邏輯,返回的是一個偏函式
    def receive = {
      case
"test" => log.info("received test") case _ => log.info("received unknown message") } }

receive方法被定義在Actor當中,方法標籤如下

//Actor中的receive方法定義,
type Receive = PartialFunction[Any, Unit]
def receive: Actor.Receive

下面給出其完整使用程式碼:

object Example_01 extends App{
  import akka.actor.Actor
  import
akka.event.Logging import akka.actor.ActorSystem import akka.actor.Props class MyActor extends Actor { val log = Logging(context.system, this) def receive = { case "test" => log.info("received test") case _ => log.info("received unknown message") } } //建立ActorSystem物件
val system = ActorSystem("MyActorSystem") //返回ActorSystem的LoggingAdpater val systemLog=system.log //建立MyActor,指定actor名稱為myactor val myactor = system.actorOf(Props[MyActor], name = "myactor") systemLog.info("準備向myactor傳送訊息") //向myactor傳送訊息 myactor!"test" myactor! 123 //關閉ActorSystem,停止程式的執行 system.shutdown() }

程式碼執行結果:

[INFO] [04/02/2016 09:29:54.223] [main] [ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO] [04/02/2016 09:29:54.224] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received test
[INFO] [04/02/2016 09:29:54.224] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received unknown message

輸出“[INFO] [04/02/2016 09:29:54.224] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received test”中的[MyActorSystem-akka.actor.default-dispatcher-3]為對應的執行緒名,[akka://MyActorSystem/user/myactor]為Actor路徑資訊, received test為

def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }

方法處理後的輸出。關於[akka://MyActorSystem/user/myactor]路徑資訊,將在後續內容中進行詳細闡述。

也可以通過混入ActorLogging來實現日誌功能,具體程式碼如下:

class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }

ActorLogging的定義如下:

trait ActorLogging { this: Actor ⇒
  private var _log: LoggingAdapter = _

  def log: LoggingAdapter = {
    // only used in Actor, i.e. thread safe
    if (_log eq null)
      _log = akka.event.Logging(context.system, this)
    _log
  }

}

完整程式碼如下:

/*
 *定義Actor時混入ActorLogging
 */
object Example_02 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }

  //建立ActorSystem物件
  val system = ActorSystem("MyActorSystem")
  //返回ActorSystem的LoggingAdpater
  val systemLog=system.log
  //建立MyActor,指定actor名稱為myactor
  val myactor = system.actorOf(Props[MyActor], name = "myactor")

  systemLog.info("準備向myactor傳送訊息")
  //向myactor傳送訊息
  myactor!"test"
  myactor! 123

  //關閉ActorSystem,停止程式的執行
  system.shutdown()
}

程式碼執行結果:

[INFO] [04/02/2016 09:39:21.088] [main] [ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO] [04/02/2016 09:39:21.089] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received test
[INFO] [04/02/2016 09:39:21.089] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received unknown message

程式碼原理與Example_01類似,這裡不再贅述。

2. 建立Actor

在前面兩個例子中,通過

 val myactor = system.actorOf(Props[MyActor], name = "myactor")

建立Actor,需要注意的是system.actorOf方法返回的是ActorRef物件,ActorRef為Actor的引用,使用ActorRef物件可以進行訊息的傳送等操作。Props為配置物件,在建立Actor時使用,它是不可變的物件,因此它是執行緒案例且完全可共享的。Akka中建立Actor時,也允許直接傳入MyActor物件的引用,例如

//直接通過new MyActor的方式傳入MyActor物件的引用,注意這裡是Props(new MyActor)
val myactor = system.actorOf(Props(new MyActor), name = "myactor")

但是Akka不推薦這麼做,官方文件給出的解釋是這種方式會導致不可序列化的Props物件且可能會導致競爭條件(破壞Actor的封裝性)。另外需要特別注意的是,不允許通過下列程式碼建立Actor

//下列兩行程式碼編譯可以通過,但執行時出丟擲異常
  val  myActor=new MyActor
  val myactor = system.actorOf(Props(myActor), name = "myactor")

完整執行程式碼如下:

/*
 *建立Actor
 */
object Example_03 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }

  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //下列兩行程式碼編譯可以通過,但執行時出丟擲異常
  val  myActor=new MyActor
  val myactor = system.actorOf(Props(myActor), name = "myactor")

  systemLog.info("準備向myactor傳送訊息")
  //向myactor傳送訊息
  myactor!"test"
  myactor! 123

  //關閉ActorSystem,停止程式的執行
  system.shutdown()
}

執行結果如下:

Exception in thread "main" akka.actor.ActorInitializationException: You cannot create an instance of [chapter02.Example_03$MyActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.
    at akka.actor.ActorInitializationException$.apply(Actor.scala:167)
    at akka.actor.Actor$class.$init$(Actor.scala:423)
    at chapter02.Example_03$MyActor.<init>(MyActor.scala:73)
    at chapter02.Example_03$delayedInit$body.apply(MyActor.scala:84)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
	at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at chapter02.Example_03$.main(MyActor.scala:68)
    at chapter02.Example_03.main(MyActor.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

從“You cannot create an instance of [chapter02.Example_03$MyActor] explicitly using the constructor (new). You have to use one of the ‘actorOf’ factory methods to create a new actor.”可以看到,不能通過顯式地呼叫建構函式建立Actor,只能使用actorOf工廠方法建立Actor。

下面介紹2種在實際中經常使用的Actor建立方法
(1)呼叫system.actorOf建立Actor

val system = ActorSystem("mySystem")
val myActor = system.actorOf(Props[MyActor], "myactor2")

完整程式碼在Example_01、Example_02中已經演示過了,這裡需要說明的是通過system.actorOf工廠方法建立的Actor為頂級Actor
這裡寫圖片描述
在Akka框架中,每個Akka應用程式都會有一個守衛Actor,名稱為user,所有通過system.actorOf工廠方法建立的Actor都為user的子Actor,也是整個Akka程式的頂級Actor。

(2)呼叫context.actorOf建立Actor
完整程式碼如下:

/*
 *建立Actor,呼叫context.actorOf方法
 */
object Example_04 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props


  class FirstActor extends Actor with ActorLogging{
    //通過context.actorOf方法建立Actor
    val child = context.actorOf(Props[MyActor], name = "myChild")
    def receive = {
      case x => child ! x;log.info("received "+x)
    }

  }


  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }

  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //建立FirstActor物件
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

  systemLog.info("準備向myactor傳送訊息")
  //向myactor傳送訊息
  myactor!"test"
  myactor! 123
  Thread.sleep(5000)
  //關閉ActorSystem,停止程式的執行
  system.shutdown()
}

程式碼執行結果

[INFO] [04/02/2016 15:05:34.770] [main] [ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myChild] received test
[INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myChild] received unknown message

通過程式碼的執行結果可以看到,FirstActor的Actor路徑資訊為akka://MyActorSystem/user/firstActor,而通過

class FirstActor extends Actor with ActorLogging{
    //通過context.actorOf方法建立Actor
    val child = context.actorOf(Props[MyActor], name = "myChild")
    def receive = {
      case x => child ! x;log.info("received "+x)
    }

  }

程式碼使用context.actorOf建立的MyActor,其Actor路徑資訊為[akka://MyActorSystem/user/firstActor/myChild],這意味著mychild為firstActor的子Actor,層次結構如下圖所示
這裡寫圖片描述

也就是說context.actorOf和system.actorOf的差別是system.actorOf建立的actor為頂級Actor,而context.actorOf方法建立的actor為呼叫該方法的Actor的子Actor