1. 程式人生 > >Scala入門第十一篇--Akka實現簡易版的spark通訊框架

Scala入門第十一篇--Akka實現簡易版的spark通訊框架

本次我們介紹使用Akka實現簡易的Spark通訊框架,主要分為:

  • Akka簡介
  • 提出需求
  • 需求分析
  • 程式碼示例

Akka簡介

Akka基於Actor模型,提供了一個用於構建可擴充套件的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平臺。

  • Actor模型
  • 在電腦科學領域,Actor模型是一個平行計算(Concurrent Computation)模型,它把actor作為平行計算的基本元素來對待:為響應一個接收到的訊息,一個actor能夠自己做出一些決策,如建立更多的actor,或傳送更多的訊息,或者確定如何去響應接收到的下一個訊息。 在這裡插入圖片描述 Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的物件,Actor之間可以通過交換訊息的方式進行通訊,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及執行緒管理,可以非常容易地開發出正確地併發程式和並行系統,Actor具有如下特性:
  1. 提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(Parallelism)應用場景下的程式設計開發
  2. 提供了非同步非阻塞的、高效能的事件驅動程式設計模型
  3. 超級輕量級事件處理(每GB堆記憶體幾百萬Actor)

這裡用發郵件的例子類比Akka的併發程式設計模型 在這裡插入圖片描述 併發程式設計可以做如下類比,像是一個非阻塞的訊息 在這裡插入圖片描述 錯誤恢復機制 在這裡插入圖片描述

專案需求

目前大多數的分散式架構底層通訊都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop專案的RPC通訊框架,但是Hadoop在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所以Hadoop的RPC顯得有些笨重。

而需要實現高效的通訊框架,我們可以引入Spark的通訊框架,Spark 的RPC是通過Akka類庫實現的,Akka用Scala語言開發,基於Actor併發模型實現,Akka具有高可靠、高效能、可擴充套件等特點,使用Akka可以輕鬆實現分散式RPC功能。 需求 利用Akka的actor程式設計模型,實現2個程序間的通訊。 在這裡插入圖片描述 通訊業務邏輯

  • 啟動master,然後啟動worker
  • worker先拿到master的引用,接下來worker通過這個master引用向master傳送註冊資訊,註冊的時候需要的資訊有:
    1. 通訊協議
    2. master的埠
    3. master的IP
    4. 建立master actor的老大ActorSystem
    5. actor層級
  • master接收worker傳送過來的註冊資訊
  • master傳送註冊成功的資訊給worker

ActorSystem介紹 ActorSystem:在Akka中,ActorSystem是一個重量級的結構,他需要分配多個執行緒,所以在實際應用中,ActorSystem通常是一個單例物件,我們可以使用這個ActorSystem建立很多Actor。 注意:

  1. ActorSystem是一個程序中的老大,它負責建立和監督actor
  2. ActorSystem是一個單例物件
  3. actor負責通訊

Actor介紹 在Akka中,Actor負責通訊,在Actor中有一些重要的生命週期方法。

  • preStart()方法:該方法在Actor物件構造方法執行後執行,整個Actor生命週期中僅執行一次。
  • receive()方法:該方法在Actor的preStart方法執行完成後執行,用於接收訊息,會被反覆執行。

##程式碼實現

Master類

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

//todo:利用akka的actor模型實現2個程序間的通訊-----Master端

class Master  extends Actor{
  //構造程式碼塊先被執行
  println("master constructor invoked")

  //prestart方法會在構造程式碼塊執行後被呼叫,並且只被呼叫一次
  override def preStart(): Unit = {
    println("preStart method invoked")
  }

  //receive方法會在prestart方法執行後被呼叫,表示不斷的接受訊息
  override def receive: Receive = {
    case "connect" =>{
      println("a client connected")
      //master傳送註冊成功資訊給worker
      sender ! "success"
    }
  }
}
object Master{
  def main(args: Array[String]): Unit = {
    //master的ip地址
    val host=args(0)
    //master的port埠
    val port=args(1)

    //準備配置檔案資訊
    val configStr=
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "$host"
        |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    //配置config物件 利用ConfigFactory解析配置檔案,獲取配置資訊
    val config=ConfigFactory.parseString(configStr)

      // 1、建立ActorSystem,它是整個程序中老大,它負責建立和監督actor,它是單例物件
    val masterActorSystem = ActorSystem("masterActorSystem",config)
     // 2、通過ActorSystem來建立master actor
      val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master),"masterActor")
    // 3、向master actor傳送訊息
    //masterActor ! "connect"
  }
}

worker類

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory


//todo:利用akka中的actor實現2個程序間的通訊-----Worker端
class Worker  extends Actor{
  println("Worker constructor invoked")

  //prestart方法會在構造程式碼塊之後被呼叫,並且只會被呼叫一次
  override def preStart(): Unit = {
      println("preStart method invoked")
    //獲取master actor的引用
    //ActorContext全域性變數,可以通過在已經存在的actor中,尋找目標actor
    //呼叫對應actorSelection方法,
    // 方法需要一個path路徑:1、通訊協議、2、master的IP地址、3、master的埠 4、建立master actor老大 5、actor層級
    val master: ActorSelection = context.actorSelection("akka.tcp://[email protected]:8888/user/masterActor")

    //向master傳送訊息
    master ! "connect"

  }

  //receive方法會在prestart方法執行後被呼叫,不斷的接受訊息
  override def receive: Receive = {
    case "connect" =>{
      println("a client connected")
    }
    case "success" =>{
      println("註冊成功")
    }
  }
}

object Worker{
  def main(args: Array[String]): Unit = {
    //定義worker的IP地址
    val host=args(0)
    //定義worker的埠
    val port=args(1)

    //準備配置檔案
    val configStr=
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "$host"
        |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    //通過configFactory來解析配置資訊
    val config=ConfigFactory.parseString(configStr)
     // 1、建立ActorSystem,它是整個程序中的老大,它負責建立和監督actor
    val workerActorSystem = ActorSystem("workerActorSystem",config)
    // 2、通過actorSystem來建立 worker actor
    val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker),"workerActor")

    //向worker actor傳送訊息
    workerActor ! "connect"
  }
}

本次結束