1. 程式人生 > >(15)Reactor 3 Operators——響應式Spring的道法術器

(15)Reactor 3 Operators——響應式Spring的道法術器

響應式編程 Spring WebFlux

本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor 3快速上手 | 響應式流規範

2.5 Reactor 3 Operators

雖然響應式流規範中對Operator(以下均稱作”操作符“)並未做要求,但是與RxJava等響應式開發庫一樣,Reactor也提供了非常豐富的操作符。

2.5.1 豐富的操作符

本系列前邊的文章中,陸續介紹了一些常用的操作符。但那也只是冰山之一角,Reactor 3提供了豐富的操作符,如果要一個一個介紹,那篇幅大了去了,授人以魚不如授人以漁,我們可以通過以下幾種途徑了解操作符的應用場景,熟悉它們的使用方法:

  1. 附2是《Reactor 3 參考文檔》中關於“如何選擇合適的操作符”一節的翻譯,介紹了如何選擇合適的操作符。
  2. 參考Javadoc中對Flux和Mono的解釋和示意圖。
  3. 如果想通過實戰的方式上手試一下各種操作符,強烈推薦來自Reactor官方的lite-rx-api-hands-on項目。拿到項目後,你要做的就是使用操作符,完成“TODO”的代碼,讓所有的@Test綠燈就OK了。相信完成這些測試之後,對於常見的操作符就能了然於胸了。
  4. 此外,在日常的開發過程中,通過IDE也可以隨時查閱,比如IntelliJ:

技術分享圖片

由於Project Reactor的核心開發團隊也有來自RxJava的大牛,並且Reactor本身在開發過程中也借鑒了大多數RxJava的操作符命名(對於RxJava中少量命名不夠清晰的操作符進行了優化),因此對於熟悉RxJava的朋友來說,使用Reactor基本沒有學習成本。同樣的,學習了Reactor之後,再去使用RxJava也沒有問題。

2.5.2 “打包”操作符

我們在開發過程中,為了保持代碼的簡潔,通常會將經常使用的一系列操作封裝到方法中,以備調用。

Reactor也提供了類似的對操作符的“打包”方法。

1)使用 transform 操作符

transform可以將一段操作鏈打包為一個函數式。這個函數式能在組裝期將被封裝的操作符還原並接入到調用transform的位置。這樣做和直接將被封裝的操作符加入到鏈上的效果是一樣的。示例如下:

    @Test
    public void testTransform() {
        Function<Flux<String>, Flux<String>> filterAndMap =
                f -> f.filter(color -> !color.equals("orange"))
                        .map(String::toUpperCase);

        Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                .doOnNext(System.out::println)
                .transform(filterAndMap)
                .subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));
    }

這個例子,通過名為filterAndMap的函數式將filtermap操作符進行了打包,然後交給transform拼裝到操作鏈中。輸出如下:

blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE

2)使用 compose 操作符

compose 操作符與 transform 類似,也能夠將幾個操作符封裝到一個函數式中。主要的區別就是,這個函數式是針對每一個訂閱者起作用的。這意味著它對每一個 subscription 可以生成不同的操作鏈。舉個例子:

    public void testCompose() {
        AtomicInteger ai = new AtomicInteger();
        Function<Flux<String>, Flux<String>> filterAndMap = f -> {
            if (ai.incrementAndGet() == 1) {
                return f.filter(color -> !color.equals("orange"))
                        .map(String::toUpperCase);
            }
            return f.filter(color -> !color.equals("purple"))
                    .map(String::toUpperCase);
        };

        Flux<String> composedFlux =
                Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                        .doOnNext(System.out::println)
                        .compose(filterAndMap);

        composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :" + d));
        composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: " + d));
    }

這個例子中,filterAndMap函數式有一個名為ai的會自增的狀態值。每次調用subscribe方法進行訂閱的時候,compose會導致ai自增,從而兩次訂閱的操作鏈是不同的。輸出如下:

blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple

也就是說,compose中打包的函數式可以是有狀態的(stateful):

技術分享圖片

transform打包的函數式是無狀態的。將compose換成transform再次執行,發現兩次訂閱的操作鏈是一樣的,都會過濾掉orange

技術分享圖片

(15)Reactor 3 Operators——響應式Spring的道法術器