1. 程式人生 > >Rxjava observeOn()和subscribeOn()初探

Rxjava observeOn()和subscribeOn()初探

所在 ride 多個 div ner ons android開發 alpha 距離

Rxjava這麽強大的類庫怎麽可能沒有多線程切換呢?

其中observeOn()與subscribeOn()就是實現這樣的作用的。本文主要講解observeOn()與subscribeOn()的用法,不去探究其中的原理。

0. 默認情況

在默認情況下,其不做任何線程處理,Observable和Observer處於同一線程,沒有做任何線程切換,依次執行,如下圖所示:

技術分享

可以寫一個demo測試之:

Observable<String> source = Observable.just("Alpha","Beta","Gamma");
        source.subscribe(new
Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("TAG", "Received " + integer + " on thread:" + Thread.currentThread().getName()); } });

1. subscribeOn()的作用

該方法是指明數據產生的線程,即Observable發射數據所在的線程,如果之後不做任何處理,操作符operator(如map,flatmap等)也在subscribeOn指定的線程做數據處理。

技術分享

多次使用subscribeOn()並不能頻繁地切換線程,只有距離數據源最近的一個subscribeOn()唯一確定數據源發射數據的線程。如代碼所示:

Observable<String> source = Observable.just("Alpha","Beta","Gamma");
source
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.subscribe(
new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName()); } }); }

其中只有subscribeOn(Schedulers.computation())對數據源source起作用,該source在Schedulers.computation()指定的線程發射數據。如果後面沒有使用observeOn(),操作符operator都會在Schedulers.computation()所指定的線程做數據變換。

2. observeOn()的作用

在Android開發中,我們經常面臨這樣的場景,在工作者線程中產生數據,在UI線程中更新相應的View,subscribeOn()指定了數據發射的線程,但我們更新UI的操作,不可能在發射數據的線程運行,這會造成ANR的問題。此時就必須通過observeOn()方法做線程的切換:

Observable<String> source = Observable.just("Alpha","Beta","Gamma");
        source.map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                Log.i("TAG", "call: " + Thread.currentThread().getName());
                return s.length();
            }
        }).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.computation()).
                subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(Integer integer) {
                    Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName());
            }
        });

上述代碼的運行結果如下:

TAG: call: RxComputationScheduler-1
TAG: onNext: on thread:RxNewThreadScheduler-1
TAG: call: RxComputationScheduler-1
TAG: onNext: on thread:RxNewThreadScheduler-1
TAG: call: RxComputationScheduler-1
TAG: onNext: on thread:RxNewThreadScheduler-1

可以看到,在observeOn()之前的操作,都運行在subscribeOn(Schedulers.computation())指定的線程,即RxComputationScheduler-1線程;而使用了observeOn()之後,在它之後的操作都

運行在了observeOn(Schedulers.newThread())指定的線程。

所以,給出一個結論observeOn()只對其之後的操作起作用;observeOn()可以使用多次,每次使用對其之後的operator起作用,對之前的操作沒有影響。

技術分享

上圖很好地詮釋了ObserveOn的作用。

3. backpressure的問題

由於ObserveOn的作用,數據流在多個線程中不斷的傳輸,可能存在速度不匹配的情況。如下圖所示,當底部的數據流發射速度快於頂部數據流的處理速度,若產生異常,可能導致一部分數據未被頂部的subscriber處理。

技術分享

廢話太多,說不清楚,看下代碼吧:

Observable<String> source = Observable.just("Alpha","Beta","Gamma");
        source.map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                Log.i("TAG", "call: " + Thread.currentThread().getName());
                if (s.equals("Gamma"))
                    throw new RuntimeException();
                return s.length();
            }
        })
         .doOnError(new Action1<Throwable>() {
             @Override
             public void call(Throwable throwable) {
                  Log.i("TAG", "doOnError: " + Thread.currentThread().getName());
             }
         })
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.computation())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Log.i("TAG", "onError: " + "on thread:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(Integer integer) {
                    Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName());
            }
        });
    }

看一下運行結果:

TAG: call: RxComputationScheduler-1
TAG: call: RxComputationScheduler-1
TAG: call: RxComputationScheduler-1
TAG: doOnError: RxComputationScheduler-1
TAG: onError: on thread:RxNewThreadScheduler-1

我們就可以看到,當發射的數據為Gamma時拋出異常,之前發射的數據"Alpha","Beta"還未被subsriber的onNext方法處理,這就是backpressure問題。

4. onErrorResumeNext

onErrorResumeNext是錯誤恢復處理方法,當我們數據鏈中某個操作符拋出異常,此時會中斷整個數據鏈,但我們想嘗試恢復一下,這時可以使用

onErrorResumeNext。比如Android在過於頻繁登錄時,系統會彈出一個dialog(彈窗),讓用戶輸入驗證碼,該邏輯就可以放在onErrorResumeNext中處理。

我們先看一段代碼:

Observable<String> source = Observable.just("Alpha","Beta","Gamma");
        source.map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                Log.i("TAG", "call: " + Thread.currentThread().getName());
                if (s.equals("Gamma"))
                    throw new RuntimeException();
                return s.length();
            }
        })
                .observeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.computation())
                .onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
            @Override
            public Observable<? extends Integer> call(Throwable throwable) {
                return Observable.just(1000).map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                        Log.i("TAG", "call:onErrorResumeNext: "  +  Thread.currentThread().getName());
                        return integer;
                    }
                })
                .subscribeOn(Schedulers.computation());
            }
        })
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i("TAG", "onError: " + "on thread:" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName());
                    }
                });

看上段代碼中onErrorResumeNext的部分,為什麽在onErrorResumeNext可以再次使用subscribeOn(),我的猜測(並沒有看源碼)可能是該段代碼產生了新的數據源,所以可以使用subsribeOn()指定數據源發射數據的線程。

它的運行結果:

TAG: call: RxComputationScheduler-1
TAG: call: RxComputationScheduler-1
TAG: call: RxComputationScheduler-1
TAG: onNext: on thread:RxNewThreadScheduler-1
TAG: onNext: on thread:RxNewThreadScheduler-1
TAG: call:onErrorResumeNext: RxComputationScheduler-2
TAG: onNext: on thread:RxComputationScheduler-2

看運行結果在onErrorResumeNext方法中使用了subscribeOn(),線程切換到了RxComputationScheduler-2,在之後沒有observeOn的情況下,最後一個onNext也運行在了RxComputationScheduler-2。很神奇!!!!!

Rxjava observeOn()和subscribeOn()初探