1. 程式人生 > >Scal:Master和worker之間的通訊

Scal:Master和worker之間的通訊

Scala程式設計實戰

  1. 課程目標 1.1. 目標:熟練使用Scala編寫程式 在這裡插入圖片描述

  2. 專案概述 2.1. 需求 目前大多數的分散式架構底層通訊都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop專案的RPC通訊框架,但是Hadoop在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重。

Spark 的RPC是通過Akka類庫實現的,Akka用Scala語言開發,基於Actor併發模型實現,Akka具有高可靠、高效能、可擴充套件等特點,使用Akka可以輕鬆實現分散式RPC功能。 2.2. Akka簡介 Akka基於Actor模型,提供了一個用於構建可擴充套件的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平臺。

Actor模型:在電腦科學領域,Actor模型是一個平行計算(Concurrent Computation)模型,它把actor作為平行計算的基本元素來對待:為響應一個接收到的訊息,一個actor能夠自己做出一些決策,如建立更多的actor,或傳送更多的訊息,或者確定如何去響應接收到的下一個訊息。

在這裡插入圖片描述

Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的物件,Actor之間可以通過交換訊息的方式進行通訊,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及執行緒管理,可以非常容易地開發出正確地併發程式和並行系統,Actor具有如下特性:

1.提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(Parallelism)應用場景下的程式設計開發 2.提供了非同步非阻塞的、高效能的事件驅動程式設計模型 3.超級輕量級事件處理(每GB堆記憶體幾百萬Actor)

  1. 專案實現 3.1. 架構圖

3.2. 重要類介紹 3.2.1. ActorSystem 在Akka中,ActorSystem是一個重量級的結構,他需要分配多個執行緒,所以在實際應用中,ActorSystem通常是一個單例物件,我們可以使用這個ActorSystem建立很多Actor。 3.2.2. Actor 在Akka中,Actor負責通訊,在Actor中有一些重要的生命週期方法。

1.preStart()方法:該方法在Actor物件構造方法執行後執行,整個Actor生命週期中僅執行一次。 2.receive()方法:該方法在Actor的preStart方法執行完成後執行,用於接收訊息,會被反覆執行。 3.3. Master類

package cn.itcast.spark
import scala.concurrent.duration._
import akka.actor.{Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory

import scala.collection.mutable

/**
  * Master為整個叢集中的主節點
  * Master繼承了Actor
  */
class Master extends Actor{

  //儲存WorkerID和Work資訊的map
  val idToWorker = new mutable.HashMap[String, WorkerInfo]
  //儲存所有Worker資訊的Set
  val workers = new mutable.HashSet[WorkerInfo]
  //Worker超時時間
  val WORKER_TIMEOUT = 10 * 1000
  //重新receive方法

  //匯入隱式轉換,用於啟動定時器
  import context.dispatcher

  //構造方法執行完執行一次
  override def preStart(): Unit = {
    //啟動定時器,定時執行
    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
  }

  //該方法會被反覆執行,用於接收訊息,通過case class模式匹配接收訊息
  override def receive: Receive = {
    //Worker向Master傳送的註冊訊息
    case RegisterWorker(id, workerHost, memory, cores) => {
      if(!idToWorker.contains(id)) {
        val worker = new WorkerInfo(id, workerHost, memory, cores)
        workers.add(worker)
        idToWorker(id) = worker
        sender ! RegisteredWorker("192.168.10.1")
      }
    }

    //Worker向Master傳送的心跳訊息
    case HeartBeat(workerId) => {
      val workerInfo = idToWorker(workerId)
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    }

    //Master自己向自己傳送的定期檢查超時Worker的訊息
    case CheckOfTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
      for(worker <- toRemove){
        workers -= worker
        idToWorker.remove(worker.id)
      }
      println("worker size: " + workers.size)
    }
  }
}

object Master {
  //程式執行入口
  def main(args: Array[String]) {

    val host = "192.168.10.1"
    val port = 8888
    //建立ActorSystem的必要引數
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem是單例的,用來建立Actor
    val actorSystem = ActorSystem.create("MasterActorSystem", config)
    //啟動Actor,Master會被例項化,生命週期方法會被呼叫
    actorSystem.actorOf(Props[Master], "Master")
  }
}

3.4. Worker類

package cn.itcast.spark

import java.util.UUID
import scala.concurrent.duration._
import akka.actor.{ActorSelection, Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory

/**
  * Worker為整個叢集的從節點
  * Worker繼承了Actor
  */
class Worker extends Actor{

  //Worker端持有Master端的引用(代理物件)
  var master: ActorSelection = null
  //生成一個UUID,作為Worker的標識
  val id = UUID.randomUUID().toString

  //構造方法執行完執行一次
  override def preStart(): Unit = {
    //Worker向MasterActorSystem傳送建立連線請求
    master = context.system.actorSelection("akka.tcp://[email protected]:8888/user/Master")
    //Worker向Master傳送註冊訊息
    master ! RegisterWorker(id, "192.168.10.1", 10240, 8)
  }

  //該方法會被反覆執行,用於接收訊息,通過case class模式匹配接收訊息
  override def receive: Receive = {
    //Master向Worker的反饋資訊
    case RegisteredWorker(masterUrl) => {
      import context.dispatcher
      //啟動定時任務,向Master傳送心跳
      context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
    }

    case SendHeartBeat => {
      println("worker send heartbeat")
      master ! HeartBeat(id)
    }
  }
}

object Worker {
  def main(args: Array[String]) {
    val clientPort = 2552
    //建立WorkerActorSystem的必要引數
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.port = $clientPort
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("WorkerActorSystem", config)
    //啟動Actor,Master會被例項化,生命週期方法會被呼叫
    actorSystem.actorOf(Props[Worker], "Worker")
  }
}