1. 程式人生 > >Scala學習之路 (十)Scala的Actor

Scala學習之路 (十)Scala的Actor

margin 編碼 source remote RR ada rpath 當前時間 mov

一、Scala中的並發編程

1、Java中的並發編程

①Java中的並發編程基本上滿足了事件之間相互獨立,但是事件能夠同時發生的場景的需要。

②Java中的並發編程是基於共享數據和加鎖的一種機制,即會有一個共享的數據,然後有若幹個線程去訪問這個共享的數據(主要是對這個共享的數據進行修改),同時Java利用加鎖的機制(即synchronized)來確保同一時間只有一個線程對我們的共享數據進行訪問,進而保證共享數據的一致性。

③Java中的並發編程存在資源爭奪和死鎖等多種問題,因此程序越大問題越麻煩。

2、Scala中的並發編程

①Scala中的並發編程思想與Java中的並發編程思想完全不一樣,Scala中的Actor是一種不共享數據,依賴於消息傳遞的一種並發編程模式, 避免了死鎖、資源爭奪等情況。在具體實現的過程中,Scala中的Actor會不斷的循環自己的郵箱,並通過receive偏函數進行消息的模式匹配並進行相應的處理。

②如果Actor A和 Actor B要相互溝通的話,首先A要給B傳遞一個消息,B會有一個收件箱,然後B會不斷的循環自己的收件箱, 若看見A發過來的消息,B就會解析A的消息並執行,處理完之後就有可能將處理的結果通過郵件的方式發送給A。

二、Scala中的Actor

1、什麽是Actor

一個actor是一個容器,它包含 狀態, 行為,信箱,子Actor 和 監管策略,所有這些包含在一個ActorReference(Actor引用)裏。一個actor需要與外界隔離才能從actor模型中獲益,所以actor是以actor引用的形式展現給外界的

2、ActorSystem的層次結構

如果一個Actor中的業務邏輯非常復雜,為了降低代碼的復雜度,可以將其拆分成多個子任務(在一個actor的內部可以創建一個或多個actor,actor的創建者也是該actor的監控者)


一個ActorSystem應該被正確規劃,例如哪一個Actor負責監控,監控什麽等等:

    • 負責分發的actor管理接受任務的actor
    • 擁有重要數據的actor,找出所有可能丟失數據的子actor,並且處理他們的錯誤。

3、ActorPath

ActorPath是通過字符串描述Actor的層級關系,並唯一標識一個Actor的方法。

ActorPath包含協議,位置 和 Actor層級關系

//本地path
"akka://my-sys/user/service-a/worker1"   

//遠程path  akka.tcp://(ActorSystem的名稱)@(遠程地址的IP):(遠程地址的端口)/user/(Actor的名稱)
"akka.tcp://[email protected]:5678/user/service-b" 

//akka集群
"cluster://my-cluster/service-c"

遠程地址不清楚是多少的話,可以在遠程的服務啟動的時候查看

技術分享圖片

4、獲取Actor Reference

獲取Actor引用的方式有兩種:創建 和 查找。

要創建Actor,可以調用ActorSystem.actorOf(..),它創建的actor在guardian actor之下,接著可以調用ActorContext的actorOf(…) 在剛才創建的Actor內生成一個actor樹。這些方法會返回新創建的actor的引用,每一個actor都可以通過訪問ActorContext來獲得自己(self),子Actor(children,child)和父actor(parent)。


要查找Actor Reference,可以調用ActorSystem或ActorContext的actorSelection(“path”),在查找ActorRef時,可以使用相對路徑或絕對路徑,如果是相對路徑,可以用 .. 來表示parent actor。

actorOf / actorSelection / actorFor的區別

  • actorOf 創建一個新的actor,創建的actor為調用該方法所屬的context的直接子actor。

  • actorSelection 查找現有actor,並不會創建新的actor。

  • actorFor 查找現有actor,不創建新的actor,已過時。

5、Actor和ActorSystem

Actor:
就是用來做消息傳遞的
用來接收和發送消息的,一個actor就相當於是一個老師或者是學生。
如果我們想要多個老師,或者學生,就需要創建多個actor實例。
ActorSystem:
用來創建和管理actor,並且還需要監控Actor。ActorSystem是單例的(object)
在同一個進程裏面,只需要一個ActorSystem就可以了

