1. 程式人生 > >Akka併發程式設計——第三節:Actor模型(二)

Akka併發程式設計——第三節:Actor模型(二)

本節主要內容:

  1. Actor API解析

1. Actor API解析

Actor中的主要成員變數和方法定義如下:

package akka.actor
trait Actor extends scala.AnyRef {
  type Receive = akka.actor.Actor.Receive

  //context變數暴露當前Actor的上下文資訊及當前訊息
  implicit val context : akka.actor.ActorContext = { /* compiled code */ }

  //self作為當前ActorRef的引用
  implicit final val self : akka.actor.ActorRef = { /* compiled code */ }

  //當前Actor接收到最後一條訊息對應的訊息傳送者(Actor)
  final def
sender() :
akka.actor.ActorRef = { /* compiled code */ } //receive方法,抽象方法,定義Actor的行為邏輯 def receive : akka.actor.Actor.Receive //內部使用API protected[akka] def aroundReceive(receive : akka.actor.Actor.Receive, msg : scala.Any) : scala.Unit = { /* compiled code */ } protected[akka] def aroundPreStart
() :
scala.Unit = { /* compiled code */ } protected[akka] def aroundPostStop() : scala.Unit = { /* compiled code */ } protected[akka] def aroundPreRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ } protected[akka] def aroundPostRestart(reason : scala.Throwable)
:
scala.Unit = { /* compiled code */ } //監督策略,用於Actor容錯處理 def supervisorStrategy : akka.actor.SupervisorStrategy = { /* compiled code */ } //Hook方法,用於Actor生命週期監控 @scala.throws[T](classOf[scala.Exception]) def preStart() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postStop() : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def preRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ } @scala.throws[T](classOf[scala.Exception]) def postRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ } //傳送給Actor的訊息,Actor沒有定義相應的處理邏輯時,會呼叫此方法 def unhandled(message : scala.Any) : scala.Unit = { /* compiled code */ } } object Actor extends scala.AnyRef { type Receive = scala.PartialFunction[scala.Any, scala.Unit] //空的行為邏輯 @scala.SerialVersionUID(1) object emptyBehavior extends scala.AnyRef with akka.actor.Actor.Receive { def isDefinedAt(x : scala.Any) : scala.Boolean = { /* compiled code */ } def apply(x : scala.Any) : scala.Nothing = { /* compiled code */ } } //Sender為null @scala.SerialVersionUID(1) final val noSender : akka.actor.ActorRef = { /* compiled code */ } }

(1) Hook方法,preStart()、postStop()方法的使用

/*
 *Actor API: Hook方法
 */
  object Example_05 extends App{
    import akka.actor.Actor
    import akka.actor.ActorSystem
    import akka.actor.Props


    class FirstActor extends Actor with ActorLogging{
      //通過context.actorOf方法建立Actor
      var child:ActorRef = _

      //Hook方法,preStart(),Actor啟動之前呼叫,用於完成初始化工作
      override def preStart(): Unit ={
        log.info("preStart() in FirstActor")
        //通過context上下文建立Actor
        child = context.actorOf(Props[MyActor], name = "myChild")
      }
      def receive = {
        //向MyActor傳送訊息
        case x => child ! x;log.info("received "+x)
      }

      //Hook方法,postStop(),Actor停止之後呼叫
      override def postStop(): Unit = {
        log.info("postStop() in FirstActor")
       }
    }


    class MyActor extends Actor with ActorLogging{
      //Hook方法,preStart(),Actor啟動之前呼叫,用於完成初始化工作
      override def preStart(): Unit ={
        log.info("preStart() in MyActor")
      }
      def receive = {
        case "test" => log.info("received test")
        case _      => log.info("received unknown message")
      }

      //Hook方法,postStop(),Actor停止之後呼叫
      override def postStop(): Unit = {
        log.info("postStop() in MyActor")
      }
    }

    val system = ActorSystem("MyActorSystem")
    val systemLog=system.log

    //建立FirstActor物件
    val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

    systemLog.info("準備向myactor傳送訊息")
    //向myactor傳送訊息
    myactor!"test"
    myactor! 123
    Thread.sleep(5000)
    //關閉ActorSystem,停止程式的執行
    system.shutdown()
  }

程式碼執行結果:

