1. 程式人生 > >Scala學習筆記(10)—— Akka 實現簡單 RPC 框架

Scala學習筆記(10)—— Akka 實現簡單 RPC 框架

1 Akka 介紹

目前大多數的分散式架構底層通訊都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop專案的RPC通訊框架,但是Hadoop在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重。Spark 的RPC是通過Akka類庫實現的,Akka用Scala語言開發,基於Actor併發模型實現,Akka具有高可靠、高效能、可擴充套件等特點,使用Akka可以輕鬆實現分散式RPC功能。

Akka基於Actor模型,提供了一個用於構建可擴充套件的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平臺。Actor模型:在電腦科學領域,Actor模型是一個平行計算(Concurrent Computation)模型,它把actor作為平行計算的基本元素來對待:為響應一個接收到的訊息,一個actor能夠自己做出一些決策,如建立更多的actor,或傳送更多的訊息,或者確定如何去響應接收到的下一個訊息。 在這裡插入圖片描述

Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的物件,Actor之間可以通過交換訊息的方式進行通訊,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及執行緒管理,可以非常容易地開發出正確地併發程式和並行系統,Actor具有如下特性:

  • 提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(Parallelism)應用場景下的程式設計開發
  • 提供了非同步非阻塞的、高效能的事件驅動程式設計模型
  • 超級輕量級事件處理(每GB堆記憶體幾百萬Actor)

2 分析

2.1 總體分析

  1. ActorSystem 是這個程序中的 Actor 的老大,負責建立和監控所有的 actor
  2. ActorSystem 是單例的
  3. actor 負責通訊

2.2 Master 和 Worker

  1. 先建立連線
  2. 拿到 Master 的代理物件
  3. 向 Master 傳送訊息
  4. Master 向 worker 反饋訊息

3 原始碼

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>4.0.0</modelVersion> <groupId>cn.tzb.com</groupId> <artifactId>my-akka-rpc</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.10.6</scala.version> <scala.compat.version>2.10</scala.compat.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.10</artifactId> <version>2.3.14</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.10</artifactId> <version>2.3.14</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-make:transitive</arg> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>

3.1 Master

package cn.tzb.rpc

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class Master extends Actor {

  println("constructor invoked!")

  override def preStart(): Unit = {
    println("preStart invoked !")
  }

  override def receive: Receive = {

    case "connect" => {
      println("a client connected")
      sender ! "reply"
    }

    case "hello" => {
      println("hello!!!")
    }
  }

}

object Master {
  def main(args: Array[String]): Unit = {

    val host = args(0)
    val port = args(1).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin

    val config = ConfigFactory.parseString(configStr)

    //    ActorSystem 老大, 輔助建立和監控下面的 Actor, 是單例的
    val actorSystem = ActorSystem("MasterSystem", config)

    //建立 Actor
    val master = actorSystem.actorOf(Props(new Master), "Master")

    master ! "hello"

  }
}

在這裡插入圖片描述

3.2 Worker

package cn.tzb.rpc

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class Worker(val masterHost: String, val masterPort: Int) extends Actor {

  var master: ActorSelection = _

  //建立連線
  override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://[email protected]$masterHost:$masterPort/user/Master")
    master ! "connect"
  }

  override def receive: Receive = {
    case "reply" => {
      println("a reply from master")
    }
  }
}

object Worker {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt

    val masterHost = args(2)
    val masterPort = args(3).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin

    val config = ConfigFactory.parseString(configStr)

    val actorSystem = ActorSystem("WorkerSystem", config)
    actorSystem.actorOf(Props(new Worker(masterHost,masterPort)), "Worker")
    actorSystem.awaitTermination()


  }

}

在這裡插入圖片描述

3.3 執行結果

Master 在這裡插入圖片描述

Worker 在這裡插入圖片描述

4 打包

4.1 Master 打包

在這裡插入圖片描述

在這裡插入圖片描述 在這裡插入圖片描述

4.2 Mater 執行

將打包好的 jar 重新命名為 master.jar, 然後上傳到 node1

[[email protected] ~]$ java -jar master.jar 192.168.30.131 8888
[INFO] [10/14/2018 10:40:59.344] [main] [Remoting] Starting remoting
[INFO] [10/14/2018 10:40:59.563] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:8888]
[INFO] [10/14/2018 10:40:59.563] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:8888]
constructor invoked!
preStart invoked !
hello!!!

4.3 Worker 打包

修改pom 檔案 在這裡插入圖片描述 maven 打包,打包後的檔案重新命名為 worker.jar ,上傳到 node2

4.4 worker 執行

[[email protected] ~]$ java -jar worker.jar 192.168.30.132 9999 192.168.30.131 8888
[INFO] [10/14/2018 10:53:50.627] [main] [Remoting] Starting remoting
[INFO] [10/14/2018 10:53:50.939] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:9999]
[INFO] [10/14/2018 10:53:50.940] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:9999]
a reply from master