1. 程式人生 > >【圖文詳細 】Scala——Akka Actor

【圖文詳細 】Scala——Akka Actor

4、Akka Actor 

 

4.1、Akka 概述 

Akka 基於 Actor 模型,提供了一個用於構建可擴充套件的(Scalable)、彈性的(Resilient)、快 速響應的(Responsive)應用程式的平臺。 
 
Actor 模型:在電腦科學領域,Actor 模型是一個平行計算(Concurrent Computation)模型, 它把 actor 作為平行計算的基本元素來對待:為響應一個接收到的訊息,一個 actor 能夠自 己做出一些決策,如建立更多的 actor,或傳送更多的訊息,或者確定如何去響應接收到的 下一個訊息。 

Actor 是 Akka 中最核心的概念,它是一個封裝了狀態和行為的物件,Actor 之間可以通過交 換訊息的方式進行通訊,每個 Actor 都有自己的收件箱(Mailbox)。通過 Actor 能夠簡化鎖 及執行緒管理,可以非常容易地開發出正確地併發程式和並行系統。 
 
Actor 具有如下特性:

1、提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(Parallelism)應用場景下 的程式設計開發

2、提供了非同步非阻塞的、高效能的事件驅動程式設計模型

3、超級輕量級事件處理(每 GB 堆記憶體幾百萬 Actor) 

 

4.2、重要 API 介紹 


4.2.1、ActorSystem 
在 Akka 中,ActorSystem 是一個重量級的結構,他需要分配多個執行緒,所以在實際應用中, ActorSystem 通常是一個單例物件,我們可以使用這個 ActorSystem 的 actorOf 方法建立很多 Actor。 

4.2.2、Actor 
在 Akka 中,Actor 負責通訊,在 Actor 中有一些重要的生命週期方法。 
1、preStart()方法:該方法在 Actor 物件構造方法執行後執行,整個 Actor 生命週期中僅執行 一次。

2、receive()方法:該方法在 Actor 的 preStart 方法執行完成後執行,用於接收訊息,會被反 復執行。 

4.2.3、ActorSystem 和 Actor 對比 
Actor: 就是用來做訊息傳遞的 用來接收和傳送訊息的,一個 Actor 就相當於是一個老師或者是學生。 如果我們想要多個老師,或者學生,就需要建立多個 Actor 例項。 
ActorSystem: 用來建立和管理 Actor,並且還需要監控 Actor。ActorSystem 是單例的(object) 在同一個程序裡面,只需要一個 ActorSystem 就可以了 

4.3、利用 Akka 構建 RPC 應用案例 

4.3.1、需求 
目前大多數的分散式架構底層通訊都是通過 RPC 實現的,RPC 框架非常多,比如前我們學過 的 Hadoop 專案的 RPC 通訊框架,但是 Hadoop 在設計之初就是為了執行長達數小時的批量 而設計的,在某些極端的情況下,任務提交的延遲很高,所有 Hadoop 的 RPC 顯得有些笨重。 
Spark 的 RPC 是通過 Akka 類庫實現的,Akka 用 Scala 語言開發,基於 Actor 併發模型實現, Akka 具有高可靠、高效能、可擴充套件等特點,使用 Akka 可以輕鬆實現分散式 RPC 功能。 

4.3.2、應用架構 

4.2.3、具體實現 
Master 程式碼實現: 

class Master extends  Actor{ 
  def doHello(): Unit ={     println("我是 Master, 我接收到了 Worker 的 hello 的訊息"); 
  } 
  /** 
    * 其實就是一個死迴圈 : 接收訊息
    * while(true) 
    */ 
  override def receive: Receive = { 
case "hello" =>{ 
      doHello() 
      //sender()   誰傳送過來訊息這個就是誰
       //sender()   ! "hi"  給 sender() 傳送一個 hi的訊息 
      sender() ! "hi" 
    } 
  } 
} 
 
object Master { 
 
  def main(args: Array[String]): Unit = { 
    val str= 
      """ 
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider" 
        |akka.remote.netty.tcp.hostname = localhost 
        |akka.remote.netty.tcp.port = 6790 
      """.stripMargin 
 
    val conf: Config = ConfigFactory.parseString(str) 
    // def apply(name: String, config: Config) 
    val actorSystem = ActorSystem("MasterActorSystem", conf) 
    // 建立並啟動 actor   def actorOf(props: Props, name: String): ActorRef 
    //new Master() 會導致主建構函式會執行!!
     actorSystem.actorOf(Props(new Master()), "MasterActor") 
  } 
}

Worker 程式碼實現: 

class Worker extends Actor{// 生命週期
   def doHi(): Unit ={
     println("我是 Worker,我接收到了 Master 的 hi 的訊息"); 
  } 
   // 如果 actor一執行首先執行的是這個方法,只執行一次。 
  override def preStart(): Unit = { 
 // 實現的是給 Master 傳送訊息  地址
     val workerActor = 
context.actorSelection("akka.tcp://[email protected]:6790/user/MasterActo
r") 
    workerActor ! "hello" 
  } 
 
  override def receive: Receive = { 
    case "hi"  => { 
      doHi() 
    } 
  } 
} 
object Worker { 
  def main(args: Array[String]): Unit = { 
 
    val str= 
      """ 
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider" 
        |akka.remote.netty.tcp.hostname = localhost 
      """.stripMargin 
 
    val conf = ConfigFactory.parseString(str) 
    val actorSystem = ActorSystem("WorkerActorSystem", conf) 
    actorSystem.actorOf(Props(new Worker()), "WorkerActor") 
  } 
}

 

4.2.4、執行測試 
先啟動 Master,再啟動 Worker