(15)Reactor 3 Operators——響應式Spring的道法術器
本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor 3快速上手 | 響應式流規範
2.5 Reactor 3 Operators
雖然響應式流規範中對Operator(以下均稱作”操作符“)並未做要求,但是與RxJava等響應式開發庫一樣,Reactor也提供了非常豐富的操作符。
2.5.1 豐富的操作符
本系列前邊的文章中,陸續介紹了一些常用的操作符。但那也只是冰山之一角,Reactor 3提供了豐富的操作符,如果要一個一個介紹,那篇幅大了去了,授人以魚不如授人以漁,我們可以通過以下幾種途徑了解操作符的應用場景,熟悉它們的使用方法:
- 附2是《Reactor 3 參考文檔》中關於“如何選擇合適的操作符”一節的翻譯,介紹了如何選擇合適的操作符。
- 參考Javadoc中對Flux和Mono的解釋和示意圖。
- 如果想通過實戰的方式上手試一下各種操作符,強烈推薦來自Reactor官方的lite-rx-api-hands-on項目。拿到項目後,你要做的就是使用操作符,完成“TODO”的代碼,讓所有的
@Test
綠燈就OK了。相信完成這些測試之後,對於常見的操作符就能了然於胸了。 - 此外,在日常的開發過程中,通過IDE也可以隨時查閱,比如IntelliJ:
由於Project Reactor的核心開發團隊也有來自RxJava的大牛,並且Reactor本身在開發過程中也借鑒了大多數RxJava的操作符命名(對於RxJava中少量命名不夠清晰的操作符進行了優化),因此對於熟悉RxJava的朋友來說,使用Reactor基本沒有學習成本。同樣的,學習了Reactor之後,再去使用RxJava也沒有問題。
2.5.2 “打包”操作符
我們在開發過程中,為了保持代碼的簡潔,通常會將經常使用的一系列操作封裝到方法中,以備調用。
Reactor也提供了類似的對操作符的“打包”方法。
1)使用 transform 操作符
transform
可以將一段操作鏈打包為一個函數式。這個函數式能在組裝期將被封裝的操作符還原並接入到調用transform
的位置。這樣做和直接將被封裝的操作符加入到鏈上的效果是一樣的。示例如下:
@Test public void testTransform() { Function<Flux<String>, Flux<String>> filterAndMap = f -> f.filter(color -> !color.equals("orange")) .map(String::toUpperCase); Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")) .doOnNext(System.out::println) .transform(filterAndMap) .subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d)); }
這個例子,通過名為filterAndMap
的函數式將filter
和map
操作符進行了打包,然後交給transform拼裝到操作鏈中。輸出如下:
blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE
2)使用 compose 操作符
compose 操作符與 transform 類似,也能夠將幾個操作符封裝到一個函數式中。主要的區別就是,這個函數式是針對每一個訂閱者起作用的。這意味著它對每一個 subscription 可以生成不同的操作鏈。舉個例子:
public void testCompose() {
AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
if (ai.incrementAndGet() == 1) {
return f.filter(color -> !color.equals("orange"))
.map(String::toUpperCase);
}
return f.filter(color -> !color.equals("purple"))
.map(String::toUpperCase);
};
Flux<String> composedFlux =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.doOnNext(System.out::println)
.compose(filterAndMap);
composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :" + d));
composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: " + d));
}
這個例子中,filterAndMap函數式有一個名為ai
的會自增的狀態值。每次調用subscribe
方法進行訂閱的時候,compose
會導致ai
自增,從而兩次訂閱的操作鏈是不同的。輸出如下:
blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple
也就是說,compose
中打包的函數式可以是有狀態的(stateful):
而transform
打包的函數式是無狀態的。將compose
換成transform
再次執行,發現兩次訂閱的操作鏈是一樣的,都會過濾掉orange
。
(15)Reactor 3 Operators——響應式Spring的道法術器