1. 程式人生 > >Akka併發程式設計——第四節:Actor模型(三)

Akka併發程式設計——第四節:Actor模型(三)

本將主要內容:
1. Actor引用、Actor路徑

1. Actor引用、Actor路徑

下圖是Akka官方文件中給出的一張圖

這裡寫圖片描述

該圖清晰地說明了ActorPath,ActorRef,Actor及ActorSystem之間的關係,並說明了Actor整體的層次結構。前面我們提到,Akka應用程式會持有一個名稱為user的Actor,該Actor被稱為guardian supervisor(守衛監督器),無論是ActorSystem建立的Actor還是通過ActorContext建立的Actor都為user的子類,它是最頂級的Actor。

(一)ActorRef

對於ActorRef,我們已經很熟悉了,通過呼叫ActorSystem.actorOf方法可以建立Actor,返回的便是ActorRef,例如程式碼

 //建立FirstActor物件
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

返回的便是FirstActor的ActorRef物件,ActorRef最重要的作用便是向Actor傳送訊息,例如

//向myactor傳送訊息
myactor!"test"
myactor! 123

另外,還可以通過context隱式物件獲取父Actor和子Actor的ActorRef,示例程式碼如下:

/*
*Actor API:成員變數self及sender()方法的使用
*/
object Example_07
extends App{
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class FirstActor extends Actor with ActorLogging{ //通過context.actorOf方法建立Actor var child:ActorRef = _ override def preStart(): Unit ={ log.info("preStart() in FirstActor") //通過context上下文建立Actor
child = context.actorOf(Props[MyActor], name = "myActor") } def receive = { //向MyActor傳送訊息 case x => child ! x;log.info("received "+x) } } class MyActor extends Actor with ActorLogging{ var parentActorRef:ActorRef=_ override def preStart(): Unit ={ //通過context.parent獲取其父Actor的ActorRef parentActorRef=context.parent } def receive = { case "test" => log.info("received test");parentActorRef!"message from ParentActorRef" case _ => log.info("received unknown message"); } } val system = ActorSystem("MyActorSystem") val systemLog=system.log //建立FirstActor物件 val myactor = system.actorOf(Props[FirstActor], name = "firstActor") //獲取ActorPath val myActorPath=system.child("firstActor") //通過system.actorSelection方法獲取ActorRef val myActor1=system.actorSelection(myActorPath) systemLog.info("準備向myactor傳送訊息") //向myActor1傳送訊息 myActor1!"test" myActor1! 123 Thread.sleep(5000) //關閉ActorSystem,停止程式的執行 system.shutdown() }

程式碼執行結果

[INFO] [04/02/2016 20:28:08.941] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 20:28:08.942] [main] [ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from ParentActorRef
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

程式碼

 class MyActor extends Actor with ActorLogging{
    var parentActorRef:ActorRef=_
    override def preStart(): Unit ={
      //通過context.parent獲取其父Actor的ActorRef
      parentActorRef=context.parent
    }
    def receive = {
      case "test" => log.info("received test");parentActorRef!"message from ParentActorRef"
      case _      => log.info("received unknown message");
    }

  }

中,使用

//通過context.parent獲取其父Actor的ActorRef
      parentActorRef=context.parent

獲取MyActor 的直接父Actor的ActorRef,在本例中為FirstActor,因為在FirstActor中,我們使用

 //通過context上下文建立Actor
      child = context.actorOf(Props[MyActor], name = "myActor")

建立了MyActor,FirstActor便自動成為MyActor的直接Supervisor。如此便可以通過程式碼

parentActorRef!"message from ParentActorRef"

傳送訊息。

另外,還可以通過

//建立FirstActor物件
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
  //獲取ActorPath
  val myActorPath=system.child("firstActor")
  //通過system.actorSelection方法獲取ActorRef
  val myActor1=system.actorSelection(myActorPath)

system.actorSelection(myActorPath)方法獲取相應的ActorRef。然後再使用獲取到的ActorRef向Acto傳送訊息

 //向myActor1傳送訊息
  myActor1!"test"
  myActor1! 123

現在,讓我們來總結一下獲取ActorRef的方法:
(1)通過ActorSystem的actorOf方法,不過這種方式是通過建立Actor,然後返回其ActorRef
(2)通過context.actorOf方法,這種方式也是通過建立Actor,然後返回其ActorRef
(3)通過context.parent、context.self、context.children方法獲取當前Actor的父Actor、當前Actor及子Actor的ActorRef,這種方式是獲取已經建立好的Actor的ActorRef
(4)通過val myActor1=system.actorSelection(myActorPath)方法來獲取ActorRef,這種方式也是獲取已經建立好的Actor的ActorRef

(二)ActorPath

在前面的例子中,我們通過

 val myActorPath=system.child("firstActor")

已經使用到了ActorPath。在Akka中,ActorPath採用統一資源定位符的方式進行組織,例如

//本地ActorPath
"akka://my-sys/user/service-a/worker1" 
//遠端ActorPath
"akka.tcp://[email protected]:5678/user/service-b"

本地ActorPath是Akka併發程式設計中的ActorPath表示方式,而遠端ActorPath是Akka分散式程式設計中常用的ActorPath表示方式,”akka.tcp://[email protected]:5678/user/service-b”中的TCP表示使用的是TCP協議,也可以使用UPD協議,使用UDP協議的遠端ActorPath表示方式有如下形式

"akka.udp://my-sys@host.example.com:5678/user/service-b"

ActorPath當中,akka.udp表示的是使用的協議,my-sys表示的是ActorSytem名稱,host.example.com:5678為遠端機器地址及埠,user為guardian supervisor,service-b為當前應用程式的頂級Actor(通過system.actorOf方法建立的)

/*
*ActorPath
*/
object Example_08 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class FirstActor extends Actor with ActorLogging{
    //通過context.actorOf方法建立Actor
    var child:ActorRef = _
    override def preStart(): Unit ={
      log.info("preStart() in FirstActor")
      //通過context上下文建立Actor
      child = context.actorOf(Props[MyActor], name = "myActor")
    }
    def receive = {
      //向MyActor傳送訊息
      case x => child ! x;log.info("received "+x)
    }
  }

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test");
      case _      => log.info("received unknown message");
    }

  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //建立FirstActor物件
  val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")

  //獲取ActorPath
  val firstActorPath=system.child("firstActor")
  systemLog.info("firstActorPath--->{}",firstActorPath)


  //通過system.actorSelection方法獲取ActorRef
  val myActor1=system.actorSelection(firstActorPath)

  //直接指定其路徑
  val myActor2=system.actorSelection("/user/firstActor")
  //使用相對路徑
  val myActor3=system.actorSelection("../firstActor")


  systemLog.info("準備向myactor傳送訊息")
  //向myActor1傳送訊息
  myActor2!"test"
  myActor2! 123
  Thread.sleep(5000)
  //關閉ActorSystem,停止程式的執行
  system.shutdown()
}