三、Actor的示例

1、示例說明

技術分享圖片

2、代碼實現

MyResourceManager.scala(服務端)

技術分享圖片
package com.rpc

import akka.actor._
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable

class MyResourceManager(var resourceManagerHostName:String, var resourceManagerPort:Int) extends Actor {
  /**
    * 定義一個Map,接受MyNodeManager的註冊信息,key是主機名,
    * value是NodeManagerInfo對象,裏面存儲主機名、CPU和內存信息
    * */
  var registerMap = new mutable.HashMap[String,NodeManagerInfo]()
  /**
    * 定義一個Set,接受MyNodeManager的註冊信息,key是主機名,
    * value是NodeManagerInfo對象,裏面存儲主機名、CPU和內存信息
    * 實際上和上面的Map裏面存檔內容一樣,容易變歷,可以不用寫,主要是模仿後面Spark裏面的內容
    * 方便到時理解Spark源碼
    * */
  var registerSet = new mutable.HashSet[NodeManagerInfo]()


  override def preStart(): Unit = {
    import scala.concurrent.duration._
    import context.dispatcher
    context.system.scheduler.schedule(0 millis, 5000 millis, self,CheckTimeOut)
  }

  //對MyNodeManager傳過來的信息進行匹配
  override def receive: Receive = {
    //匹配到NodeManager的註冊信息進行對應處理
    case NodeManagerRegisterMsg(nodeManagerID,cpu,memory) => {
      //將註冊信息實例化為一個NodeManagerInfo對象
      val registerMsg = new NodeManagerInfo(nodeManagerID,cpu,memory)
      //將註冊信息存儲到registerMap和registerSet裏面,key是主機名,value是NodeManagerInfo對象
      registerMap.put(nodeManagerID,registerMsg)
      registerSet += registerMsg
      //註冊成功之後,反饋個MyNodeManager一個成功的信息
      sender() ! new RegisterFeedbackMsg("註冊成功!" + resourceManagerHostName+":"+resourceManagerPort)
    }
    //匹配到心跳信息做相應處理
    case HeartBeat(nodeManagerID) => {
      //獲取當前時間
      val time:Long = System.currentTimeMillis()
      //根據nodeManagerID獲取NodeManagerInfo對象
      val info = registerMap(nodeManagerID)
      info.lastHeartBeatTime = time
      //更新registerMap和registerSet裏面nodeManagerID對應的NodeManagerInfo對象信息(最後一次心跳時間)
      registerMap(nodeManagerID) = info
      registerSet += info
    }
    //檢測超時,對超時的數據從集合中刪除
    case CheckTimeOut => {
      var time = System.currentTimeMillis()
      registerSet
        .filter( nm => time - nm.lastHeartBeatTime > 10000)
        .foreach(deadnm => {
          registerSet -= deadnm
          registerMap.remove(deadnm.nodeManagerID)
        })
      println("當前註冊成功的節點數:" + registerMap.size)
    }
  }
}

object MyResourceManager {
  def main(args: Array[String]): Unit = {
    /**
      * 傳參:
      *   ResourceManager的主機地址、端口號
      * */
    val RM_HOSTNAME = args(0)
    val RM_PORT = args(1).toInt

    val str:String =
      """
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname =localhost
        |akka.remote.netty.tcp.port=19888
      """.stripMargin
    val conf: Config = ConfigFactory.parseString(str)
    val actorSystem = ActorSystem(Conf.RMAS,conf)
    actorSystem.actorOf(Props(new MyResourceManager(RM_HOSTNAME,RM_PORT)),Conf.RMA)
  }
}
View Code

MyNodeManager.scala(客戶端)

技術分享圖片
package com.rpc

import java.util.UUID

import akka.actor._
import com.typesafe.config.{Config, ConfigFactory}

