1. 程式人生 > >RxJava 2.0中backpressure(背壓)概念的理解

RxJava 2.0中backpressure(背壓)概念的理解

Backpressure(背壓、反壓力)

在rxjava中會經常遇到一種情況就是被觀察者傳送訊息太快以至於它的操作符或者訂閱者不能及時處理相關的訊息。那麼隨之而來的就是如何處理這些未處理的訊息。

舉個例子,使用zip操作符將兩個無限大的Observable壓縮在一起,其中一個被觀察者傳送訊息的速度是另一個的兩倍。一個比較不靠譜的做法就是把傳送比較快的訊息快取起來,當比較慢的Observable傳送訊息的時候取出來並將他們結合在一起。這樣做就使得rxjava變得笨重而且十分佔用系統資源。

在rxjava中有多重控制流以及背壓(backpressure)策略用來應對當一個快速傳送訊息的被觀察者遇到一個處理訊息緩慢的觀察者。下面的解釋將會向你展示你應當怎麼設計屬於你自己的被觀察者和操作符去應對流量控制(flow control)。

Hot and cold Observables, and multicasted Observables

Observable 資料流有兩種型別:hot 和 cold。這兩種型別有很大的不同。本節介紹他們的區別,以及作為 Rx 開發者應該如何正確的使用他們。

Cold observables

只有當有訂閱者訂閱的時候, Cold Observable 才開始執行發射資料流的程式碼。並且每個訂閱者訂閱的時候都獨立的執行一遍資料流程式碼。 Observable.interval 就是一個 Cold Observable。每一個訂閱者都會獨立的收到他們的資料流。

我們經常用到的Observable.create 就是 Cold Observable,而 just, range, timer 和 from 這些建立的同樣是 Cold Observable。

Hot observables

Hot observable 不管有沒有訂閱者訂閱,他們建立後就開發發射資料流。 一個比較好的示例就是 滑鼠事件。 不管系統有沒有訂閱者監聽滑鼠事件,滑鼠事件一直在發生,當有訂閱者訂閱後,從訂閱後的事件開始傳送給這個訂閱者,之前的事件這個訂閱者是接受不到的;如果訂閱者取消訂閱了,滑鼠事件依然繼續發射。

當一個cold observable是multicast(多路廣播)(當轉換完成時或者方法被呼叫)的時候,為了應對背壓,應當把cold observable轉換成hot observable。

cold observable 相當於響應式拉(就是observer處理完了一個事件就從observable拉取下一個事件),hot observable通常不能很好的處理響應式拉模型,但它卻是處理流量控制問題的不二候選人,例如使用onBackpressureBuffer或者onBackpressureDrop 操作符,和其他操作符比如operators, throttling, buffers, or windows

.

此段過於抽象,特提供原文如下,如有好的翻譯建議請提出。

Cold Observables are ideal for the reactive pull model of backpressure described below. Hot Observables typically do not cope well with a reactive pull model, and are better candidates for some of the other flow control strategies discussed on this page, such as the use of the onBackpressureBuffer or onBackpressureDrop operators, throttling, buffers, or windows.

能避免背壓問題的運算子

防止過度建立observable的第一道防線就是使用普通陣列去減少observable傳送訊息的數量,在這一節會使用一些操作符去應對突發的observable傳送爆發性資料(一會沒有,一會很多)就像下面的這張圖片所示:

這裡寫圖片描述

這些操作符可以通過微調引數確保slow-consuming觀察者不被生產可觀測的。

Throttling節流

操作符中比如 sample( ) 、 throttleLast( )、 throttleFirst( )、 throttleWithTimeout( ) 、 debounce( ) 允許你通過調節速率來改變Observable發射訊息的速度。

以下圖表展示如何使用這些操作符。

樣本 (或 throttleLast)

sample 操作符定期收集observable傳送的資料items,併發射出最後一個數據item。
這裡寫圖片描述

Observable<Integer> burstySampled = bursty.sample(500, TimeUnit.MILLISECONDS);

上面程式碼解釋,定期且一次收集5個item,發射出最後一個item。

throttleFirst

跟sample有點類似,但是並不是把觀測到的最後一個item傳送出去,而是把該時間段第一個item傳送出去。

這裡寫圖片描述

Observable<Integer> burstyThrottled = bursty.throttleFirst(500, TimeUnit.MILLISECONDS);

debounce (or throttleWithTimeout)

debounce操作符會只發送兩個在規定間隔內的時間傳送的序列的最後一個。

這裡寫圖片描述

Observable<Integer> burstyDebounced = bursty.debounce(10, TimeUnit.MILLISECONDS);

Buffers and windows 緩衝區和視窗

可以使用操作符比如buffer( ) 或者window( ) 收集過度生成訊息的Observable的資料items,然後發射出較少使用的資料。緩慢的消費者可以決定是否處理每個集合中的某一個特定的專案,或處理集合中的某種組合,或為集合中的每一項預定計劃工作,這都要視情況處理。

以下圖表展示如何使用這些操作符。

buffer

你可以定期關閉並釋放突發性的 Observable 緩衝區。

這裡寫圖片描述

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);

在突發期間你可以得到的想要的,並在緩衝區收集資料和最終在突發結束的時候釋放快取。使用debounce操作符釋放快取並關閉指示器buffer操作符。

此段超過本人翻譯水平,特提供原文如下,如有好的翻譯建議請提出。
Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator:

這裡寫圖片描述

使用執行緒阻塞

