1. 程式人生 > >Actor簡介(一)

Actor簡介(一)

       在開發初期,業務單一、系統簡單,一臺機器或許就能支撐,這個時候,單機上的任務大多直接呼叫API的方式就能完成。然而當業務需求日益增多,功能逐漸複雜化,我們就需要考慮把專案拆分成分散式環境,這時,多系統通訊就需要通過網路實現。JDK內建的併發包在單機上游刃有餘,在分散式環境中,就不是那麼完美了。Akka通過Actor很好的解決了遠端通訊問題。

     Actor初識

        Akka中,Actor使用訊息傳遞進行通訊,完成任務。Actor程式設計,不同於之前通過物件呼叫完成某個功能,它採用“問答”式API進行通訊,這種方式更加貼近日常生活,易於理解。另外,使用Actor,不是操作Actor物件,而是操作它的引用物件(ActorRef),在Akka中,ActorRef對Actor做了很好的封裝,防止外界對Actor的狀態進行修改。

       下面,來一個小示例認識一下actor:

        首先,我們建立一個maven project,並在pom.xml中引入Akka依賴:

<dependency>

    <groupId>com.typesafe.akka</groupId>

    <artifactId>akka-actor_2.11</artifactId>

    <version>2.5.16</version>

</dependency>

        定義Actor:

public class ActorDemo extends AbstractActor {

private final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);

@Override

public Receive createReceive() {

    return receiveBuilder().match(String.class, s -> {

        logger.info("小明:" + s);

      }).matchAny(other -> {

        logger.info("其它未知訊息:" + other);

      }).build();

}


public static void main(String[] args) {

    ActorSystem system = ActorSystem.create("system");

    ActorRef demo = system.actorOf(Props.create(ActorDemo.class), "actorDemo");
    
    demo.tell("hello world",ActorRef.noSender());

    final Integer other=20;

    demo.tell(other,ActorRef.noSender());

  }

}

         執行結果:

