1. 程式人生 > >(12)Reactor 3 自定義數據流——響應式Spring的道法術器

(12)Reactor 3 自定義數據流——響應式Spring的道法術器

響應式編程 Spring WebFlux

本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範
本文源碼

2.2 自定義數據流

這一小節介紹如何通過定義相應的事件(onNextonErroronComplete) 創建一個 Flux 或 Mono。Reactor提供了generatecreatepushhandle等方法,所有這些方法都使用 sink(池)來生成數據流。

sink,顧名思義,就是池子,可以想象一下廚房水池的樣子。如下圖所示:

技術分享圖片

下面介紹到的方法都有一個sink提供給方法使用者,通常至少會暴露三個方法給我們,nexterrorcomplete。next和error相當於兩個下水口,我們不斷將自定義的數據放到next口,Reactor就會幫我們串成一個Publisher數據流,直到有一個錯誤數據放到error口,或按了一下complete

按鈕,數據流就會終止了。

2.2.1 generate

generate是一種同步地,逐個地發出數據的方法。因為它提供的sink是一個SynchronousSink, 而且其next()方法在每次回調的時候最多只能被調用一次。

generate方法有三種簽名:

    public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

    public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) 

    public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)

1)使用SynchronousSink生成數據流

    @Test
    public void testGenerate1() {
        final AtomicInteger count = new AtomicInteger(1);   // 1
        Flux.generate(sink -> {
            sink.next(count.get() + " : " + new Date());   // 2
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (count.getAndIncrement() >= 5) {
                sink.complete();     // 3
            }
        }).subscribe(System.out::println);  // 4
    }
  1. 用於計數;
  2. 向“池子”放自定義的數據;
  3. 告訴generate方法,自定義數據已發完;
  4. 觸發數據流。

輸出結果為每1秒鐘打印一下時間,共打印5次。

2)增加一個伴隨狀態

對於上邊的例子來說,count用於記錄狀態,當值達到5之後就停止計數。由於在lambda內部使用,因此必須是final類型的,且不能是原生類型(如int)或不可變類型(如Integer)。

如果使用第二個方法簽名,上邊的例子可以這樣改:

    @Test
    public void testGenerate2() {
        Flux.generate(
                () -> 1,    // 1
                (count, sink) -> {      // 2
                    sink.next(count + " : " + new Date());
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (count >= 5) {
                        sink.complete();
                    }
                    return count + 1;   // 3
                }).subscribe(System.out::println);
    }
  1. 初始化狀態值;
  2. 第二個參數是BiFunction,輸入為狀態和sink;
  3. 每次循環都要返回新的狀態值給下次使用。

3)完成後處理

第三個方法簽名除了狀態、sink外,還有一個Consumer,這個Consumer在數據流發完後執行。

        Flux.generate(
                () -> 1,
                (count, sink) -> {
                    sink.next(count + " : " + new Date());
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (count >= 5) {
                        sink.complete();
                    }
                    return count + 1;
                }, System.out::println)     // 1
                .subscribe(System.out::println);
    }
  1. 最後將count值打印出來。

如果 state 使用了數據庫連接或者其他需要進行清理的資源,這個 Consumer lambda 可以用來在最後完成資源清理任務。

2.2.2 create

create是一個更高級的創建Flux的方法,其生成數據流的方式既可以是同步的,也可以是異步的,並且還可以每次發出多個元素。

create用到了FluxSink,後者同樣提供 next,error 和 complete 等方法。 與generate不同的是,create不需要狀態值,另一方面,它可以在回調中觸發多個事件(即使事件是發生在未來的某個時間)。

create 常用的場景就是將現有的 API 轉為響應式,比如監聽器的異步方法。

先編寫一個事件源:

    public class MyEventSource {

        private List<MyEventListener> listeners;

        public MyEventSource() {
            this.listeners = new ArrayList<>();
        }

        public void register(MyEventListener listener) {    // 1
            listeners.add(listener);
        }

        public void newEvent(MyEvent event) {
            for (MyEventListener listener :
                    listeners) {
                listener.onNewEvent(event);     // 2
            }
        }

        public void eventStopped() {
            for (MyEventListener listener :
                    listeners) {
                listener.onEventStopped();      // 3
            }
        }

        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class MyEvent {   // 4
            private Date timeStemp;
            private String message;
        }
    }
  1. 註冊監聽器;
  2. 向監聽器發出新事件;
  3. 告訴監聽器事件源已停止;
  4. 事件類,使用了lombok註解。

準備一個監聽器接口,它可以監聽上邊第2和3的兩種事件:(1)新的MyEvent到來;(2)事件源停止。如下:

    public interface MyEventListener {
        void onNewEvent(MyEventSource.MyEvent event);
        void onEventStopped();
    }

下面的測試方法邏輯是:創建一個監聽器註冊到事件源,這個監聽器再收到事件回調的時候通過Flux.create的sink將一系列事件轉換成異步的事件流:

    @Test
    public void testCreate() throws InterruptedException {
        MyEventSource eventSource = new MyEventSource();    // 1
        Flux.create(sink -> {
                    eventSource.register(new MyEventListener() {    // 2
                        @Override
                        public void onNewEvent(MyEventSource.MyEvent event) {
                            sink.next(event);       // 3
                        }

                        @Override
                        public void onEventStopped() {
                            sink.complete();        // 4
                        }
                    });
                }
        ).subscribe(System.out::println);       // 5

        for (int i = 0; i < 20; i++) {  // 6
            Random random = new Random();
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
            eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i));  
        }
        eventSource.eventStopped(); // 7
    }
  1. 事件源;
  2. 向事件源註冊用匿名內部類創建的監聽器;
  3. 監聽器在收到事件回調的時候通過sink將事件再發出;
  4. 監聽器再收到事件源停止的回調的時候通過sink發出完成信號;
  5. 觸發訂閱(這時候還沒有任何事件產生);
  6. 循環產生20個事件,每個間隔不超過1秒的隨機時間;
  7. 最後停止事件源。

運行一下這個測試方法,20個MyEvent陸續打印出來。

如果將上邊的create方法換成generate方法,則會報出異常:

java.lang.IllegalStateException: The generator didn‘t call any of the SynchronousSink method

證明generate並不支持異步的方式。

create方法還有一個變體方法push,適合生成事件流。與 create類似,push 也可以是異步地, 並且能夠使用以上各種回壓策略。所以上邊的例子可以替換為push方法。區別在於,push方法中,調用nextcompleteerror的必須是同一個線程。

除了nextcompleteerror方法外,FluxSink還有onRequest方法,這個方法可以用來響應下遊訂閱者的請求事件。從而不僅可以像上一個例子那樣,上遊在數據就緒的時候將其推送到下遊,同時下遊也可以從上遊拉取已經就緒的數據。這是一種推送/拉取混合的模式。比如:

    Flux<String> bridge = Flux.create(sink -> {
        myMessageProcessor.register(
          new MyMessageListener<String>() {

            public void onMessage(List<String> messages) {
              for(String s : messages) {
                sink.next(s);   // 1
              }
            }
        });
        sink.onRequest(n -> {   // 2
            List<String> messages = myMessageProcessor.request(n);  // 3
            for(String s : message) {
               sink.next(s); 
            }
        });
        ...
    }
  1. push方式,主動向下遊發出數據;
  2. 在下遊發出請求時被調用;
  3. 響應下遊的請求,查詢是否有可用的message。

(12)Reactor 3 自定義數據流——響應式Spring的道法術器