1. 程式人生 > >給初學者的RxJava2.0教程(九)

給初學者的RxJava2.0教程(九)

出處:http://www.jianshu.com/p/36e0f7f43a51

前言

好久不見朋友們,最近一段時間在忙工作上的事情,沒來得及寫文章,這兩天正好有點時間,趕緊寫下了這篇教程,免得大家說我太監了。

正題

先來回顧一下上上節,我們講Flowable的時候,說它採用了響應式拉的方式,我們還舉了個葉問打小日本的例子,再來回顧一下吧,我們說把上游看成小日本, 把下游當作葉問, 當呼叫Subscription.request(1)時, 葉問就說我要打一個! 然後小日本就拿出一個鬼子給葉問, 讓他打, 等葉問打死這個鬼子之後, 再次呼叫request(10), 葉問就又說我要打十個! 然後小日本又派出十個鬼子

給葉問, 然後就在邊上看熱鬧, 看葉問能不能打死十個鬼子, 等葉問打死十個鬼子後再繼續要鬼子接著打。

但是不知道大家有沒有發現,在我們前兩節中的例子中,我們口中聲稱的響應式拉並沒有完全體現出來,比如這個例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1"
); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); } }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new
Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); mSubscription = s; s.request(1); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); mSubscription.request(1); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });

雖然我們在下游中是每次處理掉了一個事件之後才呼叫request(1)去請求下一個事件,也就是說葉問的確是在打死了一個鬼子之後才繼續打下一個鬼子,可是上游呢?上游真的是每次當下遊請求一個才拿出一個嗎?從上上篇文章中我們知道並不是這樣的,上游仍然是一開始就傳送了所有的事件,也就是說小日本並沒有等葉問打死一個才拿出一個,而是一開始就拿出了所有的鬼子,這些鬼子從一開始就在這兒排隊等著被打死。

有個故事是這麼說的:

楚人有賣盾與矛者,先譽其盾之堅,曰:“吾盾之堅,物莫能陷也。”俄而又譽其矛之利,曰:“吾矛之利,萬物莫不陷也。”市人詰之曰:"以子之矛陷子之盾,何如?”其人弗能應也。眾皆笑之。

沒錯,我們前後所說的就是自相矛盾了,這說明了什麼呢,說明我們的實現並不是一個完整的實現,那麼,究竟怎樣的實現才是完整的呢?

我們先自己來想一想,在下游中呼叫Subscription.request(n)就可以告訴上游,下游能夠處理多少個事件,那麼上游要根據下游的處理能力正確的去傳送事件,那麼上游是不是應該知道下游的處理能力是多少啊,對吧,不然,一個巴掌拍不響啊,這種事情得你情我願才行。

那麼上游從哪裡得知下游的處理能力呢?我們來看看上游最重要的部分,肯定就是FlowableEmitter了啊,我們就是通過它來發送事件的啊,來看看它的原始碼吧(別緊張,它的程式碼灰常簡單):

public interface FlowableEmitter<T> extends Emitter<T> {
    void setDisposable(Disposable s);
    void setCancellable(Cancellable c);

    /**
     * The current outstanding request amount.
     * <p>This method is thread-safe.
     * @return the current outstanding request amount
     */
    long requested();

    boolean isCancelled();
    FlowableEmitter<T> serialize();
}

FlowableEmitter是個介面,繼承Emitter,Emitter裡面就是我們的onNext(),onComplete()和onError()三個方法。我們看到FlowableEmitter中有這麼一個方法:

long requested();

方法註釋的意思就是當前外部請求的數量,哇哦,這好像就是我們要找的答案呢. 我們還是實際驗證一下吧.

先來看同步的情況吧:

 public static void demo1() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

這個例子中,我們在上游中打印出當前的request數量,下游什麼也不做。

我們先猜測一下結果,下游沒有呼叫request(),說明當前下游的處理能力為0,那麼上游得到的requested也應該是0,是不是呢?

來看看執行結果:

D/TAG: onSubscribe
D/TAG: current requested: 0

哈哈,結果果然是0,說明我們的結論基本上是對的。

那下游要是呼叫了request()呢,來看看:

 public static void demo1() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(10); //我要打十個!
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

這次在下游中呼叫了request(10),告訴上游我要打十個,看看執行結果:

D/TAG: onSubscribe
D/TAG: current requested: 10

果然!上游的requested的確是根據下游的請求來決定的,那要是下游多次請求呢?比如這樣:

public static void demo1() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(10);  //我要打十個!
                        s.request(100); //再給我一百個!
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

下游先呼叫了request(10), 然後又呼叫了request(100),來看看執行結果:

D/TAG: onSubscribe
D/TAG: current requested: 110

看來多次呼叫也沒問題,做了加法

誒加法?對哦,只是做加法,那什麼時候做減法呢?

當然是傳送事件啦!

