Akka in Practice (2): Java Development Demo 1

This post uses a Java demo to show how Akka works.

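1. Download the official Akka distribution

Download page: http://akka.io/downloads/. I downloaded Akka 2.3.15.


Unpack the archive and get ready to copy the relevant jars into your own project.

2. Create a Java project

First, add the jars to the project.



3. Source code and explanation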

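I paste the source code first and analyze it afterwards. There are four source files. The main program creates the ActorSystem and Actor1 (CreationActor). A scheduled task then sends Actor1 a randomly chosen message once per second. For each message, Actor1 creates a new Actor2 (CalculatorActor) and forwards the message to it, which tells that second-level Actor what to compute. When an Actor2 finishes, it sends a result message back to Actor1. Actor1 prints the result and stops that Actor2. Note that these are actors, not threads: by default Akka runs them on a shared dispatcher thread pool.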

The overall flow is roughly as shown in the figure below (schematic on the left, class diagram on the right):


Op.java

<span style="font-family:Microsoft YaHei;font-size:14px;">package com.cwqsolo.study.akka.demo;

import java.io.Serializable;

// Op groups the message classes: the add, subtract, multiply and divide operations and their result classes
public class Op {

  public interface MathOp extends Serializable {
  }

  public interface MathResult extends Serializable {
  }

  // Addition
  static class Add implements MathOp {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;

    public Add(int n1, int n2) {
      this.n1 = n1;
      this.n2 = n2;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }
  }

  static class AddResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;
    private final int result;

    public AddResult(int n1, int n2, int result) {
      this.n1 = n1;
      this.n2 = n2;
      this.result = result;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }

    public int getResult() {
      return result;
    }
  }

  // Subtraction
  static class Subtract implements MathOp {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;

    public Subtract(int n1, int n2) {
      this.n1 = n1;
      this.n2 = n2;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }
  }

  static class SubtractResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;
    private final int result;

    public SubtractResult(int n1, int n2, int result) {
      this.n1 = n1;
      this.n2 = n2;
      this.result = result;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }

    public int getResult() {
      return result;
    }
  }

  // Multiplication
  static class Multiply implements MathOp {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;

    public Multiply(int n1, int n2) {
      this.n1 = n1;
      this.n2 = n2;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }
  }

  static class MultiplicationResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final int n1;
    private final int n2;
    private final int result;

    public MultiplicationResult(int n1, int n2, int result) {
      this.n1 = n1;
      this.n2 = n2;
      this.result = result;
    }

    public int getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }

    public int getResult() {
      return result;
    }
  }
  // Division
  static class Divide implements MathOp {
    private static final long serialVersionUID = 1L;
    private final double n1;
    private final int n2;

    public Divide(double n1, int n2) {
      this.n1 = n1;
      this.n2 = n2;
    }

    public double getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }
  }

  static class DivisionResult implements MathResult {
    private static final long serialVersionUID = 1L;
    private final double n1;
    private final int n2;
    private final double result;

    public DivisionResult(double n1, int n2, double result) {
      this.n1 = n1;
      this.n2 = n2;
      this.result = result;
    }

    public double getN1() {
      return n1;
    }

    public int getN2() {
      return n2;
    }

    public double getResult() {
      return result;
    }
  }
}</span>
CalculatorActor.java
<span style="font-family:Microsoft YaHei;font-size:14px;">package com.cwqsolo.study.akka.demo;

import akka.actor.UntypedActor;


public class CalculatorActor extends UntypedActor {
    
    @Override
    public void onReceive(Object message) {

      if (message instanceof Op.Add) {
        Op.Add add = (Op.Add) message;
        System.out.println("Calculating " + add.getN1() + " + " + add.getN2());
        Op.AddResult result = new Op.AddResult(add.getN1(), add.getN2(),
            add.getN1() + add.getN2());
        getSender().tell(result, getSelf());

      } else if (message instanceof Op.Subtract) {
        Op.Subtract subtract = (Op.Subtract) message;
        System.out.println("Calculating " + subtract.getN1() + " - "
            + subtract.getN2());
        Op.SubtractResult result = new Op.SubtractResult(subtract.getN1(),
            subtract.getN2(), subtract.getN1() - subtract.getN2());
        getSender().tell(result, getSelf());

      } else if (message instanceof Op.Multiply) {
        Op.Multiply multiply = (Op.Multiply) message;
        System.out.println("Calculating " + multiply.getN1() + " * "
            + multiply.getN2());
        Op.MultiplicationResult result = new Op.MultiplicationResult(
            multiply.getN1(), multiply.getN2(), multiply.getN1()
                * multiply.getN2());
        getSender().tell(result, getSelf());

      } else if (message instanceof Op.Divide) {
        Op.Divide divide = (Op.Divide) message;
        System.out.println("Calculating " + divide.getN1() + " / "
            + divide.getN2());
        Op.DivisionResult result = new Op.DivisionResult(divide.getN1(),
            divide.getN2(), divide.getN1() / divide.getN2());
        getSender().tell(result, getSelf());

      } else {
        unhandled(message);
      }
    }
  }</span>
CreationActor.java
<span style="font-family:Microsoft YaHei;font-size:14px;">package com.cwqsolo.study.akka.demo;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

// Creates a CalculatorActor for each incoming operation and prints the results it sends back
public class CreationActor extends UntypedActor {

  @Override
  public void onReceive(Object message) throws Exception {

    if (message instanceof Op.MathOp) {
      ActorRef calculator = getContext().actorOf(
          Props.create(CalculatorActor.class));
      calculator.tell(message, getSelf());

    } else if (message instanceof Op.MultiplicationResult) {
      Op.MultiplicationResult result = (Op.MultiplicationResult) message;
      System.out.printf("Mul result: %d * %d = %d\n", result.getN1(),
          result.getN2(), result.getResult());
      getContext().stop(getSender());

    } else if (message instanceof Op.DivisionResult) {
      Op.DivisionResult result = (Op.DivisionResult) message;
      System.out.printf("Div result: %.0f / %d = %.2f\n", result.getN1(),
          result.getN2(), result.getResult());
      getContext().stop(getSender());

    } else {
      unhandled(message);
    }
  }
}</span>
CreationApplication.java
<span style="font-family:Microsoft YaHei;font-size:14px;">package com.cwqsolo.study.akka.demo;

import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Random;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

import com.typesafe.config.ConfigFactory;

public class CreationApplication {

  public static void main(String[] args) {
    if (args.length == 0 || args[0].equals("CalculatorWorker"))
      startRemoteWorkerSystem();
    if (args.length == 0 || args[0].equals("Creation"))
      startRemoteCreationSystem();
  }

  public static void startRemoteWorkerSystem() {
    ActorSystem.create("CalculatorWorkerSystem",
        ConfigFactory.load("calculator"));
    System.out.println("Started CalculatorWorkerSystem");
  }

  public static void startRemoteCreationSystem() {
    final ActorSystem system = ActorSystem.create("CreationSystem",
        ConfigFactory.load("remotecreation"));
    final ActorRef actor = system.actorOf(Props.create(CreationActor.class),
        "creationActor");

    System.out.println("Started CreationSystem");
    final Random r = new Random();
    system.scheduler().schedule(Duration.create(1, SECONDS),
        Duration.create(1, SECONDS), new Runnable() {
          @Override
          public void run() {
            if (r.nextInt(100) % 2 == 0) {
              actor.tell(new Op.Multiply(r.nextInt(100), r.nextInt(100)), null);
            } else {
              actor.tell(new Op.Divide(r.nextInt(10000), r.nextInt(99) + 1),
                  null);
            }
          }
        }, system.dispatcher());
  }
}</span>

Running the code produces:

Started CalculatorWorkerSystem
Started CreationSystem
Calculating 4915.0 / 63
Div result: 4915 / 63 = 78.02
Calculating 1409.0 / 15
Div result: 1409 / 15 = 93.93
Calculating 99 * 64
Mul result: 99 * 64 = 6336
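
The output lines above come from two things: the type-based dispatch in CalculatorActor and the format strings in CreationActor. Both can be tried without Akka. The following is a minimal plain-Java sketch, not the demo's code: `DispatchSketch`, its nested `Multiply` and `Divide` classes, and `calculate` are hypothetical names that mirror the roles of `Op` and the two actors, and an ordinary method call stands in for Akka's asynchronous message passing.

```java
import java.util.Locale;

// Plain-Java sketch (no Akka): hypothetical stand-ins for the Op messages
// and for the instanceof-based dispatch used in CalculatorActor.
final class DispatchSketch {

  // Request messages are immutable, like Op.Multiply and Op.Divide.
  static final class Multiply {
    final int n1;
    final int n2;
    Multiply(int n1, int n2) { this.n1 = n1; this.n2 = n2; }
  }

  static final class Divide {
    final double n1;
    final int n2;
    Divide(double n1, int n2) { this.n1 = n1; this.n2 = n2; }
  }

  // Inspects the message type, computes, and returns the line that
  // CreationActor would print for the corresponding result message.
  static String calculate(Object message) {
    if (message instanceof Multiply) {
      Multiply m = (Multiply) message;
      return String.format(Locale.ROOT, "Mul result: %d * %d = %d",
          m.n1, m.n2, m.n1 * m.n2);
    } else if (message instanceof Divide) {
      Divide d = (Divide) message;
      // %.0f prints the double dividend without decimals; %.2f rounds the quotient.
      return String.format(Locale.ROOT, "Div result: %.0f / %d = %.2f",
          d.n1, d.n2, d.n1 / d.n2);
    }
    // An Akka actor would call unhandled(message) here; the sketch reports it.
    return "unhandled: " + message;
  }

  public static void main(String[] args) {
    System.out.println(calculate(new Multiply(99, 64)));   // Mul result: 99 * 64 = 6336
    System.out.println(calculate(new Divide(4915, 63)));   // Div result: 4915 / 63 = 78.02
    System.out.println(calculate("hello"));                // unhandled: hello
  }
}
```

Because `Divide` stores its first operand as a `double`, the "Calculating" line in the real output shows `4915.0` while the result line, formatted with `%.0f`, shows `4915`.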

4. Source code analysis

4.1 First, look at the sequence diagram to see how the calls flow.



4.2 Notes on some key points

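   1. Create only one WorkerSystem. An ActorSystem is heavyweight (it allocates its own threads), so an application normally creates one per logical system.

    ActorSystem.create("CalculatorWorkerSystem", ConfigFactory.load("calculator"));

   2. Likewise, create only one ActorSystem on the creation side.

    final ActorSystem system = ActorSystem.create("CreationSystem", ConfigFactory.load("remotecreation"));

   3. Create the Actor.

    final ActorRef actor = system.actorOf(Props.create(CreationActor.class), "creationActor");

   4. Send a message to this Actor. The second argument is the sender. Passing null means there is no sender to reply to (ActorRef.noSender() says the same thing more explicitly).

    actor.tell(new Op.Multiply(r.nextInt(100), r.nextInt(100)), null);

   5. An Actor replies to the Actor that sent it the message. getSender() returns the sender of the message currently being processed.

    getSender().tell(result, getSelf());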
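
Points 4 and 5 rely on `tell` being fire-and-forget: the caller enqueues a message and returns at once, and the reply travels back as another message. The following is a rough plain-Java sketch of that idea, not Akka's implementation. `TellSketch`, `MiniActor` and `Behavior` are hypothetical names, and a single-threaded executor stands in for an actor's mailbox.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java sketch of tell-style messaging (NOT how Akka is implemented).
final class TellSketch {

  // Hypothetical counterpart of onReceive, with sender and self passed in.
  interface Behavior {
    void onReceive(Object message, MiniActor sender, MiniActor self);
  }

  static final class MiniActor {
    // The single-threaded executor plays the mailbox: messages are queued
    // and handled one at a time, in submission order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final Behavior behavior;

    MiniActor(Behavior behavior) { this.behavior = behavior; }

    // Fire-and-forget: enqueue the message and return immediately.
    void tell(Object message, MiniActor sender) {
      mailbox.execute(() -> behavior.onReceive(message, sender, this));
    }

    // Stop accepting messages and wait for queued ones to be handled.
    void stop() {
      mailbox.shutdown();
      try {
        mailbox.awaitTermination(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  static List<Object> run() {
    List<Object> received = new CopyOnWriteArrayList<>();
    // The parent records whatever result messages come back.
    MiniActor parent = new MiniActor((msg, sender, self) -> received.add(msg));
    // The calculator multiplies the pair and replies to the sender.
    MiniActor calculator = new MiniActor((msg, sender, self) -> {
      int[] pair = (int[]) msg;
      sender.tell(pair[0] * pair[1], self);
    });
    calculator.tell(new int[] {99, 64}, parent);
    calculator.stop(); // returns once the calculator has handled its message
    parent.stop();     // returns once the parent has handled the reply
    return received;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [6336]
  }
}
```

Stopping the calculator before the parent guarantees that the reply is already in the parent's queue when the parent shuts down. In the demo, CreationActor does the equivalent with `getContext().stop(getSender())` after printing each result.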