【Android】Rxjava2 Flowable詳解與背壓那些事
1.Rxjava1中的背壓
Rxjava2中有這麼一個被觀察者Flowable,同樣作為被觀察者,它和Observable有什麼區別呢,在Rxjava2中,Observable不再支援背壓,而新增的Flowable支援背壓,何為背壓,就是上游傳送事件的速度大於下游處理事件的速度所產生的現象。
我們來看個例子,先把rxjava切換到rxjava1.0:
implementation 'io.reactivex:rxjava:1.1.6' implementation 'io.reactivex:rxandroid:1.2.1'
然後執行如下程式碼:
//被觀察者在主執行緒中,每1ms傳送一個事件 Observable.interval(1, TimeUnit.MILLISECONDS) //觀察者每1s才處理一個事件 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.w("tag", "---->" + aLong); } });
執行結果如下:

image.png
我特?說好的背壓呢,說好的異常呢,不要慌,因為上面的程式碼是同步的情況,都是執行在祝執行緒的,所以同步的情況下,被觀察者每傳送一個事件,觀察者就會處理一個事件,等觀察者處理完當前事件後,被觀察者才會繼續傳送事件,兩者分工明確,恩愛和睦,不存在傳送速度不一致的情況。
下面我們來看下非同步的情況:
//被觀察者在主執行緒中,每1ms傳送一個事件 Observable.interval(1, TimeUnit.MILLISECONDS) .observeOn(Schedulers.newThread()) //觀察者在子執行緒中每1s處理一個事件 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.w("tag", "---->" + aLong); } });
執行後就會出現如下異常:

image.png
出現了背壓的情況,丟擲了MissingBackpressureException異常,非同步情況下被觀察者傳送事件是比較暴力的,一次性全部發完,放在快取池,然後觀察者一條條慢慢去處理,傳送過快就會出現背壓的情況.
背壓產生的條件:必須是非同步的場景下才會出現,即被觀察者和觀察者處於不同的執行緒中。
rxjava1中預設的快取池大小是16,當事件超過就會出現MissingBackpressureException,看如下例子:
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 17; i++) { Log.w("tag", "send ----> i = " + i); subscriber.onNext("i = "+i); } } }) .subscribeOn(Schedulers.newThread()) //將觀察者的工作放在新執行緒環境中 .observeOn(Schedulers.newThread()) //觀察者處理每1000ms才處理一個事件 .subscribe(new Action1<String>() { @Override public void call(String value) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.w("tag", "---->" + value); } });
你看:

image.png
嗯,預設的快取池為什麼是16,這個問題問的好,因為人家rxjava給的預設值就是16啊,不信你看:
public final <B> Observable<List<T>> buffer(Observable<B> boundary) { return buffer(boundary, 16); }
rxjava1中也提供了處理背壓的操作符onBackpressureBuffer和onBackpressureDrop,下面我們來簡單看下onBackpressureBuffer:
//被觀察者在主執行緒中,每1ms傳送一個事件 Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 10000; i++) { Log.w("tag", "send ----> i = " + i); subscriber.onNext("i = "+i); } } }) .onBackpressureBuffer() .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(new Action1<String>() { @Override public void call(String value) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.w("tag", "---->" + value); } });
執行結果如下:

image.png
其實onBackpressureBuffer也就是增加了快取池的大小,這個值為Long.MAX_VALUE,當然我們也可以自己指定onBackpressureBuffer(size)的大小:
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 100; i++) { Log.w("tag", "send ----> i = " + i); subscriber.onNext("i = "+i); } } }) .onBackpressureBuffer(100) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(new Action1<String>() { @Override public void call(String value) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.w("tag", "---->" + value); } });

image.png
onBackpressureDrop的作用是當觀察者來不及處理事件的時候,會把事件給丟棄掉,而onBackpressureLatest操作符表示當被觀察者Observable發出事件的速度比觀察者消耗得要快,觀察者會接收Observable最新發出的事件進行處理,這兩種情況大家可以自行測試感受下。
從上面的例子可以看出,在rxjava1中,interval操作符預設是不支援背壓的,我們來試試range操作符:
Observable.range(1,10000) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(new Action1<Integer>() { @Override public void call(Integer value) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.w("tag", "---->" + value); } });
執行結果如下:

image.png
尼瑪,竟然沒有出現背壓,納尼?

image.png
表情包好像放錯了,走錯片場了,哈哈哈,難道range操作符有毛病,不應該啊,最後經過一番查詢,發現問題在observeOn操作符上,observeOn這個操作符內部有一個緩衝區,Android環境下長度是16,它會告訴range最多傳送16個事件,充滿緩衝區即可。
這樣可以看出,之前使用的interval操作符是不支援背壓的,而range則支援背壓,那麼到底什麼樣的Observable支援背壓或不支援背壓呢?

