1. 程式人生 > >RxJava 2 0中backpressure 背壓 概念的理解

RxJava 2 0中backpressure 背壓 概念的理解

exceptio n) 廣播 通過 pan roc follow 由於 html

英文原文:https://github.com/ReactiveX/RxJava/wiki/Backpressure

Backpressure(背壓、反壓力)

在rxjava中會經常遇到一種情況就是被觀察者發送消息太快以至於它的操作符或者訂閱者不能及時處理相關的消息。那麽隨之而來的就是如何處理這些未處理的消息。

舉個例子,使用zip操作符將兩個無限大的Observable壓縮在一起,其中一個被觀察者發送消息的速度是另一個的兩倍。一個比較不靠譜的做法就是把發送比較快的消息緩存起來,當比較慢的Observable發送消息的時候取出來並將他們結合在一起。這樣做就使得rxjava變得笨重而且十分占用系統資源。

在rxjava中有多重控制流以及背壓(backpressure)策略用來應對當一個快速發送消息的被觀察者遇到一個處理消息緩慢的觀察者。下面的解釋將會向你展示你應當怎麽設計屬於你自己的被觀察者和操作符去應對流量控制(flow control)。

Hot and cold Observables, and multicasted Observables

Observable 數據流有兩種類型:hot 和 cold。這兩種類型有很大的不同。本節介紹他們的區別,以及作為 Rx 開發者應該如何正確的使用他們。

Cold observables

只有當有訂閱者訂閱的時候, Cold Observable 才開始執行發射數據流的代碼。並且每個訂閱者訂閱的時候都獨立的執行一遍數據流代碼。 Observable.interval 就是一個 Cold Observable。每一個訂閱者都會獨立的收到他們的數據流。

我們經常用到的Observable.create 就是 Cold Observable,而 just, range, timer 和 from 這些創建的同樣是 Cold Observable。

Hot observables

Hot observable 不管有沒有訂閱者訂閱,他們創建後就開發發射數據流。 一個比較好的示例就是 鼠標事件。 不管系統有沒有訂閱者監聽鼠標事件,鼠標事件一直在發生,當有訂閱者訂閱後,從訂閱後的事件開始發送給這個訂閱者,之前的事件這個訂閱者是接受不到的;如果訂閱者取消訂閱了,鼠標事件依然繼續發射。

了解更多Hot and cold Observables,參考:
http://blog.csdn.net/jdsjlzx/article/details/51839090

當一個cold observable是multicast(多路廣播)(當轉換完成時或者方法被調用)的時候,為了應對背壓,應當把cold observable轉換成hot observable。

cold observable 相當於響應式拉(就是observer處理完了一個事件就從observable拉取下一個事件),hot observable通常不能很好的處理響應式拉模型,但它卻是處理流量控制問題的不二候選人,例如使用onBackpressureBuffer或者onBackpressureDrop 操作符,和其他操作符比如operators, throttling, buffers, or windows.

此段過於抽象,特提供原文如下,如有好的翻譯建議請提出。

Cold Observables are ideal for the reactive pull model of backpressure described below. Hot Observables typically do not cope well with a reactive pull model, and are better candidates for some of the other flow control strategies discussed on this page, such as the use of the onBackpressureBuffer or onBackpressureDrop operators, throttling, buffers, or windows.

能避免背壓問題的運算符

防止過度創建observable的第一道防線就是使用普通數組去減少observable發送消息的數量,在這一節會使用一些操作符去應對突發的observable發送爆發性數據(一會沒有,一會很多)就像下面的這張圖片所示:

技術分享圖片

這些操作符可以通過微調參數確保slow-consuming觀察者不被生產可觀測的。

Throttling節流

操作符中比如 sample(?) 、 throttleLast(?)、 throttleFirst(?)、 throttleWithTimeout(?) 、 debounce(?) 允許你通過調節速率來改變Observable發射消息的速度。

以下圖表展示如何使用這些操作符。

樣本 (或 throttleLast)

sample 操作符定期收集observable發送的數據items,並發射出最後一個數據item。
技術分享圖片

Observable<Integer> burstySampled = bursty.sample(500, TimeUnit.MILLISECONDS);
  • 1

上面代碼解釋,定期且一次收集5個item,發射出最後一個item。

官網解釋:http://reactivex.io/documentation/operators/sample.html

throttleFirst

跟sample有點類似,但是並不是把觀測到的最後一個item發送出去,而是把該時間段第一個item發送出去。

技術分享圖片

Observable<Integer> burstyThrottled = bursty.throttleFirst(500, TimeUnit.MILLISECONDS);
  • 1

debounce (or throttleWithTimeout)

debounce操作符會只發送兩個在規定間隔內的時間發送的序列的最後一個。

技術分享圖片

Observable<Integer> burstyDebounced = bursty.debounce(10, TimeUnit.MILLISECONDS);
  • 1

Buffers and windows 緩沖區和窗口

可以使用操作符比如buffer(?) 或者window(?) 收集過度生成消息的Observable的數據items,然後發射出較少使用的數據。緩慢的消費者可以決定是否處理每個集合中的某一個特定的項目,或處理集合中的某種組合,或為集合中的每一項預定計劃工作,這都要視情況處理。

以下圖表展示如何使用這些操作符。

buffer

你可以定期關閉並釋放突發性的 Observable 緩沖區。

技術分享圖片

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
  • 1

在突發期間你可以得到的想要的,並在緩沖區收集數據和最終在突發結束的時候釋放緩存。使用debounce操作符釋放緩存並關閉指示器buffer操作符。

此段超過本人翻譯水平,特提供原文如下,如有好的翻譯建議請提出。
Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator:

技術分享圖片

使用線程阻塞

