1. 程式人生 > >Scala併發程式設計模型AKKA

Scala併發程式設計模型AKKA

一、併發程式設計模型AKKA

    Spark使用底層通訊框架AKKA
    分散式
    master
    worker
    hadoop使用的是rpc
    
    1)akka簡介
    寫併發程式很難,AKKA解決spark這個問題。
    akka構建在JVM平臺上,是一種高併發、分散式、並且容錯的應用工具包
    akka用scala語言編寫同時提供了scala和java的開發介面
    akka可以開發一些高併發程式。
    
    2)Akka的Actor模型
    akka處理併發的方法基於actor模型
    在基於actor的系統中,所有事物都是actor。
    actor作為一個併發模型設計和架構的,面向物件不是。
    actor與actor之間只能通過訊息通訊。
    
    Akka特點:
    (
1)對併發模型進行了更高的抽象 (2)非同步、非阻塞、高效能的事件驅動程式設計模型 (3)輕量級事件處理(1G記憶體可以容納百萬級別的Actor) 同步:阻塞(發訊息 一直等待訊息) 非同步:不阻塞(發訊息 不等待 該幹嘛幹嘛) actor簡化了併發程式設計,提高了程式效能。

1、Actor模型

2、Actor工作機制

二、AKKA程式設計

1、需求  我發訊息,自己收

object CallMe {
  //1.建立ActorSystem 用ActorSystem建立Actor
  private val acFactory = ActorSystem("AcFactory")
  
//2.Actor傳送訊息通過ActorRef private val callRef = acFactory.actorOf(Props[CallMe],"CallMe") def main(args: Array[String]): Unit = { //3.傳送訊息 callRef ! "你吃飯了嗎" callRef ! "很高興見到你" callRef ! "stop" } } class CallMe extends Actor{ //Receive使用者接收訊息並且處理訊息 override def receive: Receive = {
case "你吃飯了嗎" => println("吃的雞腿") case "很高興見到你" => println("我也是") case "stop" => { //關閉代理ActorRef context.stop(self) //關閉ActorSystem context.system.terminate() } } }

結果:

2.需求  一個Actor傳送訊息,另外一個Actor接收訊息

(1)TomActor

import akka.actor.Actor

class TomActor extends Actor{
  override def receive: Receive = {
    case "你好,我是John" => {
      println("你好,我是Tom")
    }

    case "我愛Tom" => {
      println("Tom也愛John")
    }
  }
}

(2)JohnActor

import akka.actor.{Actor, ActorRef}

class JohnActor(val h:ActorRef) extends Actor{
  override def receive: Receive = {
    case "你好,我是John" => {
      //John傳送訊息給TomActor
      h ! "我愛Tom"
    }
  }
}

(3)QqDriver

import akka.actor.{ActorSystem, Props}

object QqDriver {
  //1.建立ActorSystem 用ActorSystem建立Actor
  private val qqFactory = ActorSystem("QqFactory")
  //2.Actor傳送訊息通過ActorRef
  private val hRef = qqFactory.actorOf(Props[TomActor],"Tom")
  //John需要接受Tom傳送的訊息
  private val dRef = qqFactory.actorOf(Props(new JohnActor(hRef)),"John")

  def main(args: Array[String]): Unit = {
    //1.Tom自己給自己傳送訊息
    //hRef ! "我愛Tom"

    //2John給Tom傳送訊息
    dRef ! "你好,我是John"
  }
}

(4)結果

 3、maven依賴pom檔案

    <!-- 定義版本常量 -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>

    <dependencies>
        <!-- 新增scala包的依賴 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- 新增akka包的actor依賴 -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <!-- 多程序之間的Actor通訊設定 -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <!-- 指定使用外掛-->
    <build>
        <!-- 指定原始碼包和測試包的位置資訊 -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- 指定編譯scala的外掛 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- maven打包使用的外掛 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- 指定main方法 -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.itstaredu.spark.SparkWorker</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>