RxJava和Reactor比較
Reactor更推薦,當然RxJava 2.x仍然是一個不錯的選擇,如果你使用Android,那麼RxJava 2.x是你唯一的選擇.
首先,從功能角度來看,RxJava和Reactor的兩個版本都非常相似。如果您知道RxJava 1.x或2.x,Reactor將會很快非常熟悉,但您仍然需要了解差異。
其次,java.util.concurrent.Flow 類(一組介面,確切地說)是反應流 規範的一部分,捆綁到JDK中。該規範規定各種反應庫應該禮貌地表現並且乾淨地相互互動。然而,規範誕生於Java 9之前並引入Flow因此,庫是基於外部的reactive-streams.jar,而不是JDK。當談到RxJava / Reactor比較時,有很多觀點。讓我快速介紹一些差異。我假設您對這兩個庫都有一定的瞭解。
API
Flowable並且Flux有非常相似的API。顯然,他們都支援,如基本的操作map(),filter(),flatMap(),以及更先進的。主要區別在於Java目標版本。RxJava 2.x必須仍然支援Java 6,因為它在Android上被廣泛使用(稍後閱讀)。另一方面,Reactor的目標是Java 8+。因此,Reactor可以利用現代(-ish,Java 8是5年前,在撰寫本文時)API java.time和java.util.function。輸入更加安全:
<b>import</b> java.time.Duration; ... flux.window(Duration.ofSeconds(1));
而不是:
<b>import</b> java.util.concurrent.TimeUnit; ... flowable.window(1, TimeUnit.SECONDS);
傳遞單個Duration例項比整數更容易,更安全。Reactor是直接對應CompletableFuture,Optional,java.util.stream.Stream等。
Reactor加1分
型別安全
談到型別安全性,我真的很想念RxJava 1/2中引入的細粒度型別。
RxJava 2:Completable
Reactor:N / A
成功完成或失敗,不產生任何值。類似CompletableFuture<Void>
RxJava 2:Maybe<T>
Reactor:Mono<T>
成功完成或失敗,可能會也可能不會發出單個值。像非同步一樣Optional<T>
RxJava 2:Single<T>
Reactor:N / A
完成成功發射一個條目或失敗。
RxJava 2:Observable<T>
Reactor:N / A
發出無限數量的事件(從零到無限),可選擇成功完成或失敗。由於其代表的事件來源的性質,不支援背壓。
RxJava 2:Flowable<T>
Reactor:Flux<T>
發出無限數量的事件(從零到無限),可選擇成功完成或失敗。支援背壓(當消費者無法跟上時,源可能會減慢)
Reactor中缺少某些型別並不意味著它不支援某些用例。如果需要Completable,可以使用笨拙的Mono<Void>(如Mono.then()操作符)。
Observable和Flowable之間的區別給你一個提示,你應該期待什麼樣的流量控制flow-control 。
RxJava 2也分開Observable和Flowable型別。如果源本身是無法控制的,我們可以在型別安全中表達Observable。但是Observable有些操作毫無意義或無法實施。沒關係,使用另一Flowable,它具有完全的背壓支援,這意味著它可以減速。
我可以輕鬆在Flowable和Observable之間轉換,當消費者無法跟上生產者時,我該怎麼辦,但生產者不能放慢速度?刪除額外的訊息?暫緩一段時間?在Reactor中,兩種型別的流都表示為Flux(如在RxJava 1.x中),因此您可能總是因為缺少背壓而出現錯誤。在RxJava 2中,由於明確的保證,這種出錯現象有點不太常見。
RxJava加1分。
檢查異常
Reactor使用JDK中的標準函式型別,就像Function 它的API一樣。那很棒。但是這種選擇的一個微小副作用是對轉換中的已檢查異常進行笨拙的處理。請考慮以下不編譯的程式碼:
Flux .just(<font>"java.math.BigDecimal"</font><font>, </font><font>"java.time.Instant"</font><font>) .map(Class::forName) </font>
Class.forName()丟擲檢查ClassNotFoundException,遺憾的是,您不能從中丟擲已檢查的異常java.util.function.Function。另一方面,在RxJava中,io.reactivex.functions.Function 沒有這樣的約束,類似的程式碼可以編譯得很好。無論您是否喜歡已檢查的異常,一旦您必須處理它們,RxJava會讓您的體驗更加愉快。
RxJava加1分。雖然我不認為這是一個主要優勢。
測試
兩個庫中排程程式的存在不僅允許對併發進行細粒度控制。排程程式在單元測試中也發揮著重要作用。在Reactor和RxJava中,您可以將基於掛鐘的排程程式替換為基於人工虛擬時鐘的排程程式。當您在時間過去測試流的行為時,這非常方便。定期事件,超時,延遲 - 所有這些都可以可靠地進行單元測試。
Reactor更進了一步。在RxJava中,您必須外部化每個排程程式的配置,以便您可以在單元測試中替換它。本質上還不錯,你應該將它們外化。但是,當你需要測試時,它會很快變得混亂,TestScheduler 在您的生產程式碼中的幾十個地方;另一方面,在Reactor中,它足以包圍測試中的程式碼,並且所有底層排程程式都會被虛擬替換為自動神奇地替換:
StepVerifier .withVirtualTime(() -> Flux .never() .timeout(ofMillis(100)) ) .expectSubscription() .expectNoEvent(ofMillis(99)) .thenAwait(ofMillis(1)) .expectError(TimeoutException.<b>class</b>) .verify(ofSeconds(1));
此特定測試確保超時按預期工作。測試非常精確*且100%可預測。沒有sleeping 或忙碌等待結果。與RxJava相比,優勢在於無論您的流程有多複雜,所有排程程式都是stubbed存根的。在RxJava中,您可以編寫類似的測試,但是您必須確保替換正在測試的程式碼中的所有排程程式TestScheduler。反應器方便地通過所有層注入虛擬時鐘。
Reactor加1分
除錯
Reactor添加了一個很棒的除錯寶石:
Hooks.onOperatorDebug();
放置在應用程式開頭的這條細線將跟蹤訊號在流中的流動方式。我們來看一個實際的例子吧。想象一下以下流:
<b>import</b> reactor.core.publisher.Flux; <b>import</b> reactor.core.publisher.Hooks; <b>import</b> reactor.core.publisher.Mono; <b>import</b> java.io.File; <b>public</b> <b>class</b> StackTest { <b>public</b> <b>static</b> <b>void</b> main(String[] args) { Mono<Long> totalTxtSize = Flux .just(<font>"/tmp"</font><font>, </font><font>"/home"</font><font>, </font><font>"/404"</font><font>) .map(File::<b>new</b>) .concatMap(file -> Flux.just(file.listFiles())) .filter(File::isFile) .filter(file -> file.getName().endsWith(</font><font>".txt"</font><font>)) .map(File::length) .reduce(0L, Math::addExact); totalTxtSize.subscribe(System.out::println); } } </font>
它發現所有的.txt檔案下/tmp,/home和/404目錄,並計算出它們全部的總規模。該程式在執行時失敗,具有神祕的,長達一英里的堆疊跟蹤:
java.lang.NullPointerException at reactor.core.publisher.Flux.fromArray(Flux.java:953) at reactor.core.publisher.Flux.just(Flux.java:1161) at com.nurkiewicz.StackTest.lambda$main$0(StackTest.java:16) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126) at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:229) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53) at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59) at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63) at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121) at reactor.core.publisher.FluxFilter.subscribe(FluxFilter.java:49) at reactor.core.publisher.FluxFilter.subscribe(FluxFilter.java:53) at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) at reactor.core.publisher.MonoReduceSeed.subscribe(MonoReduceSeed.java:65) at reactor.core.publisher.Mono.subscribe(Mono.java:3695) at reactor.core.publisher.Mono.subscribeWith(Mono.java:3801) at reactor.core.publisher.Mono.subscribe(Mono.java:3689) at reactor.core.publisher.Mono.subscribe(Mono.java:3656) at reactor.core.publisher.Mono.subscribe(Mono.java:3603) at com.nurkiewicz.StackTest.main(StackTest.java:23)
如果你清理一下堆疊,你可能會感覺到看到了臭名昭著的NullPointerException:
at ...Flux.fromArray() at ...Flux.just() at com.nurkiewicz.StackTest.lambda$main$0(StackTest.java:16) ... at ...FluxArray.subscribe() at ...FluxMapFuseable.subscribe() at ...FluxConcatMap.subscribe() at ...FluxFilter.subscribe() at ...FluxFilter.subscribe() at ...FluxMap.subscribe() at ...MonoReduceSeed.subscribe() ... at com.nurkiewicz.StackTest.main(StackTest.java:23)
但它沒有多大幫助,大多數堆疊跟蹤指向Reactor原始碼(你不想去那裡)。在我們自己的程式碼中檢視所述運算子的宣告位置要方便得多。這是Hooks.onOperatorDebug()前面提到的堆疊跟蹤旁邊顯示的內容:
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] : reactor.core.publisher.Flux.concatMap(Flux.java:3425) com.nurkiewicz.StackTest.main(StackTest.java:17) Error has been observed by the following <b>operator</b>(s): |_Flux.concatMap ⇢ com.nurkiewicz.StackTest.main(StackTest.java:17) |_Flux.filter ⇢ com.nurkiewicz.StackTest.main(StackTest.java:18) |_Flux.filter ⇢ com.nurkiewicz.StackTest.main(StackTest.java:19) |_Flux.map ⇢ com.nurkiewicz.StackTest.main(StackTest.java:20) |_Flux.reduce ⇢ com.nurkiewicz.StackTest.main(StackTest.java:21)
矛盾的是,我們對執行時丟擲異常的地方不感興趣。答案几乎總是:在Reactor勇氣中。我們更願意看看故障流是如何構建的。Reactor在這裡是無與倫比的。除錯反應式程式很難,真的很難。這個操作符使它更容易一些。順便問一下你知道的來源是NullPointerException什麼?來自JavaDocFile.listFiles():
如果發生I / O錯誤,則返回null。
返回... null ...如果...錯誤...發生。在二十一世紀。
Reactor加1分
Spring支援
您可以自由使用沒有Spring框架的Reactor。您也可以在沒有Reactor的情況下使用Spring框架。但事實恰恰相反,他們非常緊密地集成了Spring WebFlux(請注意名稱)Mono並Flux廣泛使用。Reactor加1分
Android開發
RxJava在Android開發人員中非常受歡迎。它非常乾淨地解決了兩個問題:
- 通過將UI事件建模為流來避免回撥地獄
- 容易線上程之間來回切換,特別是確保UI執行緒上不會發生I / O.
這就是為什麼RxJava仍然針對較舊的Java版本的原因之一。這可能會在未來發生變化,但在撰寫本文時,RxJava是Android開發人員的唯一選擇。它是一個堅固的庫,所以我認為它們不會錯過Reactor。為RxJava +1
成熟
RxJava在市場上更加成熟和完善(參見:Android)。此外,有許多獨立專案選擇RxJava作為其API,例如,官方Couchbase驅動程式 。對於MongoDB也是如此,但他們從RcJava驅動程式轉移到更相容RxJava和Reactor的通用反應流驅動程式 。這同樣適用於RxNetty 的弟弟reactor-netty 。RxJava書籍的數量也大大超過了Reactor的數量。因此,暫時為RxJava +1,但這很可能會在未來幾個月內發生變化。
總結
我沒想到,但事實證明我們有一個平局。然而,展望未來,Reactor肯定更有前途。它的表現似乎更好 ,開發更活躍,並由更大的玩家(Pivotal)支援。這些庫非常相似(至少從API的角度來看),但如果你做出選擇,Reactor可能會更好地為你服務。