處理過快生產item的其他策略就是使用執行緒阻塞,但是這麼做違背了響應式設計和非阻塞模型設計,但是它的確是一個可行的選擇。在rxJava中並沒有操作符可以做到這一點。

如果observable傳送訊息,subscriber消耗訊息都是在同一個執行緒這將很好的處理這個問題,但是你要知道,在rxJava中,很多時候生產者和消費者都不在同一個執行緒。

如何建立“響應式拉動(reactive pull)”backpressure

當subscribe訂閱observable的時候可以通過呼叫subscribe.request(n),n是你想要的observable傳送出來的量。

當在onNext()方法裡處理完資料itme後,你能重新呼叫 request()方法,通知Observable發射資料items。下面是個例子。

someObservable.subscribe(new Subscriber<t>() {
    @Override
    public void onStart() {
      request(1);
    }

    @Override
    public void onCompleted() {
      // gracefully handle sequence-complete
    }

    @Override
    public void onError(Throwable e) {
      // gracefully handle error
    }

    @Override
    public void onNext(t n) {
      // do something with the emitted item "n"
      // request another item:
      request(1);
    }
});

你可以通過一個神奇數字request, request(Long.MAX_VALUE),禁用反應拉背力和要求Observable按照自己的步伐發射資料。request(0)是一個合法的呼叫,但沒有奏效。請求值小於零的請求會導致丟擲一個異常。

Reactive pull backpressure isn’t magic

backpressure 不會使得過度生產的observable的問題消失,這只是提供了一種更好的解決問題的方法。 讓我們更仔細的研究剛剛說到的zip操作符的問題。

這裡有兩個observable,a和b,b發射item比a更加的頻繁,當你想zip這兩個observable的時候,你需要把a傳送出來的第n個和b傳送出來的第n個物件處理,然而由於b傳送出來的速率更快,這時候b已經發送出了n+1~n+m個訊息了,這時候你要想要把a的n+1~n+m個訊息結合的話,就必須持有b已經發送出來的n+1~n+m訊息,同時,這意味著快取的數量在不斷的增長。

當然你可以給b新增操作符throttling,但是這意味著你將丟失某些從b傳送出來的項,你真正想要做的其實就是告訴b:“b你需要慢下來,但是你要保持你給我的資料是完整的”。

響應式拉(reective pull)模型可以當你做到這一點,subscriber從observable那裡拉取資料,這比較通常在observable那裡推送資料這種模式形成鮮明的對比。

在rxJava中,zip操作符正是使用了這種技巧。它給每個源observable維護了一個小的快取池,當它的快取池滿了以後,它將不會從源observable那裡拉取item。每當zip傳送一個item的時候,他從它的快取池裡面移除相應的項,並從源observable那裡拉取下一個項。

在rxJava中,很多操作符都使用了這種模式(響應式拉),但是有的操作符並沒有使用這種模式,因為他們也許執行的操作跟源observable處於相同的程序。在這種情況下,由於消耗事件會阻塞本程序,所以這一項的工作完成後,才有機會收到下一項。還有另外一種情況,backpressure也是不適合的,因為他們有指定的其他方式去處理流量控制,這些特殊的情況在rxJava的java文件裡面都會有詳細說明為毛。

但是,observable a和b必須正確的響應request()方法,如果一個observable還沒有被支援響應式拉(並不是每個observable都會支援),你可以採取以下其中一種操作都可以達到backpressure的行為:

onBackpressurebuffer

給observable傳送出來的資料持有一個快取,當request方法被呼叫的時候,給下層流傳送一個item。

這裡寫圖片描述

這個操作符還有一個實驗性的版本允許去設定這個快取池的大小,但當快取池滿了以後將會終止執行並丟擲異常。

onBackpressureDrop

命令observable丟棄後來的事件,直到subscriber再次呼叫request(n)方法的時候,就傳送給它的subscriber呼叫時間以後的n個事件。

這裡寫圖片描述

onBackpressureBlock (實驗性的, not in RxJava 1.0)

源Observable的執行緒操作直到Subscriber發出請求,然後只要有掛起的請求就結束執行緒。

這裡寫圖片描述

如果你不允許這些操作符操作不支援背壓的Observable,或者Subscriber或一些操作符嘗試申請活性拉反壓力,你會遇到一個MissingBackpressureException,你將被告知通過onError()進行回撥。

Flowable與Observable

最後,為了大家更好的理解backpressure概念,這裡補充說一下Flowable。

Observable在RxJava2.0中新的實現叫做Flowable, 同時舊的Observable也保留了。因為在 RxJava1.x 中,有很多事件不被能正確的背壓,從而丟擲MissingBackpressureException。

舉個簡單的例子,在 RxJava1.x 中的 observeOn, 因為是切換了消費者的執行緒,因此內部實現用佇列儲存事件。在 Android 中預設的 buffersize 大小是16,因此當消費比生產慢時, 佇列中的數目積累到超過16個,就會丟擲MissingBackpressureException, 初學者很難明白為什麼會這樣,使得學習曲線異常得陡峭。

而在2.0 中,Observable 不再支援背壓,而Flowable 支援非阻塞式的背壓。Flowable是RxJava2.0中專門用於應對背壓(Backpressure)問題。所謂背壓,即生產者的速度大於消費者的速度帶來的問題,比如在Android中常見的點選事件,點選過快則經常會造成點選兩次的效果。其中,Flowable預設佇列大小為128。並且規範要求,所有的操作符強制支援背壓。幸運的是, Flowable 中的操作符大多與舊有的 Observable 類似。