程式碼執行結果:

[INFO][04/02/2016 21:04:59.612][main][ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor
[INFO][04/02/2016 21:04:59.612][MyActorSystem-akka.actor.default-dispatcher-2][akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO][04/02/2016 21:04:59.615][main][ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO][04/02/2016 21:04:59.616][MyActorSystem-akka.actor.default-dispatcher-4][akka://MyActorSystem/user/firstActor] received test
[INFO][04/02/2016 21:04:59.616][MyActorSystem-akka.actor.default-dispatcher-4][akka://MyActorSystem/user/firstActor] received 123
[INFO][04/02/2016 21:04:59.616][MyActorSystem-akka.actor.default-dispatcher-3][akka://MyActorSystem/user/firstActor/myActor] received test
[INFO][04/02/2016 21:04:59.616][MyActorSystem-akka.actor.default-dispatcher-3][akka://MyActorSystem/user/firstActor/myActor] received unknown message

本例中的重點程式碼如下

//獲取ActorPath
  val firstActorPath=system.child("firstActor")
  systemLog.info("myActorPath--->{}",firstActorPath)


  //通過system.actorSelection方法獲取ActorRef
  val myActor1=system.actorSelection(firstActorPath)

  //直接指定其路徑
  val myActor2=system.actorSelection("/user/firstActor")
  //使用相對路徑
  val myActor3=system.actorSelection("../firstActor")

通過 val firstActorPath=system.child(“firstActor”)構造一個ActorPath,然後使用

val myActor1=system.actorSelection(firstActorPath)

獲取路徑下的ActorRef,除這種方式外,還可以通過絕對路徑 val myActor2=system.actorSelection(“/user/firstActor”)及相對路徑 val myActor3=system.actorSelection(“../firstActor”)的方式獲取ActorRef
,需要注意的是絕對路徑使用的是guardian supevisor,即/user/firstActor的這種方式。

如果要獲取myActor,則基方法是型別的,例如

//獲取ActorPath
  val myActorPath=system.child("firstActor").child("myActor")
  systemLog.info("firstActorPath--->{}",myActorPath)


  //通過system.actorSelection方法獲取ActorRef
  val myActor1=system.actorSelection(myActorPath)

  //直接指定其路徑
  val myActor2=system.actorSelection("/user/firstActor/myActor")
  //使用相對路徑
  val myActor3=system.actorSelection("../firstActor/myActor")

完整程式碼如下:

/*
*ActorPath,獲取myActor
*/
object Example_09 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class FirstActor extends Actor with ActorLogging{
    //通過context.actorOf方法建立Actor
    var child:ActorRef = _
    override def preStart(): Unit ={
      log.info("preStart() in FirstActor")
      //通過context上下文建立Actor
      child = context.actorOf(Props[MyActor], name = "myActor")
    }
    def receive = {
      //向MyActor傳送訊息
      case x => child ! x;log.info("received "+x)
    }
  }

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test");
      case _      => log.info("received unknown message");
    }

  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //建立FirstActor物件
  val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")

  //獲取ActorPath
  val myActorPath=system.child("firstActor").child("myActor")
  systemLog.info("firstActorPath--->{}",myActorPath)


  //通過system.actorSelection方法獲取ActorRef
  val myActor1=system.actorSelection(myActorPath)

  //直接指定其路徑
  val myActor2=system.actorSelection("/user/firstActor/myActor")
  //使用相對路徑
  val myActor3=system.actorSelection("../firstActor/myActor")


  systemLog.info("準備向myactor傳送訊息")
  //向myActor1傳送訊息
  myActor1!"test"
  myActor1! 123
  Thread.sleep(5000)
  //關閉ActorSystem,停止程式的執行
  system.shutdown()
}

程式碼執行結果:

[INFO][04/02/2016 21:21:14.377][main][ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor/myActor
[INFO][04/02/2016 21:21:14.377][MyActorSystem-akka.actor.default-dispatcher-2][akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO][04/02/2016 21:21:14.381][main][ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO][04/02/2016 21:21:14.382][MyActorSystem-akka.actor.default-dispatcher-4][akka://MyActorSystem/user/firstActor/myActor] received test
[INFO][04/02/2016 21:21:14.382][MyActorSystem-akka.actor.default-dispatcher-4][akka://MyActorSystem/user/firstActor/myActor] received unknown message

關於遠端ActorRef的獲取,我們將在後續章節中進行講述。

Scala學習(公眾微訊號:ScalaLearning)每天為大家帶來一點Scala語言、Spark、Kafka、Flink、AKKA等大資料技術乾貨及相關技術資訊。技術永無止境,勇攀高峰,一往直前!
覺得文章不錯?掃描關注
這裡寫圖片描述