1. 程式人生 > Scala學習筆記(11)—— RPC 通訊框架

Scala學習筆記(11)—— RPC 通訊框架

1 通訊業務邏輯

定義2個類 Master, Worker。首先啟動Master,然後啟動Worker

  1. Worker 啟動後,在 preStart 方法中與 Master 建立連線,並向 Master 傳送註冊訊息:將 Worker 的資訊通過 case class 封裝起來發送給 Master。
  2. Master 接收 Worker 的註冊訊息後,將Worker 的資訊儲存起來。然後向Worker反饋註冊成功。
  3. Worker 定期向 Master 傳送心跳
  4. Master 會定時處理超時的 Worker

2 原始碼

2.1 WorkerInfo.scala

package cn.tzb.rpc

/**
 * Mutable bookkeeping record the Master keeps for each registered worker.
 *
 * @param id     identifier the worker supplied when registering
 * @param memory memory figure reported by the worker (unit not specified here)
 * @param cores  core count reported by the worker
 */
class WorkerInfo(val id: String, val memory: Int, val cores: Int) {

  /** Time of the last recorded heartbeat; remains 0 until something assigns it. */
  var lastHeartbeatTime: Long = _
}

2.2 RemoteMessage.scala

package cn.tzb.rpc

/** Marker for messages that travel between actor systems and therefore must be serializable. */
trait RemoteMessage extends Serializable

// Worker -> Master

/** Registration request carrying the worker's id and its reported resources. */
final case class RegisterWorker(id: String, memory: Int, cores: Int) extends RemoteMessage

/** Liveness signal identifying the worker that sent it. */
final case class Heartbeat(id: String) extends RemoteMessage

// Master -> Worker

/** Acknowledgement of a successful registration, carrying the Master's actor URL. */
final case class RegisteredWorker(masterUrl: String) extends RemoteMessage

// Worker -> self

/** Timer tick telling a worker to send its next heartbeat. */
case object SendHeartbeat

// Master -> self

/** Timer tick telling the Master to sweep out workers whose heartbeat is stale. */
case object CheckTimeOutWorker

2.3 Master.scala

package cn.tzb.rpc

import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

/**
 * Actor that keeps the registry of workers and periodically evicts those whose
 * heartbeat has gone stale.
 *
 * @param host hostname placed in the actor URL returned to workers
 * @param port port placed in the actor URL returned to workers
 */
class Master(val host: String, val port: Int) extends Actor {

  // id -> WorkerInfo
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()
  val workers = new mutable.HashSet[WorkerInfo]()

  // Interval between timeout sweeps in milliseconds; also the staleness threshold.
  val CHECK_INTERVAL = 15000

  // Handle of the periodic sweep so it can be cancelled when the actor stops.
  private var timeoutCheck: Option[Cancellable] = None

  /** Starts the periodic timeout sweep; the first tick fires immediately. */
  override def preStart(): Unit = {
    println("preStart invoked !")
    // Brings the actor's dispatcher into scope as the scheduler's ExecutionContext.
    import context.dispatcher
    timeoutCheck = Some(
      context.system.scheduler.schedule(0.millis, CHECK_INTERVAL.millis, self, CheckTimeOutWorker)
    )
  }

  /** Cancels the sweep so no further ticks are scheduled after the actor stops. */
  override def postStop(): Unit = {
    timeoutCheck.foreach(_.cancel())
    timeoutCheck = None
  }

  override def receive: Receive = {
    case RegisterWorker(id, memory, cores) =>
      // Ignore a worker id that is already registered.
      if (!idToWorker.contains(id)) {
        val workerInfo = new WorkerInfo(id, memory, cores)
        // Count registration as the first sign of life; otherwise a sweep that
        // lands before the first heartbeat would see timestamp 0 and evict it.
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
        idToWorker(id) = workerInfo
        workers += workerInfo
        // "MasterSystem" must match the ActorSystem name created in Master.main.
        sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")
      }

    case Heartbeat(id) =>
      // Refresh the timestamp of a known worker; unknown ids are ignored.
      idToWorker.get(id).foreach { info =>
        info.lastHeartbeatTime = System.currentTimeMillis()
      }

    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      // filter builds a separate set, so removing from `workers` below is safe.
      val toRemove = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
      for (w <- toRemove) {
        workers -= w
        idToWorker -= w.id
      }
      println(workers.size)
  }
}

object Master {

  /**
   * Boots the Master actor system.
   *
   * @param args `host port` — the address this actor system binds to
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: Master <host> <port>")
      sys.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)

    // The ActorSystem creates and supervises the actors below it.
    val actorSystem = ActorSystem("MasterSystem", config)
    actorSystem.actorOf(Props(new Master(host, port)), "Master")
    // Keep the JVM alive until the actor system shuts down.
    actorSystem.awaitTermination()
  }
}

在這裡插入圖片描述

2.4 Worker.scala

package cn.tzb.rpc

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
 * Actor that registers itself with the Master on start-up and, once the
 * registration is acknowledged, sends a heartbeat at a fixed interval.
 *
 * @param masterHost hostname of the Master's actor system
 * @param masterPort port of the Master's actor system
 * @param memory     memory figure reported to the Master at registration
 * @param cores      core count reported to the Master at registration
 */
class Worker(val masterHost: String, val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

  var master: ActorSelection = _
  val workerId = UUID.randomUUID().toString

  // Interval between heartbeats, in milliseconds.
  val HEART_INTERVAL = 10000

  // Handle of the periodic heartbeat so it can be cancelled and is started only once.
  private var heartbeatTask: Option[akka.actor.Cancellable] = None

  /** Looks up the Master actor and sends it this worker's registration. */
  override def preStart(): Unit = {
    // "MasterSystem" is the ActorSystem name the Master is started under.
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")

    master ! RegisterWorker(workerId, memory, cores)
  }

  /** Cancels the heartbeat timer so no further ticks are scheduled after the actor stops. */
  override def postStop(): Unit = {
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }

  override def receive: Receive = {
    case RegisteredWorker(masterUrl) =>
      println(masterUrl)
      // Start the heartbeat timer once; a repeated acknowledgement must not
      // stack a second schedule on top of the first.
      if (heartbeatTask.isEmpty) {
        // Brings the actor's dispatcher into scope as the scheduler's ExecutionContext.
        import context.dispatcher
        heartbeatTask = Some(
          context.system.scheduler.schedule(0.millis, HEART_INTERVAL.millis, self, SendHeartbeat)
        )
      }

    case SendHeartbeat =>
      println("send heartbeat to master")
      master ! Heartbeat(workerId)
  }
}

object Worker {

  /**
   * Boots a Worker actor system and starts one Worker actor in it.
   *
   * @param args `host port masterHost masterPort memory cores` — the first two
   *             are this system's bind address, the next two locate the Master,
   *             and the last two are the resources reported at registration
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 6) {
      System.err.println("Usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
      sys.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val masterHost = args(2)
    val masterPort = args(3).toInt

    val memory = args(4).toInt
    val cores = args(5).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin

    val config = ConfigFactory.parseString(configStr)

    val actorSystem = ActorSystem("WorkerSystem", config)
    actorSystem.actorOf(Props(new Worker(masterHost, masterPort, memory, cores)), "Worker")
    // Keep the JVM alive until the actor system shuts down.
    actorSystem.awaitTermination()
  }

}

在這裡插入圖片描述

2.5 執行結果

Master 在這裡插入圖片描述

Worker 在這裡插入圖片描述