6、 reactor
Reactor 簡介
前面提到的 RxJava 庫是 JVM 上反應式程式設計的先驅,也是反應式流規範的基礎。RxJava 2 在 RxJava 的基礎上做了很多的更新。不過 RxJava 庫也有其不足的地方。RxJava 產生於反應式流規範之前,雖然可以和反應式流的介面進行轉換,但是由於底層實現的原因,使用起來並不是很直觀。RxJava 2 在設計和實現時考慮到了與規範的整合,不過為了保持與 RxJava 的相容性,很多地方在使用時也並不直觀。Reactor 則是完全基於反應式流規範設計和實現的庫,沒有 RxJava 那樣的歷史包袱,在使用上更加的直觀易懂。Reactor 也是 Spring 5 中反應式程式設計的基礎。學習和掌握 Reactor 可以更好地理解 Spring 5 中的相關概念。
Reactor是JVM的完全非阻塞反應式程式設計基礎,具有高效的需求管理(以管理“背壓”的形式)。它直接與Java 8功能的API,特別是整合CompletableFuture,Stream和 Duration。它提供了可組合的非同步序列API Flux(用於[N]元素)和Mono(用於[0 | 1]元素),廣泛地實現了Reactive Extensions規範。這段的重點是和Java8結合利用lambda表示式簡潔的優點。
使用
在 Reactor 中,經常使用的類並不是很多,主要有以下兩個:
Mono 實現了 org.reactivestreams.Publisher 介面,代表0到1個元素的釋出者。
Flux 同樣實現了 org.reactivestreams.Publisher 介面,代表0到N個元素的發表者。
Scheduler 表示背後驅動反應式流的排程器,通常由各種執行緒池實現。
建立 Flux
有多種不同的方式可以建立 Flux 序列。
Flux 類的靜態方法
第一種方式是通過 Flux 類中的靜態方法。
just():可以指定序列中包含的全部元素。創建出來的 Flux 序列在釋出這些元素之後會自動結束。
fromArray(),fromIterable()和 fromStream():可以從一個數組、Iterable 物件或 Stream 物件中建立 Flux 物件。
empty():建立一個不包含任何元素,只發布結束訊息的序列。
error(Throwable error):建立一個只包含錯誤訊息的序列。
never():建立一個不包含任何訊息通知的序列。
range(int start, int count):建立包含從 start 起始的 count 個數量的 Integer 物件的序列。
interval(Duration period)和 interval(Duration delay, Duration period):建立一個包含了從 0 開始遞增的 Long 物件的序列。其中包含的元素按照指定的間隔來發布。除了間隔時間之外,還可以指定起始元素髮布之前的延遲時間。
intervalMillis(long period)和 intervalMillis(long delay, long period):與 interval()方法的作用相同,只不過該方法通過毫秒數來指定時間間隔和延遲時間。
Flux.just("Hello", "World").subscribe(System.out::println); Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println); Flux.empty().subscribe(System.out::println); Flux.range(1, 10).subscribe(System.out::println); Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println); Flux.intervalMillis(1000).subscribe(System.out::println);
上面的這些靜態方法適合於簡單的序列生成,當序列的生成需要複雜的邏輯時,則應該使用 generate() 或 create() 方法。
//通過generate生成元素 Flux.generate(sink -> { sink.next("Hello"); sink.complete(); }).subscribe(System.out::println); final Random random = new Random(); Flux.generate(ArrayList::new, (list, sink) -> { int value = random.nextInt(100); list.add(value); sink.next(value); if (list.size() == 10) { sink.complete(); } return list; }).subscribe(System.out::println); //通過create生成元素 Flux.create(sink -> { for (int i = 0; i < 10; i++) { sink.next(i); } sink.complete(); }).subscribe(System.out::println);
Mono 的建立
Mono 的建立方式與之前介紹的 Flux 比較相似。Mono 類中也包含了一些與 Flux 類中相同的靜態方法。這些方法包括 just(),empty(),error()和 never()等。除了這些方法之外,Mono 還有一些獨有的靜態方法。
fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分別從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中建立 Mono。
delay(Duration duration)和 delayMillis(long duration):建立一個 Mono 序列,在指定的延遲時間之後,產生數字 0 作為唯一值。
ignoreElements(Publisher<T> source):建立一個 Mono 序列,忽略作為源的 Publisher 中的所有元素,只產生結束訊息。
justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):從一個 Optional 物件或可能為 null 的物件中建立 Mono。只有 Optional 物件中包含值或物件不為 null 時,Mono 序列才產生對應的元素。
還可以通過 create()方法來使用 MonoSink 來建立 Mono。程式碼清單 4 中給出了建立 Mono 序列的示例。
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println); Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
操作符
filter:
對流中包含的元素進行過濾,只留下滿足 Predicate 指定條件的元素
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
zipWith:
zipWith 操作符把當前流中的元素與另外一個流中的元素按照一對一的方式進行合併。
Flux.just("a", "b") .zipWith(Flux.just("c", "d")) .subscribe(System.out::println); Flux.just("a", "b") .zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)) .subscribe(System.out::println);
reduce 和 reduceWith:
reduce 和 reduceWith 操作符對流中包含的所有元素進行累積操作,得到一個包含計算結果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的。在操作時可以指定一個初始值。如果沒有初始值,則序列的第一個元素作為初始值。
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println); Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
flatMap 和 flatMapSequential
flatMap 和 flatMapSequential 操作符把流中的每個元素轉換成一個流,再把所有流中的元素進行合併。flatMapSequential 和 flatMap 之間的區別與 mergeSequential 和 merge 之間的區別是一樣的。
Flux.just(5, 10) .flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x)) .toStream() .forEach(System.out::println);
排程器
前面介紹了反應式流和在其上可以進行的各種操作,通過排程器(Scheduler)可以指定這些操作執行的方式和所在的執行緒。有下面幾種不同的排程器實現。
當前執行緒,通過 Schedulers.immediate()方法來建立。
單一的可複用的執行緒,通過 Schedulers.single()方法來建立。
使用彈性的執行緒池,通過 Schedulers.elastic()方法來建立。執行緒池中的執行緒是可以複用的。當所需要時,新的執行緒會被建立。如果一個執行緒閒置太長時間,則會被銷燬。該排程器適用於 I/O 操作相關的流的處理。
使用對並行操作優化的執行緒池,通過 Schedulers.parallel()方法來建立。其中的執行緒數量取決於 CPU 的核的數量。該排程器適用於計算密集型的流的處理。
使用支援任務排程的排程器,通過 Schedulers.timer()方法來建立。
從已有的 ExecutorService 物件中建立排程器,通過 Schedulers.fromExecutorService()方法來建立。
某些操作符預設就已經使用了特定型別的排程器。比如 intervalMillis()方法建立的流就使用了由 Schedulers.timer()建立的排程器。通過 publishOn()和 subscribeOn()方法可以切換執行操作的排程器。其中 publishOn()方法切換的是操作符的執行方式,而 subscribeOn()方法切換的是產生流中元素時的執行方式。
使用排程器切換操作符執行方式:
Flux.create(sink -> { sink.next(Thread.currentThread().getName()); sink.complete(); }) .publishOn(Schedulers.single()) .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x)) .publishOn(Schedulers.elastic()) .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x)) .subscribeOn(Schedulers.parallel()) .toStream() .forEach(System.out::println);
在以上程式碼中,使用 create()方法建立一個新的 Flux 物件,其中包含唯一的元素是當前執行緒的名稱。接著是兩對 publishOn()和 map()方法,其作用是先切換執行時的排程器,再把當前的執行緒名稱作為字首新增。最後通過 subscribeOn()方法來改變流產生時的執行方式。執行之後的結果是[elastic-2] [single-1] parallel-1。最內層的執行緒名字 parallel-1 來自產生流中元素時使用的 Schedulers.parallel()排程器,中間的執行緒名稱 single-1 來自第一個 map 操作之前的 Schedulers.single()排程器,最外層的執行緒名字 elastic-2 來自第二個 map 操作之前的 Schedulers.elastic()排程器。
回壓
回壓的處理有以下幾種策略:
IGNORE: 完全忽略下游背壓請求,這可能會在下游佇列積滿的時候導致 IllegalStateException。
ERROR: 當下遊跟不上節奏的時候發出一個 IllegalStateException 的錯誤訊號。
DROP:當下遊沒有準備好接收新的元素的時候拋棄這個元素。
LATEST:讓下游只得到上游最新的元素。
BUFFER:(預設的)快取所有下游沒有來得及處理的元素(這個不限大小的快取可能導致 OutOfMemoryError)。
Hot vs Cold
到目前為止,我們討論的釋出者,無論是Flux還是Mono,都有一個特點:訂閱前什麼都不會發生。當我們“建立”了一個Flux的時候,我們只是“宣告”/“組裝”了它,但是如果不呼叫.subscribe來訂閱它,它就不會開始發出元素。
但是我們對“資料流”(尤其是乍聽到這個詞的時候)會有種天然的感覺,就是無論有沒有訂閱者,它始終在按照自己的步伐發出資料。就像假設一個人沒有一個粉絲,他也可以發微博一樣。
以上這兩種資料流分別稱為“冷”序列和“熱”序列。所以我們一直在介紹的Reactor3的釋出者就屬於“冷”的釋出者。不過有少數的例外,比如just生成的就是一個“熱”序列,它直接在組裝期就拿到資料,如果之後有誰訂閱它,就重新發送資料給訂閱者。Reactor 中多數其他的“熱”釋出者是擴充套件自Processor 的(下節會介紹到)。
下面我們通過對比了解一下兩種不同的釋出者的效果,首先是我們熟悉的“冷”釋出者:
@Test public void testCodeSequence() { Flux<String> source = Flux.*fromIterable*(Arrays.*asList*("blue", "green", "orange", "purple")) .map(String::toUpperCase); source.subscribe(d -> System.*out*.println("Subscriber 1: " + d)); System.*out*.println(); source.subscribe(d -> System.*out*.println("Subscriber 2: " + d)); }
我們對釋出者source進行了兩次訂閱,每次訂閱都導致它把資料流從新發一遍:
Subscriber 1: BLUE Subscriber 1: GREEN Subscriber 1: ORANGE Subscriber 1: PURPLE Subscriber 2: BLUE Subscriber 2: GREEN Subscriber 2: ORANGE Subscriber 2: PURPLE
然後再看一個“熱”釋出者的例子:
@Test public void testHotSequence() { UnicastProcessor<String> hotSource = UnicastProcessor.*create*(); Flux<String> hotFlux = hotSource.publish() .autoConnect() .map(String::toUpperCase); hotFlux.subscribe(d -> System.*out*.println("Subscriber 1 to Hot Source: " + d)); hotSource.onNext("blue"); hotSource.onNext("green"); hotFlux.subscribe(d -> System.*out*.println("Subscriber 2 to Hot Source: " + d)); hotSource.onNext("orange"); hotSource.onNext("purple"); hotSource.onComplete(); }
這個熱釋出者是一個UnicastProcessor,我們可以使用它的onNext等方法手動發出元素。上邊的例子中,hotSource發出兩個元素後第二個訂閱者才開始訂閱,所以第二個訂閱者只能收到之後的元素:
Subscriber 1 to Hot Source: BLUE Subscriber 1 to Hot Source: GREEN Subscriber 1 to Hot Source: ORANGE Subscriber 2 to Hot Source: ORANGE Subscriber 1 to Hot Source: PURPLE Subscriber 2 to Hot Source: PURPLE
由此可見,UnicastProcessor是一個熱釋出者。
另一個示例:
@Test public void codeTest() throws InterruptedException { //getNameByPhoney方法執行兩次 Mono<String> customerPhoneMono = Mono.fromSupplier( () -> ColdMonoHttp.getNameByPhone("18611854542") ); //getNameByPhoney方法執行一次 Mono<String> customerPhoneMono = Mono.fromFuture( CompletableFuture.supplyAsync(() -> ColdMonoHttp.getNameByPhone("18611854542")) ); customerPhoneMono.subscribe(a -> System.out.println("-----訂閱一:name=" + a)); customerPhoneMono.subscribe(a -> System.out.println("-----訂閱二:name=" + a)); Thread.sleep(10000); } public static String getNameByPhone(String phone) { try { System.out.println("get name start:" + phone); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); return null; } System.out.println("get name end:" + phone); return "zhangsan"; }
參考:
官網:
ofollow,noindex">GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM
GitHub & BitBucket HTML Preview