A Simple RPC in Scala Built on Akka Remote Actors

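Communication in Spark 1.3 is implemented on top of Akka. Actors interact only by exchanging messages, and every action is asynchronous.
This article borrows the core idea behind Spark 1.3's communication layer to build a simple Akka-based RPC framework.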
Server:
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Message types used for pattern matching
case class AkkaMessage(message: Any)
case class Response(response: Any)

class Server extends Actor {
  override def receive: Receive = {
    case msg: AkkaMessage =>
      println("Server received message: " + msg.message)
      // Reply to whoever sent the message
      sender() ! Response("response_" + msg.message)
    case _ => println("Server does not support this message type ..")
  }
}

object Server {
  // Create the remote ActorSystem and start the server actor in it
  def main(args: Array[String]): Unit = {
    val serverSystem = ActorSystem("mxb", ConfigFactory.parseString("""
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "127.0.0.1"
            port = 2555
          }
        }
      }
    """))
    serverSystem.actorOf(Props[Server], "server")
  }
}
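
The host and port are hard-coded in the configuration string above. As a minimal sketch, assuming the same `netty.tcp` remoting settings used in this article, the configuration could instead be produced by a small helper. `RemoteConfig` and `remoteConfig` are hypothetical names introduced for illustration; they are not part of Akka.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper (not part of Akka): builds the remoting settings
// shown in this article for a given host and port.
object RemoteConfig {
  def remoteConfig(host: String, port: Int): Config =
    ConfigFactory.parseString(s"""
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "$host"
            port = $port
          }
        }
      }
    """)
}
```

With this in place, the server could be started with `ActorSystem("mxb", RemoteConfig.remoteConfig("127.0.0.1", 2555))`, and the same helper could be reused for a client that needs a fixed port.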
Client:

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._

class Client extends Actor {
  // Remote actor
  var remoteActor: ActorSelection = null
  // Local actor that sent the current request
  var localActor: ActorRef = null

  @throws(classOf[Exception])
  override def preStart(): Unit = {
    remoteActor = context.actorSelection("akka.tcp://mxb@127.0.0.1:2555/user/server")
    println("Remote server address : " + remoteActor)
  }

  override def receive: Receive = {
    // On receiving an AkkaMessage, forward it to the remote actor
    case msg: AkkaMessage =>
      println("Client sending message : " + msg)
      this.localActor = sender()
      remoteActor ! msg
    // On receiving a Response from the remote actor, pass it back to the requester
    case res: Response =>
      localActor ! res
    case _ => println("Client does not support this message type ..")
  }
}

object Client {
  def main(args: Array[String]): Unit = {
    val clientSystem = ActorSystem("ClientSystem", ConfigFactory.parseString("""
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
      }
    """))

    val client = clientSystem.actorOf(Props[Client])
    val msgs = Array(AkkaMessage("message1"), AkkaMessage("message2"), AkkaMessage("message3"), AkkaMessage("message4"))

    implicit val timeout: Timeout = Timeout(3.seconds)

    // Send each message with ask (?) and block until the reply arrives
    msgs.foreach { x =>
      val future = client ? x
      val result = Await.result(future, timeout.duration).asInstanceOf[Response]
      println("Reply received: " + result)
    }

    // Fire-and-forget alternative:
    // msgs.foreach { x =>
    //   client ! x
    // }

    // Akka 2.3 API; later Akka versions use terminate() instead
    clientSystem.shutdown()
  }
}
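
The client above remembers the requester in a mutable field (`localActor`), which is only safe while a single request is in flight at a time. One possible alternative, sketched here on the assumption that `akka.pattern.ask` and `akka.pattern.pipe` are available as in Akka 2.3, is to ask the remote actor and pipe its reply straight back to the requester. `ForwardingClient` is a hypothetical name, and the server address is assumed from the server configuration shown earlier.

```scala
import akka.actor.{Actor, ActorSelection}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._

// Same message types as in the server section, repeated only so that this
// sketch compiles on its own; in a real project both sides share one definition.
case class AkkaMessage(message: Any)
case class Response(response: Any)

// Hypothetical variant of Client that keeps no per-request state.
class ForwardingClient extends Actor {
  import context.dispatcher // ExecutionContext used by the Future callbacks

  // Assumed timeout for the remote call.
  implicit val timeout: Timeout = Timeout(3.seconds)

  // Assumes the server from this article is listening at this address.
  val remoteActor: ActorSelection =
    context.actorSelection("akka.tcp://mxb@127.0.0.1:2555/user/server")

  override def receive: Receive = {
    case msg: AkkaMessage =>
      // Ask the remote actor, then deliver its reply to whoever sent msg.
      // sender() is evaluated here, before the Future completes.
      (remoteActor ? msg).pipeTo(sender())
    case _ => println("Client does not support this message type ..")
  }
}
```

Because each request carries its own reply path, several requests can overlap without one reply being routed to the wrong requester. If the remote call times out, the requester receives a failure instead of a `Response`.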

Output:
server console:
[INFO] [03/13/2017 15:38:03.154] [main] [Remoting] Starting remoting
[INFO] [03/13/2017 15:38:03.653] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://mxb@127.0.0.1:2555]
[INFO] [03/13/2017 15:38:03.653] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://mxb@127.0.0.1:2555]
Server received message: message1
Server received message: message2
Server received message: message3
Server received message: message4
[ERROR] [03/13/2017 15:38:49.113] [mxb-akka.remote.default-remote-dispatcher-6] [akka.tcp://mxb@127.0.0.1:2555/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClientSystem%4010.60.98.79%3A2552-0/endpointWriter] AssociationError [akka.tcp://mxb@127.0.0.1:2555] <- [akka.tcp://ClientSystem@10.60.98.79:2552]: Error [Shut down address: akka.tcp://ClientSystem@10.60.98.79:2552] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://ClientSystem@10.60.98.79:2552
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]
The ERROR entry is logged when the client system shuts down after its last request and the association is closed, as the "Caused by" line states.

client console:
[INFO] [03/13/2017 15:38:48.409] [main] [Remoting] Starting remoting
[INFO] [03/13/2017 15:38:48.596] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClientSystem@10.60.98.79:2552]
[INFO] [03/13/2017 15:38:48.596] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ClientSystem@10.60.98.79:2552]
Remote server address : ActorSelection[Anchor(akka.tcp://mxb@127.0.0.1:2555/), Path(/user/server)]
Client sending message : AkkaMessage(message1)
Reply received: Response(response_message1)
Client sending message : AkkaMessage(message2)
Reply received: Response(response_message2)
Client sending message : AkkaMessage(message3)
Reply received: Response(response_message3)
Client sending message : AkkaMessage(message4)
Reply received: Response(response_message4)
[INFO] [03/13/2017 15:38:49.035] [ClientSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClientSystem@10.60.98.79:2552/system/remoting-terminator] Shutting down remote daemon.
[INFO] [03/13/2017 15:38:49.050] [ClientSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClientSystem@10.60.98.79:2552/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [03/13/2017 15:38:49.128] [ForkJoinPool-3-worker-15] [Remoting] Remoting shut down
[INFO] [03/13/2017 15:38:49.128] [ClientSystem-akka.remote.default-remote-dispatcher-7] [akka.tcp://ClientSystem@10.60.98.79:2552/system/remoting-terminator] Remoting shut down.

Process finished with exit code 0