1. 程式人生 > >響應式程式設計簡介之:Reactor

響應式程式設計簡介之:Reactor

[toc] # 簡介 Reactor是reactivex家族的一個非常重要的成員,Reactor是第四代的reactive library,它是基於Reactive Streams標準基礎上開發的,主要用來構建JVM環境下的非阻塞應用程式。 今天給大家介紹一下Reactor。 # Reactor簡介 Reactor是基於JVM的非阻塞API,他直接跟JDK8中的API相結合,比如:CompletableFuture,Stream和Duration等。 它提供了兩個非常有用的非同步序列API:Flux和Mono,並且實現了Reactive Streams的標準。 並且還可以和reactor-netty相結合,作為一些非同步框架的底層服務,比如我們非常熟悉的Spring MVC 5中引入的WebFlux。 我們知道WebFlux的底層使用的是reactor-netty,而reactor-netty又引用了Reactor。所以,如果你在POM中引入了webFlux依賴: ~~~xml org.springframework.boot spring-boot-starter-webflux ~~~ 那麼專案將會自動引入Reactor。 如果你用的不是Spring webflux,沒關係,你可以直接新增下面的依賴來使用Reactor: ~~~xml io.projectreactor reactor-core ~~~ # reactive programming的發展史 最最開始的時候微軟為.NET平臺建立了Reactive Extensions (Rx) library。接著RxJava實現了JVM平臺的Reactive。 然後Reactive Streams標準出現了,它定義了Java平臺必須滿足的的一些規範。並且已經整合到JDK9中的java.util.concurrent類中。 在Flow中定義了實現Reactive Streams的四個非常重要的元件,分別是Publisher,Subscriber,Subscription和Processor。 # Iterable-Iterator 和Publisher-Subscriber的區別 一般來說reactive在面向物件的程式語言中是以觀察者模式的擴充套件來使用的。 我們來具體看一下這個觀察者模式的實現,以Publisher和Subscriber為例: ~~~java public static interface Publisher { public void subscribe(Subscriber subscriber); } ~~~ ~~~java public static interface Subscriber { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } ~~~ 上面定義了兩個介面,Publisher和Subscriber,Publisher的作用就是subscribe到subscriber。 而subscriber定義了4個on方法,用來觸發特定的事件。 那麼Publisher中的subscribe是怎麼觸發Subscriber的onSubscribe事件呢? 很簡單,我們看一個具體的實現: ~~~java public void subscribe(Flow.Subscriber subscriber) { Subscription sub; if (throwable != null) { assert iterable == null : "non-null iterable: " + iterable; sub = new Subscription(subscriber, null, throwable); } else { assert throwable == null : "non-null exception: " + throwable; sub = new Subscription(subscriber, iterable.iterator(), null); } subscriber.onSubscribe(sub); if (throwable != null) { sub.pullScheduler.runOrSchedule(); } } ~~~ 上面的例子是PullPublisher的subscribe實現。我們可以看到,在這個subscribe中觸發了subscriber.onSubscribe方法。而這就是觀察者模式的祕密。 或者說,當Publisher呼叫subscribe的時候,是主動push subscriber的onSubscribe方法。 熟悉Iterable-Iterator模式的朋友應該都知道,Iterator模式,其實是一個主動的pull模式,因為需要不斷的去呼叫next()方法。所以它的控制權是在呼叫方。 # 為什麼要使用非同步reactive 在現代應用程式中,隨著使用者量的增多,程式設計師需要考慮怎麼才能提升系統的處理能力。 傳統的block IO的方式,因為需要佔用大量的資源,所以是不適合這樣的場景的。我們需要的是NO-block IO。 JDK中提供了兩種非同步程式設計的模型: 第一種是Callbacks,非同步方法可以通過傳入一個Callback引數的形式來在Callback中執行非同步任務。比較典型的像是java Swing中的EventListener。 第二中就是使用Future了。我們使用Callable來提交一個任務,然後通過Future來拿到它的執行結果。 這兩種非同步程式設計會有什麼問題呢? callback的問題就在於回撥地獄。熟悉JS的朋友應該很理解這個回撥地獄的概念。 簡單點講,回撥地獄就是在callback中又使用了callback,從而造成了這種callback的層級呼叫關係。 而Future主要是對一個非同步執行的結果進行獲取,它的 get()實際上是一個block操作。並且不支援異常處理,也不支援延遲計算。 當有多個Future的組合應該怎麼處理呢?JDK8 實際上引入了一個CompletableFuture類,這個類是Future也是一個CompletionStage,CompletableFuture支援then的級聯操作。不過CompletableFuture提供的方法不是那麼的豐富,可能滿足不了我的需求。 於是我們的Reactor來了。 # Flux Reactor提供了兩個非常有用的操作,他們是 Flux 和 Mono。 其中Flux 代表的是 0 to N 個響應式序列,而Mono代表的是0或者1個響應式序列。 我們看一個Flux是怎麼transfer items的: ![](https://img-blog.csdnimg.cn/20200902222919571.png) 先看下Flux的定義: ~~~java public abstract class Flux implements Publisher ~~~ 可以看到Flux其實就是一個Publisher,用來產生非同步序列。 Flux提供了非常多的有用的方法,來處理這些序列,並且提供了completion和error的訊號通知。 相應的會去呼叫Subscriber的onNext, onComplete, 和 onError 方法。 # Mono 我們看下Mono是怎麼transfer items的: ![](https://img-blog.csdnimg.cn/20200903102242943.png) 看下Mono的定義: ~~~java public abstract class Mono implements Publisher ~~~ Mono和Flux一樣,也是一個Publisher,用來產生非同步序列。 Mono因為只有0或者1個序列,所以只會觸發Subscriber的onComplete和onError方法,沒有onNext。 另一方面,Mono其實可以看做Flux的子集,只包含Flux的部分功能。 Mono和Flux是可以互相轉換的,比如Mono#concatWith(Publisher)返回一個Flux,而 Mono#then(Mono)返回一個Mono. # Flux和Mono的基本操作 我們看下Flux建立的例子: ~~~java Flux seq1 = Flux.just("foo", "bar", "foobar"); List iterable = Arrays.asList("foo", "bar", "foobar"); Flux seq2 = Flux.fromIterable(iterable); Flux numbersFromFiveToSeven = Flux.range(5, 3); ~~~ 可以看到Flux提供了很多種建立的方式,我們可以自由選擇。 再看看Flux的subscribe方法: ~~~java Disposable subscribe(); Disposable subscribe(Consumer consumer); Disposable subscribe(Consumer consumer, Consumer errorConsumer); Disposable subscribe(Consumer consumer, Consumer errorConsumer, Runnable completeConsumer); Disposable subscribe(Consumer consumer, Consumer errorConsumer, Runnable completeConsumer, Consumer subscriptionConsumer); ~~~ subscribe可以一個引數都沒有,也可以多達4個引數。 看下沒有引數的情況: ~~~java Flux numbersFromFiveToSeven = Flux.range(5, 3); numbersFromFiveToSeven.subscribe(); ~~~ > 注意,沒有引數並不表示Flux的物件不被消費,只是不可見而已。 看下帶引數的情況:consumer用來處理on each事件,errorConsumer用來處理on error事件,completeConsumer用來處理on complete事件,subscriptionConsumer用來處理on subscribe事件。 前面的3個引數很好理解,我們來舉個例子: ~~~java Flux ints3 = Flux.range(1, 4); ints3.subscribe(System.out::println, error -> System.err.println("Error " + error), () -> System.out.println("Done"), sub -> sub.request(2)); ~~~ 我們構建了從1到4的四個整數的Flux,on each就是打印出來,如果中間有錯誤的話,就輸出Error,全部完成就輸出Done。 那麼最後一個subscriptionConsumer是做什麼用的呢? subscriptionConsumer accept的是一個Subscription物件,我們看下Subscription的定義: ~~~java public interface Subscription { public void request(long n); public void cancel(); } ~~~ Subscription 定義了兩個方法,用來做初始化用的,我們可以呼叫request(n)來決定這次subscribe獲取元素的最大數目。 比如上面我們的例子中,雖然構建了4個整數,但是最終輸出的只有2個。 上面所有的subscribe方法,都會返回一個Disposable物件,我們可以通過Disposable物件的dispose()方法,來取消這個subscribe。 Disposable只定義了兩個方法: ~~~java public interface Disposable { void dispose(); default boolean isDisposed() { return false; } ~~~ dispose的原理是向Flux 或者 Mono發出一個停止產生新物件的訊號,但是並不能保證物件產生馬上停止。 有了Disposable,當然要介紹它的工具類Disposables。 Disposables.swap() 可以建立一個Disposable,用來替換或者取消一個現有的Disposable。 Disposables.composite(…​)可以將多個Disposable合併起來,在後面統一做處理。 # 總結 本文介紹了Reactor的基本原理和兩非常重要的元件Flux和Mono,下一篇文章我們會繼續介紹Reactor core的一些更加高階的用法。敬請期待。 本文的例子[learn-reactive](https://github.com/ddean2009/learn-reactive/tree/master/reactorIntroduction) > 本文作者:flydean程式那些事 > > 本文連結:[http://www.flydean.com/introduction-to-reactor/](http://www.flydean.com/introduction-to-reactor/) > > 本文來源:flydean的部落格 > > 歡迎關注我的公眾號:「程式那些事」最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!