image.png
其實在rxjava1中,不是所有Observable都支援背壓,從上面的例子也可以看出來這一點,我們知道Observable有hot和cold之分,rxjava1中hot observable是不支援背壓的,而cold observable中也有一部分不支援背壓,這裡不再深究,想繼續瞭解可以自行google,另外一個原因是現在都tm Rxjava2了,我還在這扯rxjava1,罪過罪過,我也是為了引出問題。
簡單扯一下解決背壓的思路,無非是限制傳送的速度,俗稱限流,很多操作符都可以做到這些,比如sample在一段時間內只處理最後一個數據等,也可以使用rxjava1中提供的onBackpressureBuffer,onBackpressureDrop,onBackpressureLatest。
雖然rxjava1也有處理背壓的方法,但設計並不完美,快取池大小隻有16,而且被觀察者無法得知下游觀察者對事件的處理速度,一次性把事件拋給了下游觀察者,所以rxjava2中對背壓進行了改進。
2.Rxjava2中的背壓
Rxjava2中新增了一個被觀察者Flowable用來專門支援背壓,預設佇列大小128,並且其所有的操作符都強制支援背壓,先看個簡單的例子:
Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception { for (int i = 0;i < 1000000; i++) { emitter.onNext("i = "+i); } } }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("tag","----> "+s); } });
執行結果如下:

image.png
說好的支援背壓呢,怎麼這個熟悉的異常又出現了????

image.png
細心的同學肯定發現了,Flowable.create方法第二個引數BackpressureStrategy.ERROR,這個BackpressureStrategy類其實就是處理背壓的策略類,看下這個類的原始碼:
public enum BackpressureStrategy { //不指定背壓策略 MISSING, //出現背壓就丟擲異常 ERROR, //指定無限大小的快取池,此時不會出現異常,但無限制大量傳送會發生OOM BUFFER, //如果快取池滿了就丟棄掉之後發出的事件 DROP, //在DROP的基礎上,強制將最後一條資料加入到快取池中 LATEST }
依次來看下這幾種策略的區別吧!
MISSING
Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception { for (int i = 0;i < 1000000; i++) { emitter.onNext("i = "+i); } } }, BackpressureStrategy.MISSING) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("tag","----> "+s); } });
不出所料,果然丟擲了異常:

MISSING
ERROR
BackpressureStrategy.ERROR上面已經測試過了,不再重複了,依然會報異常。
BUFFER
Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception { for (int i = 0;i < 1000000; i++) { emitter.onNext("i = "+i); } } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("tag","----> "+s); } });
執行結果如下,確實不會出現背壓異常了,但是記憶體佔用嗖嗖的升高,資料量足夠大足夠快的時候,OOM指日可待,哈哈哈!!!

BUFFER
DROP
Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception { for (int i = 0;i < 1000000; i++) { emitter.onNext("i = "+i); } } }, BackpressureStrategy.DROP) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("tag","----> "+s); } });
執行結果如下:

DROP
可以發現,在填充滿了預設的128個大小的快取池後,丟棄了很多資料,DROP就是幹這事的,發不下就不放了,有點狠啊!!
LATEST
Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception { for (int i = 0;i < 1000; i++) { emitter.onNext("i = "+i); } } }, BackpressureStrategy.LATEST) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("tag","----> "+s); } });
這次我們只發送1000個事件,執行結果如下:

LATEST
LATEST策略下,當快取池滿了之後也是會丟棄事件的,不僅如此,它還會把事件的最後一個強制放入到快取池中,所以可以看到999被觀察者收到了。
上面我們都是用的Flowable的create建立的被觀察者,如果我們使用just,fromArray等操作符該如何指定背壓策略呢?其實也很簡單,因為rxjava2像rxjava1那樣也提供了onBackpressureBuffer(),onBackpressureDrop(),onBackpressureLatest(),這樣用就可以了:
Flowable.range(1,1000) .onBackpressureBuffer(1000) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer s) throws Exception { Log.e("tag","----> "+s); } });
嗯,執行結果很穩:

onBackpressureBuffer
那麼可能我們會有個疑問,上面的例子都是觀察者被動的接收事件,能不能主動拉取事件呢,當然可以,我們看下下面這個例子:
Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception { for (int i = 0; i < 1000; i++) { emitter.onNext("i = " + i); } } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { subscription = s; } @Override public void onNext(String s) { Log.e("tag", "----> " + s); } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });
看下執行結果:

image.png
其實搞了半天,文章基本也要結束了,雖然rxjava提供了處理背壓的策略,但是最好還是能儘量避免上游被觀察者傳送事件過快過多,實在需要處理,就結合各種策略和操作符進行按需處理。
3.專案中的使用
上週在專案中遇到了這麼一個場景,就是在跳轉頁面之前需要釋放camera,這是個耗時操作,返回當前頁面的時候需要重新open Camera,而且open Camera的時機需要在SurfaceView的create中執行,這個場景剛好用request可以解決,例子和上面類似,就不再上程式碼了。

大家拜拜
關於Flowable的使用就先到這吧,由於個人水平有限,難免會犯些錯誤,有問題歡迎留言討論。