[INFO] [04/02/2016 17:04:49.607] [main] [ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor
[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received test
[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received unknown message
[INFO] [04/02/2016 17:04:54.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myChild] postStop() in MyActor
[INFO] [04/02/2016 17:04:54.617] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] postStop() in FirstActor

在程式碼

 class FirstActor extends Actor with ActorLogging{
      //通過context.actorOf方法建立Actor
      var child:ActorRef = _

      //Hook方法,preStart(),Actor啟動之前呼叫,用於完成初始化工作
      override def preStart(): Unit ={
        log.info("preStart() in FirstActor")
        //通過context上下文建立Actor
        child = context.actorOf(Props[MyActor], name = "myChild")
      }
      def receive = {
        //向MyActor傳送訊息
        case x => child ! x;log.info("received "+x)
      }

      //Hook方法,postStop(),Actor停止之後呼叫,用於完成初始化工作
      override def postStop(): Unit = {
        log.info("postStop() in FirstActor")
      }
    }

中分別對postStop、preStart方法進行了重寫,在preStart方法中通過程式碼

 child = context.actorOf(Props[MyActor], name = "myChild")

對成員變數child進行初始化,然後在postStop方法中使用

 //通過context上下文停止MyActor的執行
        context.stop(child)

停止MyActor的執行。在使用程式碼

//建立FirstActor物件
val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

建立FirstActor時,便會呼叫preStart方法完成MyActor的建立,因此首先會執行FirstActor中的preStart()方法

dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor

然後在建立MyActor時執行MyActor中定義的preStart方法

[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor

在執行

//關閉ActorSystem,停止程式的執行
    system.shutdown()

FirstActor作為MyActor的Supervisor,會先停止MyActor,再停止自身,因此先呼叫MyActor的postStop方法,再呼叫FirstActor的postStop方法。

(2) 成員變數self及成員方法sender方法的使用

整體程式碼如下:

  /*
 *Actor API:成員變數self及sender()方法的使用
 */
  object Example_05 extends App{
    import akka.actor.Actor
    import akka.actor.ActorSystem
    import akka.actor.Props


    class FirstActor extends Actor with ActorLogging{
      //通過context.actorOf方法建立Actor
      var child:ActorRef = _

      override def preStart(): Unit ={
        log.info("preStart() in FirstActor")
        //通過context上下文建立Actor
        child = context.actorOf(Props[MyActor], name = "myActor")
      }
      def receive = {
        //向MyActor傳送訊息
        case x => child ! x;log.info("received "+x)
      }


    }


    class MyActor extends Actor with ActorLogging{
      self!"message from self reference"
      def receive = {
        case "test" => log.info("received test");sender()!"message from MyActor"
        case "message from self reference"=>log.info("message from self refrence")
        case _      => log.info("received unknown message");
      }

    }

    val system = ActorSystem("MyActorSystem")
    val systemLog=system.log

    //建立FirstActor物件
    val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

    systemLog.info("準備向myactor傳送訊息")
    //向myactor傳送訊息
    myactor!"test"
    myactor! 123
    Thread.sleep(5000)
    //關閉ActorSystem,停止程式的執行
    system.shutdown()
  }

執行結果:

[INFO] [04/02/2016 18:40:37.805] [main] [ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO] [04/02/2016 18:40:37.805] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] message from self refrence
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from MyActor
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

程式碼:

class MyActor extends Actor with ActorLogging{
      self!"message from self reference"
      def receive = {
        case "test" => log.info("received test");sender()!"message from MyActor"
        case "message from self reference"=>log.info("message from self refrence")
        case _      => log.info("received unknown message");
      }

    }

中使用

self!"message from self reference"

向自身傳送了一條訊息,receive方法通過

        case "message from self reference"=>log.info("message from self refrence")

對這條訊息進行處理。receive方法在處理

def receive = {
        case "test" => log.info("received test");sender()!"message from MyActor"

“test”訊息時,會呼叫

sender()!"message from MyActor"

向sender(本例中為FirstActor)傳送”message from MyActor”訊息,FirstActor使用

 def receive = {
        //MyActor傳送訊息
        case x => child ! x;log.info("received "+x)
      }

處理訊息時又向MyActor回送該訊息,因此最終的輸出有兩個unknown message,分別對應123和”message from MyActor”

[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

(3) unhandled方法的使用

unhandled方法用於處理沒有被receive方法處理的訊息,下面的程式碼給出的是當不重寫unhandled方法時的程式碼

/*
*Actor API:unhandled方法
*/
object Example_06 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props
  class FirstActor extends Actor with ActorLogging{
    def receive = {
      //向MyActor傳送訊息
      case "test" => log.info("received test")
    }


  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //建立FirstActor物件
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

  systemLog.info("準備向myactor傳送訊息")
  //向myactor傳送訊息
  myactor!"test"
  myactor! 123
  Thread.sleep(5000)
  //關閉ActorSystem,停止程式的執行
  system.shutdown()
}

程式碼輸出:

[INFO] [04/02/2016 19:14:11.992] [main] [ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO] [04/02/2016 19:14:11.992] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test

不難看出,對於

 myactor! 123

傳送的這條訊息沒有被處理,沒有任何的處理邏輯。在實際開發過程中,可能會對不能被處理的訊息增加一些應對邏輯,此時可以重寫unhandled方法,程式碼如下:

/*
*Actor API:unhandled方法的使用
*/
object Example_06 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props


  class FirstActor extends Actor with ActorLogging{
    def receive = {
      //向MyActor傳送訊息
      case "test" => log.info("received test")
    }

    //重寫unhandled方法
    override def unhandled(message: Any): Unit = {
      log.info("unhandled message is {}",message)
    }
  }



  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //建立FirstActor物件
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

  systemLog.info("準備向myactor傳送訊息")
  //向myactor傳送訊息
  myactor!"test"
  myactor! 123
  Thread.sleep(5000)
  //關閉ActorSystem,停止程式的執行
  system.shutdown()
}

程式碼輸出結果:

[INFO] [04/02/2016 19:17:18.458] [main] [ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] unhandled message is 123

其它如preRestart等方法的使用將在Akka容錯部分進行講解。

Scala學習(公眾微訊號:ScalaLearning)每天為大家帶來一點Scala語言、Spark、Kafka、Flink、AKKA等大資料技術乾貨及相關技術資訊。技術永無止境,勇攀高峰,一往直前!
覺得文章不錯?掃描關注
這裡寫圖片描述