[INFO] [09/22/2018 10:35:54.295] [system-akka.actor.default-dispatcher-4] [akka://system/user/actorDemo] 小明:hello world

[INFO] [09/22/2018 10:35:54.296] [system-akka.actor.default-dispatcher-4] [akka://system/user/actorDemo] 其它未知訊息:20

       我們通過extends AbstractActor 類並且重寫createReceive方法建立Actor。重寫createReceive(),我們可以決定自己的actor可以接受何種型別的訊息,以及如何處理這些訊息。receiveBuilder工廠定義了許多API,可以方便我們處理。上述程式碼,actorDemo被允許接受String型別訊息,當不屬於String型別的訊息,我們當做未知訊息進行處理。

     Actor建立方式

       在Akka中,我們一般會使用ActorSystem或ActorContext來建立Actor。在上述示例中,我們使用的是ActorSystem來建立,它是一個比較重量級的物件,通常一個應用程式我們只會建立一個該物件。在建立Actor時,最好給出相應名字,用於業務識別。特別注意,在同一個ActorSystem中,actor不能重名。在實際專案中,我們更希望建立具有層級的actor,讓父級對子級進行管理和監控,這時,我們就會使用ActorContext來建立。例如: 

       ActorRef actorDemo = getContext().actorOf(Props.create(ActorDemo.class), "actorDemo");

     Actor構建工具Props

       通過上述介紹發現,無論是ActorSystem,還是ActorContext,它們都是通過接受一個Props物件來建立Actor。Props是一個配置類,Akka為我們提供三種方式,如下:

Props props1 = Props.create(ActorDemo.class); 

Props props2 = Props.create(ActorDemo.class,() -> new ActorWithArgs("arg")); 

Props props3 = Props.create(ActorDemo.class, "arg");

     訊息

        在actor網路中,actor與actor進行通訊都是通過訊息傳遞。這就好比現實中的寫信,A和B兩個人只知道各自的地址,沒有其它聯絡方式,這時他們通過信件來進行溝通,A給B發完信件之後,又可以去做其它的事情,B也是一樣。於此同時,A和B也可以接受其他小夥伴傳送的信件。這裡的信件就類似actor之間的訊息,通過非同步的方式傳送。

        訊息的傳送分為兩種:

類別

特點

tell

非同步傳送訊息並立即返回。

ask

非同步傳送訊息,返回Future表示可能的結果,可以設定超時時間,超時未得到結果,返回超時異常。

     tell方法

       當我們僅僅只需要非同步傳送訊息的時候,tell方法是一個不錯的選擇。

       demo.tell("hello world",ActorRef.noSender());

       tell方法有兩個引數,第一個引數表示訊息,它可以接受任何型別的資料,第二個引數表示傳送者,也就是另外一個actor的引用物件。上述使用的ActorRef.noSender(),表示沒有傳送者(其實是一個叫做deadLetters的Actor)。在專案中,如果我們想得到傳送者,可以呼叫getSender()方法。

     ask方法

       這個方法類似於java中的Future模式,也是非同步的得到一個返回物件,典型的“請求-響應”模式。通常我們採用非同步回撥的方式獲取Future物件。

       ask方法不能通過actorRef物件呼叫,而是 通過akka.pattern.Patterns包下的Patterns.ask方式來操作,ask方法有三個引數,第一個引數表示接受訊息的ActorRef,第二個引數表示傳送的訊息,第三個引數表示超時時間。ask方法會返回一個Future物件,它提供了三個回撥方法,供我們處理返回資料和異常—onSuccess、onFailure、onComplete。例如:

Future<Object> future = Patterns.ask(demo, "hello world", new Timeout(Duration.create(1, TimeUnit.SECONDS)));

future.onSuccess(new OnSuccess<Object>() {

@Override

public void onSuccess(Object o) throws Throwable {

    System.out.println("返回結果:" + o);

}

}, system.dispatcher());

       通過forward轉發訊息,傳送給TargetActor訊息的sender仍然是ForwardActor的傳送者,而不是ForwardActor。

     訊息轉發

       在Actor通訊中,我們除了可以給某個Actor直接傳送訊息之外,還可以通過forward轉發的方式。forward表明Akka會保留訊息原始發件人的地址及引用,當我們在實現訊息路由或者負載均衡時,非常有用。示例:

public class ForwardActor extends AbstractActor {

    private ActorRef actorRef = getContext().actorOf(Props.create(TargetActor.class), "targetActor");

    public static void main(String[] args) {

        ActorSystem actorSystem = ActorSystem.create();

        ActorRef forwardActor = actorSystem.actorOf(Props.create(ForwardActor.class), "forwardActor");

        forwardActor.tell("hello world", ActorRef.noSender());

    }

    @Override

    public Receive createReceive() {

        return receiveBuilder().match(String.class, s -> {

            actorRef.forward(s, getContext());

        }).build();

    }

}

class TargetActor extends AbstractActor {

    @Override

    public Receive createReceive() {

        return receiveBuilder().match(String.class, s -> {

            System.out.println("forward message:" + s);

        }).build();

    }

}

   

     查詢

       在介紹actor查詢之前,我們需要先了解一下actor路徑問題。

       之前我們已經談到,建立actor可以使用ActorSystem或ActorContext,使用ActorSystem建立的actor是我們所能建立的最高級別actor,也就是在/user/目錄下。在上述第一個示例中,輸出日誌中,有這樣一段akka://system/user/actorDemo,這就代表actorDemo的路徑,其中system是我們定義的ActorSystem名稱,user是我們建立actor所在的父目錄(我們建立的actor都在它之下),actorDemo是我們建立的actor名稱,它們共同組成一個actor路徑。假如我們在ActorDemo中,使用ActorContext建立一個Actor—otherActor,那麼otherActor的路徑就是akka://system/user/actorDemo/otherActor,以此類推。

       在實際專案中,當我們需要呼叫某一個Actor時,我們也可以使用路徑去定位到它,相關API:

 ActorSelection=[ActorSystem/ActorContext].actorSelection([path]);

       其中path,我們可以使用相對或者是絕對路徑,例如:

       /user/actorDemo

       ../actorDemo

       akka.tcp://[email protected]:8080/user/parentActor/childActor  (遠端actor路徑)

       呼叫actorSelection會返回一個ActorSelection物件,當我們想得到目標ActorRef時,可以向ActorSelection傳送一個Identify物件訊息,然後對方會自動返回一個ActorIdentity物件,該物件包含了目標ActorRef資訊,示例:

class TargetActor extends AbstractActor {
    @Override
    public Receive createReceive() {
        return receiveBuilder().match(Object.class, o -> {
            System.out.println("receive message:" + o);
        }).build();
    }
}


class LookupActor extends AbstractActor {
    private ActorRef actorRef = getContext().actorOf(Props.create(TargetActor.class), "targetActor");

    public static void main(String[] args) {
        ActorSystem actorSystem = ActorSystem.create("system");
        ActorRef actor = actorSystem.actorOf(Props.create(LookupActor.class), "lookupActor");
        actor.tell("find", ActorRef.noSender());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(String.class, s -> {
            if ("find".equals(s)) {
                ActorSelection targetActor = getContext().actorSelection("targetActor");
                targetActor.tell(new Identify("001"), getSelf());
            }
        }).match(ActorIdentity.class, i -> {
            if (i.correlationId().equals("001")) {
                Optional<ActorRef> ref = i.getActorRef();
                if (ref != null) {
                    ActorRef actorRef = ref.get();
                    System.out.println("id:" + i.correlationId() + " " + actorRef);
                    actorRef.tell("hello targetActor", getSelf());
                }
            }
        }).build();
    }
}

      執行結果:

id:001 Actor[akka://system/user/lookupActor/targetActor#1717439239]

receive message:hello targetActor

      上述程式碼中,由於TargetActor是LookupActor的子集,所以我們可以指定為“targetActor”。由於我們指定actor,有可能找不到,所以在獲取目標ActorRef時,需要判斷是否為空,以免出現異常。

       Actor查詢,不僅可以使用相對或絕對路徑,而且還可以使用萬用字元進行匹配,例如:

       ../*     查詢所有同級的actor

       /user/parentActor/*    查詢parentActor下面的所有子actor

       /user/parentActor/child*   查詢parentActor下面所有以child開頭的子actor

       另外,我們還可以使用ActorSelection的resolveOne得到ActorRef,該方法會返回一個Future物件,和ask返回Future物件相同,也具有onSuccess和onFailure回撥方法。我們可以使用onSuccess回撥方法獲取ActorRef物件,使用onFailure方法處理異常資訊。這裡不做過多描述,大家有興趣可以自行研究研究。

        在這裡對Actor的基本用法做了大致講解,下一節,對Actor的生命週期做詳細描述,由於本人才疏學淺,有不對的地方,望大家批評指正,共同進步,