來看個例子吧:

 public static void demo2() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "before emit, requested = " + emitter.requested());

                        Log.d(TAG, "emit 1");
                        emitter.onNext(1);
                        Log.d(TAG, "after emit 1, requested = " + emitter.requested());

                        Log.d(TAG, "emit 2");
                        emitter.onNext(2);
                        Log.d(TAG, "after emit 2, requested = " + emitter.requested());

                        Log.d(TAG, "emit 3");
                        emitter.onNext(3);
                        Log.d(TAG, "after emit 3, requested = " + emitter.requested());

                        Log.d(TAG, "emit complete");
                        emitter.onComplete();

                        Log.d(TAG, "after emit complete, requested = " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(10);  //request 10
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

程式碼很簡單,來看看執行結果:

D/TAG: onSubscribe                        
D/TAG: before emit, requested = 10        
D/TAG: emit 1                             
D/TAG: onNext: 1                          
D/TAG: after emit 1, requested = 9        
D/TAG: emit 2                             
D/TAG: onNext: 2                          
D/TAG: after emit 2, requested = 8        
D/TAG: emit 3                             
D/TAG: onNext: 3                          
D/TAG: after emit 3, requested = 7        
D/TAG: emit complete                      
D/TAG: onComplete                         
D/TAG: after emit complete, requested = 7

大家應該能看出端倪了吧,下游呼叫request(n) 告訴上游它的處理能力,上游每傳送一個next事件之後,requested就減一,注意是next事件,complete和error事件不會消耗requested,當減到0時,則代表下游沒有處理能力了,這個時候你如果繼續傳送事件,會發生什麼後果呢?當然是MissingBackpressureException啦,試一試:

public static void demo2() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "before emit, requested = " + emitter.requested());

                        Log.d(TAG, "emit 1");
                        emitter.onNext(1);
                        Log.d(TAG, "after emit 1, requested = " + emitter.requested());

                        Log.d(TAG, "emit 2");
                        emitter.onNext(2);
                        Log.d(TAG, "after emit 2, requested = " + emitter.requested());

                        Log.d(TAG, "emit 3");
                        emitter.onNext(3);
                        Log.d(TAG, "after emit 3, requested = " + emitter.requested());

                        Log.d(TAG, "emit complete");
                        emitter.onComplete();

                        Log.d(TAG, "after emit complete, requested = " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(2);   //request 2
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

還是這個例子,只不過這次只request(2), 看看執行結果:

 D/TAG: onSubscribe
 D/TAG: before emit, requested = 2
 D/TAG: emit 1
 D/TAG: onNext: 1
 D/TAG: after emit 1, requested = 1
 D/TAG: emit 2
 D/TAG: onNext: 2
 D/TAG: after emit 2, requested = 0
 D/TAG: emit 3
 W/TAG: onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
                 at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
                 at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
                 at zlc.season.rxjava2demo.demo.ChapterNine$4.subscribe(ChapterNine.java:80)
                 at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
                 at io.reactivex.Flowable.subscribe(Flowable.java:12218)
                 at zlc.season.rxjava2demo.demo.ChapterNine.demo2(ChapterNine.java:89)
                 at zlc.season.rxjava2demo.MainActivity$2.onClick(MainActivity.java:36)
                 at android.view.View.performClick(View.java:4780)
                 at android.view.View$PerformClick.run(View.java:19866)
                 at android.os.Handler.handleCallback(Handler.java:739)
                 at android.os.Handler.dispatchMessage(Handler.java:95)
                 at android.os.Looper.loop(Looper.java:135)
                 at android.app.ActivityThread.main(ActivityThread.java:5254)
                 at java.lang.reflect.Method.invoke(Native Method)
                 at java.lang.reflect.Method.invoke(Method.java:372)
                 at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
                 at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)
 D/TAG: after emit 3, requested = 0
 D/TAG: emit complete
 D/TAG: after emit complete, requested = 0

到目前為止我們一直在說同步的訂閱,現在同步說完了,我們先用一張圖來總結一下同步的情況:


同步request.png

這張圖的意思就是當上下游在同一個執行緒中的時候,在下游呼叫request(n)就會直接改變上游中的requested的值,多次呼叫便會疊加這個值,而上游每傳送一個事件之後便會去減少這個值,當這個值減少至0的時候,繼續傳送事件便會拋異常了。

我們再來說說非同步的情況,非同步和同步會有區別嗎?會有什麼區別呢?帶著這個疑問我們繼續來探究。

同樣的先來看一個基本的例子:

public static void demo3() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

這次是非同步的情況,上游啥也不做,下游也啥也不做,來看看執行結果:

D/TAG: onSubscribe
D/TAG: current requested: 128

哈哈,又是128,看了我前幾篇文章的朋友肯定很熟悉這個數字啊!這個數字為什麼和我們之前所說的預設的水缸大小一樣啊,莫非?

帶著這個疑問我們繼續來研究一下:

 public static void demo3() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(1000); //我要打1000個!!
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

這次我們在下游呼叫了request(1000)告訴上游我要打1000個,按照之前我們說的,這次的執行結果應該是1000,來看看執行結果:

D/TAG: onSubscribe
D/TAG: current requested: 128

臥槽,你確定你沒貼錯程式碼?

是的,真相就是這樣,就是128,蜜汁128。。。


what happened?
I don't know !

為了答疑解惑,我就直接上圖了:


非同步request.png

可以看到,當上下游工作在不同的執行緒裡時,每一個執行緒裡都有一個requested,而我們呼叫request(1000)時,實際上改變的是下游主執行緒中的requested,而上游中的requested的值是由RxJava內部呼叫request(n)去設定的,這個呼叫會在合適的時候自動觸發。

現在我們就能理解為什麼沒有呼叫request,上游中的值是128了,因為下游在一開始就在內部呼叫了request(128)去設定了上游中的值,因此即使下游沒有呼叫request(),上游也能傳送128個事件,這也可以解釋之前我們為什麼說Flowable中預設的水缸大小是128,其實就是這裡設定的。

剛才同步的時候我們說了,上游每傳送一個事件,requested的值便會減一,對於非同步來說同樣如此,那有人肯定有疑問了,一開始上游的requested的值是128,那這128個事件傳送完了不就不能繼續傳送了嗎?

剛剛說了,設定上游requested的值的這個內部呼叫會在合適的時候自動觸發,那到底什麼時候是合適的時候呢?是發完128個事件才去呼叫嗎?還是傳送了一半才去呼叫呢?

帶著這個疑問我們來看下一段程式碼:

 public static void request() {
        mSubscription.request(96); //請求96個事件
    }

public static void demo4() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "First requested = " + emitter.requested());