1. 程式人生 > >Akka工具(二)—Event Bus

Akka工具(二)—Event Bus

某一天

小明:你好,你是報刊工作人員嗎?

報刊:是的,請問,你有什麼需要嗎?

小明:我想訂閱報刊,關於娛樂、經濟、社會類的,其它的我不想要,可以嗎?

報刊:可以的,並且你可以隨時取消訂閱。

小明:好的,謝謝你。祝你工作愉快!

通常該過程我們都是在軟體上實現的,但是這並不影響我們要說明的內容。實際專案中,業務是複雜多樣的,一個業務可能需要依賴另外一個業務,同時也可能取消和該業務的關聯,如果我們不做好處理,各業務耦合度將會大大提高,不利於我們擴充套件和維護。怎樣才能解耦這些依賴關係呢?通過上述小故事,大家是不是想到了什麼呢,某個設計模式?沒錯,我們要使用的就是“釋出-訂閱”模式,使用該模式,訂閱者可以訂閱自己需要的東西,並且可以隨時取消訂閱,在這個過程中,釋出者不知道訂閱者是誰。

事件匯流排

Akka系統中,是不是需要我們自己去實現“釋出-訂閱”模式呢?答案肯定是no,Akka系統提供了一種事件匯流排(Event Bus)工具,內部實現了釋出-訂閱,通過使用它,我們就可以實現業務之間的解耦。

事件匯流排,Akka定義為EventBus型別,擁有釋出、訂閱、取消訂閱等功能。處理過程:釋出者將Event釋出到EventBus上,訂閱者將會接受到需要的訊息,通常來講,訂閱者是一個Actor,通過createReceive方法接受訊息。處理過程中,Classifier被用來描述事件分類,不同的事件有不同的訂閱者,當然一個訂閱者可以訂閱多個事件,EventBus將會通過Classifier來選擇訂閱者並向其傳送訊息。常見的Classifier有LookupClassification、SubchannelClassification、ScanningClassification等。

下面我們來看看LookupClassification型別(按照指定的實際型別匹配)的事件分類的EventBus如何實現。

首先定義一個Event實體,擁有事件型別和訊息:

@AllArgsConstructor
public class Event {
    @Getter
    private final String type;
    @Getter
    private final String message;
}

實體定義使用lombok,一個第三方依賴,可以方便我們開發,減少get、set等重複工作,如果沒有使用過,大家可以在網上搜索學習一下,這裡不做過多介紹。

繼承EventBus,實現釋出、訂閱、分類等相關方法:

public class MyLookupEventBus extends LookupEventBus<Event,ActorRef,String> {
    //期望的classify數,一般為2的n次冪
    @Override
    public int mapSize() {
        return 8;
    }

    @Override
    public int compareSubscribers(ActorRef a1, ActorRef a2) {
        return a1.compareTo(a2);
    }

    @Override
    public String classify(Event event) {
        return event.getType();
    }

    @Override
    public void publish(Event event, ActorRef actorRef) {
            actorRef.tell(event.getMessage(),ActorRef.noSender());
    }
}

使用EventBus通常需要三個泛型,分別為指定事件型別、訂閱者型別、Classifier型別。當EventBus釋出Event,將會通過classify方法來選擇目標訂閱者,然後使用publish方法通知他們。

定義EventBusActor,作為訂閱者:

public class EventBusActor extends AbstractActor {
     @Override public Receive createReceive() { 
        return receiveBuilder().matchAny(other -> {
            System.out.println("接受的訊息:" + other);
     }).build(); 
} }

下面就使用EventBus.subscribe方法訂閱所需事件,如下:

        ActorSystem system = ActorSystem.create("system");
        ActorRef eventActor = system.actorOf(Props.create(EventBusActor.class), "eventActor");
        //定義訊息匯流排
        MyLookupEventBus bus = new MyLookupEventBus();
        //eventActor訂閱add、update訊息
        bus.subscribe(eventActor, "add");
        bus.subscribe(eventActor, "update");

        //釋出add訊息
        bus.publish(new Event("add", "insert object"));

        //取消訂閱
        bus.unsubscribe(eventActor, "update");

        //釋出update訊息
        bus.publish(new Event("update", "update object"));

測試程式碼中,eventActor定義了add和update事件,但是它只會收到add事件訊息,因為bus在釋出update事件訊息時,我們已經取消了update事件訂閱,所以不會收到update object訊息。呼叫subscribe方法,該方法會在EventBus內部維護一個Map結構,key為classfiy,也就是事件型別,value為訂閱者列表,在訂閱的過程中,eventBus使用compareSubscribers方法做排序比較。採用事件匯流排方式,我們可以訂閱自己需要的事件,不會收到無關訊息,結構清晰,並且可以隨時取消訂閱,降低程式碼耦合度。

事件流

事件流是ActorSystem上下文環境中的事件匯流排,使用它可以很方便的將“釋出-訂閱模式”用於任意事件型別,可以是自定義型別,也可以是死信(Dead Letter)或者日誌(log messages)。訂閱者通常都是Actor,當Actor終止時,會自動從訂閱者列表移除。

下面我們定義一個訂閱死信訊息的Actor:

public class DeadLetterActor extends AbstractActor {
    @Override
    public Receive createReceive() {
        return receiveBuilder().match(DeadLetter.class, d -> {
            System.out.println("deadLetter:" + d);
        }).matchAny(o -> {
            System.out.println("其它訊息:" + o);
        }).build();
    }
}

使用事件流非常簡單,示例:

        ActorSystem system = ActorSystem.create("system");
        ActorRef deadLetterActor = system.actorOf(Props.create(DeadLetterActor.class), "deadLetterActor");
        system.eventStream().subscribe(deadLetterActor,DeadLetter.class);

DeadLetter.class方式只能訂閱常規的死信提醒,DeadLetterSuppression訊息除外,如果需要訂閱這類死信提醒,我們可以使用

system.eventStream().subscribe(deadLetterActor,DeadLetterSuppression.class);

system.eventStream().subscribe(deadLetterActor,AllDeadLetters.class);

AllDeadLetters方式可以獲取所有死信提醒訊息。

EventStream實現了SubchannelClassification分類器,所以我們可以指定層次事件匹配,訂閱一個父類下的所有事件。我們可以定義一個事件介面,定義兩個子事件實現它,然後使用EventStream.subscibe定義父事件型別,大家可以自行嘗試。

總結

事件匯流排,基於觀察者模式思想,實現了釋出-訂閱的訊息處理,釋出者不需要關心訂閱者具體是誰,訂閱者可以隨時取消訂閱,在某種程度上降低了系統之間的耦合度,有利於擴充套件和維護系統。