(11)照虎畫貓深入理解響應式流規範——響應式Spring的道法術器
2 響應式程式設計之法
上一章本著“快速上手”的原則,介紹了響應式流的概念,以及Reactor 3的使用。這一章,我們基於Reactor 3的實現原理,從《響應式流規範》入手,深入瞭解響應式流開發庫。
2.1 響應式流規範
現代軟體對近乎實時地處理資料的需求越來越強烈,對不斷變化的資訊的即時響應,意味著更大的商業價值,流處理是一種快速將資料轉換為有用資訊的手段。
資料流中的元素可以是一個一個的待計算的資料,也可以是一個一個待響應的事件。前者多用於大資料處理,比如Storm、Spark等產品,後者常用於響應式程式設計,比如Netflix在使用的RxJava、Scala程式語言的發明者Typesafe公司(已更名為Lightbend)的Akka Stream、Java開發者都熟悉的Pivotal公司的Project Reactor、走在技術前沿的Vert.x等。
軟體行業是一個非常注重分享和交流的行業。隨著對響應式程式設計技術的討論與溝通逐漸深入,2013年末的時候,Netflix、Pivotal、Typesafe等公司的工程師們共同發起了關於制定“響應式流規範(Reactive Stream Specification)”的倡議和討論,並在github上建立了reactive-streams-jvm專案。到2015年5月份,1.0版本的規範出爐,專案README就是規範正文。
各個響應式開發庫都要遵循這個規範,其好處也是顯而易見的。之所以我們編寫的Java程式碼可以在Hotspot、J9和Zing等JVM執行,是因為它們都遵循Java虛擬機器規範。類似的,由於各個響應式開發庫都遵循響應式流規範,因此互相相容,不同的開發庫之間可以進行互動,我們甚至可以同時在專案中使用多個響應式開發庫。對於Spring WebFlux來說,也可以使用RxJava作為響應式庫。
雖然響應式流規範是用來約束響應式開發庫的,作為使用者的我們如果能夠了解這一規範對於我們理解開發庫的使用也是很有幫助的,因為規範的內容都是對響應式程式設計思想的精髓的呈現。訪問reactive-streams-jvm專案,可以瀏覽規範的細節,包括其中定義的響應式流的特點:
- 具有處理無限數量的元素的能力;
- 按序處理;
- 非同步地傳遞元素;
- 必須實現非阻塞的回壓(backpressure)。
2.1.1 響應式流介面
響應式流規範定義了四個介面,如下:
1.Publisher
是能夠發出元素的釋出者。
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
2.Subscriber
是接收元素並做出響應的訂閱者。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
當執行subscribe
方法時,釋出者會回撥訂閱者的onSubscribe
方法,這個方法中,通常訂閱者會藉助傳入的Subscription
向釋出者請求n個數據。然後釋出者通過不斷呼叫訂閱者的onNext
方法向訂閱者發出最多n個數據。如果資料全部發完,則會呼叫onComplete
告知訂閱者流已經發完;如果有錯誤發生,則通過onError
發出錯誤資料,同樣也會終止流。
訂閱後的回撥用表示式表示就是onSubscribe onNext* (onError | onComplete)?
,即以一個onSubscribe
開始,中間有0個或多個onNext
,最後有0個或1個onError
或onComplete
事件。
Publisher
和Subscriber
融合了迭代器模式和觀察者模式。
我們經常用到的Iterable
和Iterator
就是迭代器模式的體現,可以滿足上邊第1和2個特點關於按需處理資料流的要求;而觀察者模式基於事件的回撥機制有助於滿足第3個特點關於非同步傳遞元素的要求。
3.Subscription
是Publisher
和Subscriber
的“中間人”。
public interface Subscription {
public void request(long n);
public void cancel();
}
當釋出者呼叫subscribe
方法註冊訂閱者時,會通過訂閱者的回撥方法onSubscribe
傳入Subscription
物件,之後訂閱者就可以使用這個Subscription
物件的request
方法向釋出者“要”資料了。回壓機制正是基於此來實現的,因此第4個特點也能夠實現了。
4.Processor
集Publisher
和Subscriber
於一身。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
這四個介面是實現各開發庫之間互相相容的橋樑,響應式流規範也僅僅聚焦於此,而對諸如轉換、合併、分組等等的操作一概未做要求,因此是一個非常抽象且精簡的介面規範。
如果這時候有人要造輪子,再寫一套響應式開發庫,如何基於這幾個介面展開呢?
2.1.2 照虎畫貓,理解訂閱後發生了什麼
Reactor 3是遵循響應式流規範的實現,因此,小擼一把Reactor的原始碼有助於我們理解規範中定義的介面的使用。
Reactor中,我們最先接觸的生成Publisher
的方法就是Flux.just()
,下面我們來動手寫程式碼模擬一下Reactor的實現方式。不過具備生產能力開發庫會考慮效能、併發安全性等諸多因素,所謂“照虎畫貓”,我們的程式碼只是模擬出實現思路,程式碼少的多,但五臟俱全。
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.2</version>
</dependency>
首先建立最最基礎的類Flux
,它是一個Publisher
。
package reactor.core.publisher;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
public abstract class Flux<T> implements Publisher<T> {
public abstract void subscribe(Subscriber<? super T> s);
}
在Reactor中,Flux
既是一個釋出者,又充當工具類的角色,當我們用Flux.just()
、Flux.range()
或Flux.interval()
等工廠方法生成Flux
時,會new一個新的Flux
,比如Flux.just
會返回一個FluxArray
物件。
public static <T> Flux<T> just(T... data) {
return new FluxArray<>(data);
}
返回的FluxArray
物件是Flux.just
生成的Publisher
,它繼承自Flux
,並實現了subscribe
方法。
public class FluxArray<T> extends Flux<T> {
private T[] array; // 1
public FluxArray(T[] data) {
this.array = data;
}
@Override
public void subscribe(Subscriber<? super T> actual) {
actual.onSubscribe(new ArraySubscription<>(actual, array)); // 2
}
}
FluxArray
內部使用一個數組來儲存資料;subscribe
方法通常會回撥Subscriber
的onSubscribe
方法,該方法需要傳入一個Subscription
物件,從而訂閱者之後可以通過回撥傳回的Subscription
的request
方法跟FluxArray
請求資料,這也是回壓的應有之義。
繼續編寫ArraySubscription
:
public class FluxArray<T> extends Flux<T> {
...
static class ArraySubscription<T> implements Subscription { // 1
final Subscriber<? super T> actual;
final T[] array; // 2
int index;
boolean canceled;
public ArraySubscription(Subscriber<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
@Override
public void request(long n) {
if (canceled) {
return;
}
long length = array.length;
for (int i = 0; i < n && index < length; i++) {
actual.onNext(array[index++]); // 3
}
if (index == length) {
actual.onComplete(); // 4
}
}
@Override
public void cancel() { // 5
this.canceled = true;
}
}
}
ArraySubscription
是一個靜態內部類。靜態內部類是最簡單的一種內部類,你儘可以把它當成普通的類,只不過恰好定義在其他類的內部;- 可見在
Subscription
內也有一份資料; - 當有可以發出的元素時,回撥訂閱者的
onNext
方法傳遞元素; - 當所有的元素都發完時,回撥訂閱者的
onComplete
方法; - 訂閱者可以使用
Subscription
取消訂閱。
到此為止,釋出者就開發完了。我們測試一下:
@Test
public void fluxArrayTest() {
Flux.just(1, 2, 3, 4, 5).subscribe(new Subscriber<Integer>() { // 1
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe");
s.request(6); // 2
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
Subscriber
通過匿名內部類定義,其中需要實現介面的四個方法;- 訂閱時請求6個元素。
測試方法執行如下:
1
2
3
4
5
Completed.
如果請求3個元素呢?輸出如下:
1
2
3
沒有完成事件,OK,一個簡單的Flux.just
就完成了,通過這個例子我們能夠初步摸出Flux
工廠方法的一些“套路”:
- 工廠方法返回的是
Flux
子類的例項,如FluxArray
; FluxArray
的subscribe
方法會返回給訂閱者一個Subscription
實現類的物件,這個ArraySubscription
是FluxArray
的靜態內部類,定義了“如何釋出元素”的邏輯;- 訂閱者可以通過這個
ArraySubscription
物件向釋出者請求n個數據;釋出者也可以藉助這個ArraySubscription
物件向訂閱者傳遞資料元素(onNext/onError/onComplete)。
用圖來表示如下(由於Subscription是靜態內部類,可以看做普通類,就單獨放一邊了):
上圖的這個過程基本適用於大多數的用於生成Flux
/Mono
的靜態工廠方法,如Flux.just
、Flux.range
等。
首先,使用類似Flux.just
的方法建立釋出者後,會建立一個具體的釋出者(Publisher
),如FluxArray
。
- 當使用
.subscribe
訂閱這個釋出者時,首先會new一個具有相應邏輯的Subscription
(如ArraySubscription
,這個Subscription
定義瞭如何處理下游的request
,以及如何“發出資料”); - 然後釋出者將這個
Subscription
通過訂閱者的.onSubscribe
方法傳給訂閱者; - 在訂閱者的
.onSubscribe
方法中,需要通過Subscription
發起第一次的請求.request
; Subscription
收到請求,就可以通過回撥訂閱者的onNext
方法發出元素了,有多少發多少,但不能超過請求的個數;- 訂閱者在
onNext
中通常定義對元素的處理邏輯,處理完成之後,可以繼續發起請求; - 釋出者根據繼續滿足訂閱者的請求;
- 直至釋出者的序列結束,通過訂閱者的
onComplete
予以告知;當然序列傳送過程中如果有錯誤,則通過訂閱者的onError
予以告知並傳遞錯誤資訊;這兩種情況都會導致序列終止,訂閱過程結束。
以上從1~7這些階段稱為訂閱期(subscribe time)。
2.1.3 照虎畫貓——操作符“流水線”
響應式開發庫的一個很讚的特性就是可以像組裝流水線一樣將操作符串起來,用來聲明覆雜的處理邏輯。比如:
Flux ff = Flux.just(1, 2, 3, 4, 5)
.map(i -> i * i)
.filter(i -> (i % 2) == 0);
ff.subscribe(...)
通過原始碼,我們可以瞭解這種“流水線”的實現機制。下面我們仍然是通過照虎畫貓的方式模擬一下Reactor中Flux.map
的實現方式。
Flux.map
用於實現轉換,轉換後元素的型別可能會發生變化,轉換的邏輯由引數Function
決定。方法本身返回的是一個轉換後的Flux
,基於此,該方法實現如下:
public abstract class Flux<T> implements Publisher<T> {
...
public <V> Flux<V> map(Function<? super T, ? extends V> mapper) { // 1
return new FluxMap<>(this, mapper); // 2
}
}
- 泛型方法,通過泛型表示可能出現的型別的變化(T → V);
FluxMap
就是新的Flux。
既然FluxMap
是一個新的Flux,那麼與2.1.2中FluxArray
類似,其內部定義有MapSubscription
,這是一個Subscription
,能夠根據其訂閱者的請求發出資料。
public class FluxMap<T, R> extends Flux<R> {
private final Flux<? extends T> source;
private final Function<? super T, ? extends R> mapper;
public FluxMap(Flux<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
public void subscribe(Subscriber<? super R> actual) {
source.subscribe(new MapSubscriber<>(actual, mapper));
}
static final class MapSubscription<T, R> implements Subscription {
private final Subscriber<? super R> actual;
private final Function<? super T, ? extends R> mapper;
MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void request(long n) { // 1
// TODO 收到請求,發出元素
}
@Override
public void cancel() {
// TODO 取消訂閱
}
}
}
- 但是
map
操作符並不產生資料,只是資料的搬運工。收到request
後要發出的資料來自上游。
所以MapSubscription
同時也應該是一個訂閱者,它訂閱上游的釋出者,並將資料處理後傳遞給下游的訂閱者(為了跟Reactor原始碼一致,將MapSubscription
改名為MapSubscriber
,其實沒差)。
如圖,對下游是作為釋出者,傳遞上游的資料到下游;對上游是作為訂閱者,傳遞下游的請求到上游。
static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription { // 1
...
}
- 實現了
Subscriber
和Subscription
。
這樣,總共有5個方法要實現:來自Subscriber
介面的onSubscribe
、onNext
、onError
、onComplete
,和來自Subscription
介面的request
和cancel
。下面我們本著“搬運工”的角色實現這幾個方法即可。
static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription {
private final Subscriber<? super R> actual;
private final Function<? super T, ? extends R> mapper;
boolean done;
Subscription subscriptionOfUpstream;
MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onSubscribe(Subscription s) {
this.subscriptionOfUpstream = s; // 1
actual.onSubscribe(this); // 2
}
@Override
public void onNext(T t) {
if (done) {
return;
}
actual.onNext(mapper.apply(t)); // 3
}
@Override
public void onError(Throwable t) {
if (done) {
return;
}
done = true;
actual.onError(t); // 4
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete(); // 5
}
@Override
public void request(long n) {
this.subscriptionOfUpstream.request(n); // 6
}
@Override
public void cancel() {
this.subscriptionOfUpstream.cancel(); // 7
}
}
- 拿到來自上游的Subscription;
- 回撥下游的
onSubscribe
,將自身作為Subscription
傳遞過去; - 收到上游發出的資料後,將其用mapper進行轉換,然後接著發給下游;
- 將上游的錯誤訊號原樣發給下游;
- 將上游的完成訊號原樣發給下游;
- 將下游的請求傳遞給上游;
- 將下游的取消操作傳遞給上游。
從這個對原始碼的模仿,可以體會到,當有多個操作符串成“操作鏈”的時候:
- 向下:很自然地,資料和訊號(
onSubscribe
、onNext
、onError
、onComplete
)是通過每一個操作符向下傳遞的,傳遞的過程中進行相應的操作處理,這一點並不難理解; - 向上:然而在內部我們看不到的是,有一個自下而上的“訂閱鏈”,這個訂閱鏈可以用來傳遞
request
,因此回壓(backpressure)可以實現從下游向上遊的傳遞。
這一節最開頭的那一段程式碼的執行過程如下圖所示:
2.1.4 LambdaSubscriber
在1.3.2節的時候,介紹了.subscribe
的幾個不同方法簽名的變種:
subscribe( Consumer<? super T> consumer)
subscribe( @Nullable Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer)
subscribe( @Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer)
subscribe( @Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
@Nullable Consumer<? super Subscription> subscriptionConsumer)
用起來非常方便,但是響應式流規範中只定義了一個訂閱方法subscribe(Subscriber subscriber)
。實際上,這幾個方法最終都是呼叫的subscribe(LambdaSubscriber subscriber)
,並通過LambdaSubscriber
實現了對不同個數引數的組裝。如下圖所示:
因此,
flux.subscribe(System.out::println, System.err::println);
是呼叫的:
flux.subscribe(new LambdaSubscriber(System.out::println, System.err::println, null, null));