處理過快生產item的其他策略就是使用線程阻塞,但是這麽做違背了響應式設計和非阻塞模型設計,但是它的確是一個可行的選擇。在rxJava中並沒有操作符可以做到這一點。

如果observable發送消息,subscriber消耗消息都是在同一個線程這將很好的處理這個問題,但是你要知道,在rxJava中,很多時候生產者和消費者都不在同一個線程。

如何建立“響應式拉動(reactive pull)”backpressure

當subscribe訂閱observable的時候可以通過調用subscribe.request(n),n是你想要的observable發送出來的量。

當在onNext()方法裏處理完數據itme後,你能重新調用 request()方法,通知Observable發射數據items。下面是個例子。

someObservable.subscribe(new Subscriber<t>() {
    @Override
    public void onStart() {
      request(1);
    }

    @Override
    public void onCompleted() {
      // gracefully handle sequence-complete
    }

    @Override
    public void onError(Throwable e) {
      // gracefully handle error
    }

    @Override
    public void onNext(t n) {
      // do something with the emitted item "n"
      // request another item:
      request(1);
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

你可以通過一個神奇數字request, request(Long.MAX_VALUE),禁用反應拉背力和要求Observable按照自己的步伐發射數據。request(0)是一個合法的調用,但沒有奏效。請求值小於零的請求會導致拋出一個異常。

Reactive pull backpressure isn’t magic

backpressure 不會使得過度生產的observable的問題消失,這只是提供了一種更好的解決問題的方法。 讓我們更仔細的研究剛剛說到的zip操作符的問題。

這裏有兩個observable,a和b,b發射item比a更加的頻繁,當你想zip這兩個observable的時候,你需要把a發送出來的第n個和b發送出來的第n個對象處理,然而由於b發送出來的速率更快,這時候b已經發送出了n+1~n+m個消息了,這時候你要想要把a的n+1~n+m個消息結合的話,就必須持有b已經發送出來的n+1~n+m消息,同時,這意味著緩存的數量在不斷的增長。

當然你可以給b添加操作符throttling,但是這意味著你將丟失某些從b發送出來的項,你真正想要做的其實就是告訴b:“b你需要慢下來,但是你要保持你給我的數據是完整的”。

響應式拉(reective pull)模型可以當你做到這一點,subscriber從observable那裏拉取數據,這比較通常在observable那裏推送數據這種模式形成鮮明的對比。

在rxJava中,zip操作符正是使用了這種技巧。它給每個源observable維護了一個小的緩存池,當它的緩存池滿了以後,它將不會從源observable那裏拉取item。每當zip發送一個item的時候,他從它的緩存池裏面移除相應的項,並從源observable那裏拉取下一個項。

在rxJava中,很多操作符都使用了這種模式(響應式拉),但是有的操作符並沒有使用這種模式,因為他們也許執行的操作跟源observable處於相同的進程。在這種情況下,由於消耗事件會阻塞本進程,所以這一項的工作完成後,才有機會收到下一項。還有另外一種情況,backpressure也是不適合的,因為他們有指定的其他方式去處理流量控制,這些特殊的情況在rxJava的java文檔裏面都會有詳細說明為毛。

但是,observable a和b必須正確的響應request()方法,如果一個observable還沒有被支持響應式拉(並不是每個observable都會支持),你可以采取以下其中一種操作都可以達到backpressure的行為:

onBackpressurebuffer

給observable發送出來的數據持有一個緩存,當request方法被調用的時候,給下層流發送一個item。

技術分享圖片

這個操作符還有一個實驗性的版本允許去設置這個緩存池的大小,但當緩存池滿了以後將會終止執行並拋出異常。

onBackpressureDrop

命令observable丟棄後來的事件,直到subscriber再次調用request(n)方法的時候,就發送給它的subscriber調用時間以後的n個事件。

技術分享圖片

onBackpressureBlock (實驗性的, not in RxJava 1.0)

源Observable的線程操作直到Subscriber發出請求,然後只要有掛起的請求就結束線程。

技術分享圖片

如果你不允許這些操作符操作不支持背壓的Observable,或者Subscriber或一些操作符嘗試申請活性拉反壓力,你會遇到一個MissingBackpressureException,你將被告知通過onError()進行回調。


Flowable與Observable

最後,為了大家更好的理解backpressure概念,這裏補充說一下Flowable。

Observable在RxJava2.0中新的實現叫做Flowable, 同時舊的Observable也保留了。因為在 RxJava1.x 中,有很多事件不被能正確的背壓,從而拋出MissingBackpressureException。

舉個簡單的例子,在 RxJava1.x 中的 observeOn, 因為是切換了消費者的線程,因此內部實現用隊列存儲事件。在 Android 中默認的 buffersize 大小是16,因此當消費比生產慢時, 隊列中的數目積累到超過16個,就會拋出MissingBackpressureException, 初學者很難明白為什麽會這樣,使得學習曲線異常得陡峭。

而在2.0 中,Observable 不再支持背壓,而Flowable 支持非阻塞式的背壓。Flowable是RxJava2.0中專門用於應對背壓(Backpressure)問題。所謂背壓,即生產者的速度大於消費者的速度帶來的問題,比如在Android中常見的點擊事件,點擊過快則經常會造成點擊兩次的效果。其中,Flowable默認隊列大小為128。並且規範要求,所有的操作符強制支持背壓。幸運的是, Flowable 中的操作符大多與舊有的 Observable 類似。

再分享一下我老師大神的人工智能教程吧。零基礎!通俗易懂!風趣幽默!還帶黃段子!希望你也加入到我們人工智能的隊伍中來!https://blog.csdn.net/jiangjunshow

RxJava 2 0中backpressure 背壓 概念的理解