Rxjava Reactor之響應式入門
響應式到底是什麼?
現實生活中,當我們聽到有人喊我們的時候,我們會對其進行響應,也就是說,我們是基於事件驅動模式來進行的程式設計。
所以這個過程其實就是對於所產生事件的下發,我們的消費者對其進行的一系列的消費。
從這個角度,我們可以思考,整個程式碼的設計我們應該是針對於消費者來講的,比如看電影,有些畫面我們不想看,那就閉上眼睛,有些聲音不想聽,那就捂上耳朵,說白了,就是對於消費者的增強包裝,我們將這些複雜的邏輯給其拆分,然後分割成一個個的小任務進行封裝,於是就有了諸如filter、map、skip、limit等操作。而對於其中原始碼的設計邏輯,我們放在後面來講。
併發與並行的關係
可以這麼說,併發很好的利用了CPU時間片的特性,也就是作業系統選擇並執行一個任務,接著在下一個時間片會執行另一個任務,並把前一個任務設定成等待狀態。
其實這裡想表達的是併發並不意味著並行 。
具體來舉幾個情況:
- 有時候多執行緒執行會提高應用程式的效能,而有時候反而會降低程式的效能。這在關於JDK中其Stream API的使用上體現的很明顯,如果任務量很小,而我們又使用了並行流,反而降低了效能。
- 我們在多執行緒程式設計中可能會同時開啟或者關閉多個執行緒,這會產生的很多效能開銷,這也降低了程式效能。
- 當我們的執行緒同時都在等待IO過程,此時併發也就可能會阻塞CPU資源,其造成的後果不僅僅是使用者在等待結果,同時會浪費CPU的計算資源。
-
如果幾個執行緒共享了一個數據,情況就變得有些複雜了,我們需要考慮資料在各個執行緒中狀態的一致性。為了達到這個目的,我們很可能會使用Synchronized或者是lock來解決。
現在,應該對併發有一定的認知了吧。併發是一個很好的東西,但並不一定會實現並行。並行是在多個CPU核心上的同一時間執行多個任務或者一個任務分為多塊執行(如ForkJoin)。單核CPU的話就不要考慮了。
補充一點,實際上多執行緒就意味著併發,但是並行只發生在當這些執行緒在同一時間排程分配在不同CPU上執行。也就是說,並行是併發的一種特定的形式。往往我們一個任務裡會產生很多元素,然而這些個元素在不做操作的情況下大都只能在當前執行緒中操作,要麼我們就要對其進行ForkJoin,但這些對於我們很多程式員來講有時候很不好操作控制,上手難度有些高,響應式的話,我們可以簡單的通過其排程API就可以輕鬆做到事件元素的下發分配,其內部將每個元素包裝成一個任務提交到執行緒池中,我們可以根據是否是計算型任務還是IO型別的任務來選擇相應的執行緒池。
這裡,需要強調 一下:執行緒只是一個物件而已,不要把其想象成cpu中的某一個執行核心,這是很多人都在犯的錯,cpu時間片切換執行這些個執行緒。
響應式中的背壓到底是一種怎樣的理解
用一個不算很恰當的中國的成語來講,就是承上啟下。為了更好的解釋,我們來看一個場景,大壩,在洪水時期,下游沒有辦法一下子消耗那麼多水,大壩在此的作用就是攔截洪水,並根據下游的消耗情況酌情排放。再者,父親的背,我們小時候,社會上很多的事情首先由父親用自己的背來幫我們來扛起,然後根據我們自身的能力來適當的下發給我們壓力,也就是說,背壓應該寫在連線元素生產者和消費者的一個地方,即生產者和消費者的連線者。然後,通過這裡的描述,背壓應該具有承載元素的能力,也就是其必須是一個容器的,而且元素的儲存與下發應該具有先後的,那麼使用佇列則是最適合不過了。
如何去看Rxjava或者Reactor的原始碼,根據原始碼的介面的設計我們可以得到一些什麼啟示
關於響應式的Rx標準已經寫入了JDK中:java.util.concurrent.Flow
:
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); } public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } public static interface Subscription { public void request(long n); public void cancel(); } public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
可以看到,Flow這個類中包含了這4個介面定義,Publisher
通過subscribe
方法來和Subscriber
產生訂閱關係,而Subscriber
依靠onSubscribe
來首先和上游產生聯絡,這裡就是靠Subscription
來做到的,所以說,Subscription
往往會作為生產者的內部類定義其中,其用來接收生產者所生產的元素,支援背壓的話,Subscription
應該首先將其放入到一個佇列中,然後根據請求數量來呼叫Subscriber
的onNext
等方法進行下發。這個在Rx程式設計中都是統一的模式,我們通過Reactor中reactor.core.publisher.Flux#fromArray
所涉及的FluxArray
的原始碼來對此段內容進行理解:
final class FluxArray<T> extends Flux<T> implements Fuseable, Scannable { final T[] array; @SafeVarargs public FluxArray(T... array) { this.array = Objects.requireNonNull(array, "array"); } @SuppressWarnings("unchecked") public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) { if (array.length == 0) { Operators.complete(s); return; } if (s instanceof ConditionalSubscriber) { s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array)); } else { s.onSubscribe(new ArraySubscription<>(s, array)); } } @Override public void subscribe(CoreSubscriber<? super T> actual) { subscribe(actual, array); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.BUFFERED) return array.length; return null; } static final class ArraySubscription<T> implements InnerProducer<T>, SynchronousSubscription<T> { final CoreSubscriber<? super T> actual; final T[] array; int index; volatile boolean cancelled; volatile long requested; @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater<ArraySubscription> REQUESTED = AtomicLongFieldUpdater.newUpdater(ArraySubscription.class, "requested"); ArraySubscription(CoreSubscriber<? super T> actual, T[] array) { this.actual = actual; this.array = array; } @Override public void request(long n) { if (Operators.validate(n)) { if (Operators.addCap(REQUESTED, this, n) == 0) { if (n == Long.MAX_VALUE) { fastPath(); } else { slowPath(n); } } } } void slowPath(long n) { final T[] a = array; final int len = a.length; final Subscriber<? super T> s = actual; int i = index; int e = 0; for (; ; ) { if (cancelled) { return; } while (i != len && e != n) { T t = a[i]; if (t == null) { s.onError(new NullPointerException("The " + i + "th array element was null")); return; } s.onNext(t); if (cancelled) { return; } i++; e++; } if (i == len) { s.onComplete(); return; } n = requested; if (n == e) { index = i; n = REQUESTED.addAndGet(this, -e); if (n == 0) { return; } e = 0; } } } void fastPath() {...} } static final class ArrayConditionalSubscription<T> implements InnerProducer<T>, SynchronousSubscription<T> { .... } }
我們可以看到之前文字在原始碼內部的表達。這裡就不多說了。而對於各種中間操作的包裝我們該如何去做,依據之前的介面定義,我們應該更注重功能的設定,而無論是filter,flatmap,map等這些常用的操作,其實都是消費動作,理應定義在消費者層面,想到這裡,我們該如何去做?
這裡,我們就要結合我們的設計模式,裝飾模式,對subscribe(Subscriber<? super T> subscriber)
所傳入的Subscriber
進行功能增強,即從Subscriber
這個角度來講,使用的是裝飾增強模式,但從外面來看,其整體定義的依然是一個Flux
或者Mono
,這裡FluxArray
的話就是例子,這樣,從這個角度來講,其屬於向上適配,也就是適配模式,這裡的適配玩的比較有意思,完全就是靠對內部類的包裝然後通過subscribe(Subscriber<? super T> subscriber)
銜接來完成的。
所以,我們應該想到中國古代蘇軾的題西林壁裡有一句話:橫看成嶺側成峰 遠近高低各不同
講的就是從不同的角度去看待一個事物,就會得到不同的結果。同樣,一百個人心中有一百個哈姆雷特,也是對於同一個事物的看法,從這裡,我們應該能學到設計模式千萬不要特別刻意的去絕對化!
我們可以結合reactor.core.publisher.Flux#filter
涉及的FluxFilter
來觀察理解上述涉及的內容:
final class FluxFilter<T> extends FluxOperator<T, T> { final Predicate<? super T> predicate; FluxFilter(Flux<? extends T> source, Predicate<? super T> predicate) { super(source); this.predicate = Objects.requireNonNull(predicate, "predicate"); } @Override @SuppressWarnings("unchecked") public void subscribe(CoreSubscriber<? super T> actual) { if (actual instanceof ConditionalSubscriber) { source.subscribe(new FilterConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate)); return; } source.subscribe(new FilterSubscriber<>(actual, predicate)); } static final class FilterSubscriber<T> implements InnerOperator<T, T>, Fuseable.ConditionalSubscriber<T> { final CoreSubscriber<? super T> actual; final Predicate<? super T> predicate; Subscription s; boolean done; FilterSubscriber(CoreSubscriber<? super T> actual, Predicate<? super T> predicate) { this.actual = actual; this.predicate = predicate; } @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { Operators.onNextDropped(t, actual.currentContext()); return; } boolean b; try { b = predicate.test(t); } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, actual.currentContext())); return; } if (b) { actual.onNext(t); } else { s.request(1); } } @Override public boolean tryOnNext(T t) { if (done) { Operators.onNextDropped(t, actual.currentContext()); return false; } boolean b; try { b = predicate.test(t); } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, actual.currentContext())); return false; } if (b) { actual.onNext(t); } return b; } @Override public void onError(Throwable t) { if (done) { Operators.onErrorDropped(t, actual.currentContext()); return; } done = true; actual.onError(t); } @Override public void onComplete() { if (done) { return; } done = true; actual.onComplete(); } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return s; if (key == Attr.TERMINATED) return done; return InnerOperator.super.scanUnsafe(key); } @Override public CoreSubscriber<? super T> actual() { return actual; } @Override public void request(long n) { s.request(n); } @Override public void cancel() { s.cancel(); } } static final class FilterConditionalSubscriber<T> implements InnerOperator<T, T>, Fuseable.ConditionalSubscriber<T> { ... } }
根據這些設計,我們自己也是完全可以作為參考來通過一套api介面設計,可以衍生出很多規範邏輯的開發,比如我們看到的眾多的Rx衍生操作API的設計實現,其都是按照一套模板來進行的,我們可以稱之為程式碼層面的微服務設計。
如何去看待眾多函式表示式
人類最擅長描述場景,比如一套動作,假如是舞蹈的話,可以講是什麼什麼編舞,但是這個編舞又要在一定的框架之下,即有一定的規範,同樣,我們施展一套拳法,也需要一個規範,不能踢一腳也叫拳法。而對於這個規範的實現,那就是一套動作,對於拳法來講,可能就是一個很簡單的左勾拳或者右勾拳,也可以是比較複雜的詠春拳,太極拳等,而且一套拳法可能有很多小套路組成,這些小套路也是遵循著這個規範進行的,那麼依據這個思路,我們來看下面的函式式介面定義:
@FunctionalInterface public interface Predicate<T> { boolean test(T t); default Predicate<T> and(Predicate<? super T> other) { Objects.requireNonNull(other); return (t) -> test(t) && other.test(t); } default Predicate<T> negate() { return (t) -> !test(t); } default Predicate<T> or(Predicate<? super T> other) { Objects.requireNonNull(other); return (t) -> test(t) || other.test(t); } static <T> Predicate<T> isEqual(Object targetRef) { return (null == targetRef) ? Objects::isNull : object -> targetRef.equals(object); } } @FunctionalInterface public interface BiConsumer<T, U> { void accept(T t, U u); default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) { Objects.requireNonNull(after); return (l, r) -> { accept(l, r); after.accept(l, r); }; } }
可以看到無論是條件判斷表示式Predicate
還是無返回值動作處理函式BiConsumer
都遵循一個標準動作的設計定義思路,並通過default
方法來對同類動作進行編排,以達到更加豐富的效果。所以,函式式的應用更加傾向於乾淨利落,凸顯自己要做的事情就好,未來,我會在自己的Java程式設計方法論- JDK篇
中花大量篇幅來解讀函數語言程式設計的各種奇特而實用的使用方法,來降低我們複雜介面的設計邏輯難度,做到知名見義,瞭然於胸的效果。這個在我的Java程式設計方法論- Reactor與Spring webflux篇
中也是有涉及的。
關於響應式的使用效能的考究
響應式程式設計知識一種模式,用的好與壞全看自己對於api的理解程度,不要想著會多麼的降低效能,這個並沒有進行什麼過度包裝這一說的,當講到jdbc這裡如何表現不行的時候,當前並沒有一個開源的Reactor-jdbc的框架,也就造成的測試的不合理性,何況新的知識是需要大家一起共同來學習推動的,不好的地方我們推動就好,不需要上來就對其進行否定,mongodb有提供相應的響應式api,但其內部還是之前的方式,同樣,關係型資料庫也是一個道理,響應式程式設計注重的是中間過程的處理,關於生產元素的獲取它沒太多關係,更多的還是看元素生產者的效能,一家之言,可能有偏頗,希望理解,有問題提出就好。
最後,本人的關於 Java程式設計方法論-響應式 之 Rxjava篇一書的同步分享視訊已經分享在了B站,有感興趣的小夥伴可以自行觀看,歡迎一起交流的
ofollow,noindex"> https://www. bilibili.com/video/av34 537840/?p=1