class MyNodeManager(resourceManagerHostName:String,resourceManagerPort:Int,cpu:Int,memory:Int) extends Actor{
  //MyNodeManager的UUID
  var nodeManagerID:String = _
  var rmref:ActorSelection = _
  override def preStart(): Unit = {
    //獲取MyResourceManager的Actor的引用
    rmref = context.actorSelection(s"akka.tcp://${Conf.RMAS}@${resourceManagerHostName}:${resourceManagerPort}/user/${Conf.RMA}")
    //生成隨機的UUID
    nodeManagerID = UUID.randomUUID().toString
    /**
      * 向MyResourceManager發送註冊信息
      * */
    rmref ! NodeManagerRegisterMsg(nodeManagerID,cpu,memory)

  }
  //進行信息匹配
  override def receive: Receive = {
    //匹配到註冊成功之後MyResourceManager反饋回的信息,進行相應處理
    case RegisterFeedbackMsg(feedbackMsg) => {
      /**
        * initialDelay: FiniteDuration, 多久以後開始執行
        * interval:     FiniteDuration, 每隔多長時間執行一次
        * receiver:     ActorRef, 給誰發送這個消息
        * message:      Any  發送的消息是啥
        */
      //定時任務需要導入的工具包
      import scala.concurrent.duration._
      import context.dispatcher
      //定時向自己發送信息
      context.system.scheduler.schedule(0 millis, 3000 millis, self, SendMessage)
    }
    //匹配到SendMessage信息之後做相應處理
    case SendMessage => {
      //向MyResourceManager發送心跳信息
      rmref ! HeartBeat(nodeManagerID)
      println(Thread.currentThread().getId + ":" + System.currentTimeMillis())
    }
  }
}

object MyNodeManager {
  def main(args: Array[String]): Unit = {
    /**
      * 傳參:
      *   NodeManager的主機地址、端口號、CPU、內存
      *   ResourceManager的主機地址、端口號
      * */
    val NM_HOSTNAME = args(0)
    val NM_PORT = args(1)
    val NM_CPU:Int = args(2).toInt
    val NM_MEMORY:Int = args(3).toInt

    val RM_HOSTNAME = args(4)
    val RM_PORT = args(5).toInt

    val str:String =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = ${NM_HOSTNAME}
        |akka.remote.netty.tcp.port = ${NM_PORT}
      """.stripMargin
    val conf: Config = ConfigFactory.parseString(str)
    val actorSystem = ActorSystem(Conf.NMAS,conf)
    actorSystem.actorOf(Props(new MyNodeManager(RM_HOSTNAME,RM_PORT,NM_CPU,NM_MEMORY)),Conf.NMA)
  }
}
View Code

Conf.scala(配置文件)

技術分享圖片
package com.rpc

//避免硬編碼
object Conf {
  //ResourceManagerActorSystem
  val RMAS = "MyRMActorSystem"
  //ResourceManagerActor
  val RMA = "MyRMActor"
  //NodeManagerActorSystem
  val NMAS = "MyNMActorSystem"
  //NodeManagerActor
  val NMA = "MyNMactor"
}
View Code

Message.scala

技術分享圖片
package com.rpc
//NodeManager註冊信息
case class NodeManagerRegisterMsg(val nodeManagerID:String, var cpu:Int, var memory:Int)
//ResourceManager接收到註冊信息成功之後的返回信息
case class RegisterFeedbackMsg(val feedbackMsg: String)
//NodeManager的心跳信息
case class HeartBeat(val nodeManagerID:String)
//NodeManager註冊信息
class NodeManagerInfo(val nodeManagerID:String, var cpu:Int, var memory:Int){
  //定義一個屬性,存儲上一次的心跳時間
  var lastHeartBeatTime:Long = _
}

case object SendMessage
case object CheckTimeOut
View Code

3、運行

(1)運行MyResourceManager

運行結果

技術分享圖片

發現報錯數組越界,原因是在啟動時需要傳入2個參數

技術分享圖片

技術分享圖片

重新啟動,啟動成功

技術分享圖片

2、運行MyNodeManager

技術分享圖片

報相同的錯誤,不過此處需要傳入6個參數

技術分享圖片

重新啟動,啟動成功

技術分享圖片

3、觀察MyResourceManager

發現有一個節點連接成功

技術分享圖片

4、再啟動一個MyNodeManager觀察情況

先修改MyNodeManager配置裏面的端口

技術分享圖片

再啟動

技術分享圖片

啟動成功之後觀察MyResourceManager,此時有2個節點連接成功

技術分享圖片

5、關閉一個節點,觀察情況

集合中連接超時的成功刪除

技術分享圖片

Scala學習之路 (十)Scala的Actor