【圖文詳細 】Scala——Akka Actor
4、Akka Actor
4.1、Akka 概述
Akka 基於 Actor 模型,提供了一個用於構建可擴充套件的(Scalable)、彈性的(Resilient)、快 速響應的(Responsive)應用程式的平臺。
Actor 模型:在電腦科學領域,Actor 模型是一個平行計算(Concurrent Computation)模型, 它把 actor 作為平行計算的基本元素來對待:為響應一個接收到的訊息,一個 actor 能夠自 己做出一些決策,如建立更多的 actor,或傳送更多的訊息,或者確定如何去響應接收到的 下一個訊息。
Actor 是 Akka 中最核心的概念,它是一個封裝了狀態和行為的物件,Actor 之間可以通過交 換訊息的方式進行通訊,每個 Actor 都有自己的收件箱(Mailbox)。通過 Actor 能夠簡化鎖 及執行緒管理,可以非常容易地開發出正確地併發程式和並行系統。
Actor 具有如下特性:
1、提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(Parallelism)應用場景下 的程式設計開發
2、提供了非同步非阻塞的、高效能的事件驅動程式設計模型
3、超級輕量級事件處理(每 GB 堆記憶體幾百萬 Actor)
4.2、重要 API 介紹
4.2.1、ActorSystem
在 Akka 中,ActorSystem 是一個重量級的結構,他需要分配多個執行緒,所以在實際應用中, ActorSystem 通常是一個單例物件,我們可以使用這個 ActorSystem 的 actorOf 方法建立很多 Actor。
4.2.2、Actor
在 Akka 中,Actor 負責通訊,在 Actor 中有一些重要的生命週期方法。
1、preStart()方法:該方法在 Actor 物件構造方法執行後執行,整個 Actor 生命週期中僅執行 一次。
2、receive()方法:該方法在 Actor 的 preStart 方法執行完成後執行,用於接收訊息,會被反 復執行。
4.2.3、ActorSystem 和 Actor 對比
Actor: 就是用來做訊息傳遞的 用來接收和傳送訊息的,一個 Actor 就相當於是一個老師或者是學生。 如果我們想要多個老師,或者學生,就需要建立多個 Actor 例項。
ActorSystem: 用來建立和管理 Actor,並且還需要監控 Actor。ActorSystem 是單例的(object) 在同一個程序裡面,只需要一個 ActorSystem 就可以了
4.3、利用 Akka 構建 RPC 應用案例
4.3.1、需求
目前大多數的分散式架構底層通訊都是通過 RPC 實現的,RPC 框架非常多,比如前我們學過 的 Hadoop 專案的 RPC 通訊框架,但是 Hadoop 在設計之初就是為了執行長達數小時的批量 而設計的,在某些極端的情況下,任務提交的延遲很高,所有 Hadoop 的 RPC 顯得有些笨重。
Spark 的 RPC 是通過 Akka 類庫實現的,Akka 用 Scala 語言開發,基於 Actor 併發模型實現, Akka 具有高可靠、高效能、可擴充套件等特點,使用 Akka 可以輕鬆實現分散式 RPC 功能。
4.3.2、應用架構
4.2.3、具體實現
Master 程式碼實現:
class Master extends Actor{
def doHello(): Unit ={ println("我是 Master, 我接收到了 Worker 的 hello 的訊息");
}
/**
* 其實就是一個死迴圈 : 接收訊息
* while(true)
*/
override def receive: Receive = {
case "hello" =>{
doHello()
//sender() 誰傳送過來訊息這個就是誰
//sender() ! "hi" 給 sender() 傳送一個 hi的訊息
sender() ! "hi"
}
}
}
object Master {
def main(args: Array[String]): Unit = {
val str=
"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = localhost
|akka.remote.netty.tcp.port = 6790
""".stripMargin
val conf: Config = ConfigFactory.parseString(str)
// def apply(name: String, config: Config)
val actorSystem = ActorSystem("MasterActorSystem", conf)
// 建立並啟動 actor def actorOf(props: Props, name: String): ActorRef
//new Master() 會導致主建構函式會執行!!
actorSystem.actorOf(Props(new Master()), "MasterActor")
}
}
Worker 程式碼實現:
class Worker extends Actor{// 生命週期
def doHi(): Unit ={
println("我是 Worker,我接收到了 Master 的 hi 的訊息");
}
// 如果 actor一執行首先執行的是這個方法,只執行一次。
override def preStart(): Unit = {
// 實現的是給 Master 傳送訊息 地址
val workerActor =
context.actorSelection("akka.tcp://[email protected]:6790/user/MasterActo
r")
workerActor ! "hello"
}
override def receive: Receive = {
case "hi" => {
doHi()
}
}
}
object Worker {
def main(args: Array[String]): Unit = {
val str=
"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = localhost
""".stripMargin
val conf = ConfigFactory.parseString(str)
val actorSystem = ActorSystem("WorkerActorSystem", conf)
actorSystem.actorOf(Props(new Worker()), "WorkerActor")
}
}
4.2.4、執行測試
先啟動 Master,再啟動 Worker