Rxjava observeOn()和subscribeOn()初探
Rxjava這麽強大的類庫怎麽可能沒有多線程切換呢?
其中observeOn()與subscribeOn()就是實現這樣的作用的。本文主要講解observeOn()與subscribeOn()的用法,不去探究其中的原理。
0. 默認情況
在默認情況下,其不做任何線程處理,Observable和Observer處於同一線程,沒有做任何線程切換,依次執行,如下圖所示:
可以寫一個demo測試之:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source.subscribe(newSubscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("TAG", "Received " + integer + " on thread:" + Thread.currentThread().getName()); } });
1. subscribeOn()的作用
該方法是指明數據產生的線程,即Observable發射數據所在的線程,如果之後不做任何處理,操作符operator(如map,flatmap等)也在subscribeOn指定的線程做數據處理。
多次使用subscribeOn()並不能頻繁地切換線程,只有距離數據源最近的一個subscribeOn()唯一確定數據源發射數據的線程。如代碼所示:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source .subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.newThread()) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName()); } }); }
其中只有subscribeOn(Schedulers.computation())對數據源source起作用,該source在Schedulers.computation()指定的線程發射數據。如果後面沒有使用observeOn(),操作符operator都會在Schedulers.computation()所指定的線程做數據變換。
2. observeOn()的作用
在Android開發中,我們經常面臨這樣的場景,在工作者線程中產生數據,在UI線程中更新相應的View,subscribeOn()指定了數據發射的線程,但我們更新UI的操作,不可能在發射數據的線程運行,這會造成ANR的問題。此時就必須通過observeOn()方法做線程的切換:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source.map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.i("TAG", "call: " + Thread.currentThread().getName()); return s.length(); } }).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.computation()). subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName()); } });
上述代碼的運行結果如下:
TAG: call: RxComputationScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1 TAG: call: RxComputationScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1 TAG: call: RxComputationScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1
可以看到,在observeOn()之前的操作,都運行在subscribeOn(Schedulers.computation())指定的線程,即RxComputationScheduler-1線程;而使用了observeOn()之後,在它之後的操作都
運行在了observeOn(Schedulers.newThread())指定的線程。
所以,給出一個結論observeOn()只對其之後的操作起作用;observeOn()可以使用多次,每次使用對其之後的operator起作用,對之前的操作沒有影響。
上圖很好地詮釋了ObserveOn的作用。
3. backpressure的問題
由於ObserveOn的作用,數據流在多個線程中不斷的傳輸,可能存在速度不匹配的情況。如下圖所示,當底部的數據流發射速度快於頂部數據流的處理速度,若產生異常,可能導致一部分數據未被頂部的subscriber處理。
廢話太多,說不清楚,看下代碼吧:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source.map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.i("TAG", "call: " + Thread.currentThread().getName()); if (s.equals("Gamma")) throw new RuntimeException(); return s.length(); } }) .doOnError(new Action1<Throwable>() { @Override public void call(Throwable throwable) { Log.i("TAG", "doOnError: " + Thread.currentThread().getName()); } }) .observeOn(Schedulers.newThread()) .subscribeOn(Schedulers.computation()) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.i("TAG", "onError: " + "on thread:" + Thread.currentThread().getName()); } @Override public void onNext(Integer integer) { Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName()); } }); }
看一下運行結果:
TAG: call: RxComputationScheduler-1 TAG: call: RxComputationScheduler-1 TAG: call: RxComputationScheduler-1 TAG: doOnError: RxComputationScheduler-1 TAG: onError: on thread:RxNewThreadScheduler-1
我們就可以看到,當發射的數據為Gamma時拋出異常,之前發射的數據"Alpha","Beta"還未被subsriber的onNext方法處理,這就是backpressure問題。
4. onErrorResumeNext
onErrorResumeNext是錯誤恢復處理方法,當我們數據鏈中某個操作符拋出異常,此時會中斷整個數據鏈,但我們想嘗試恢復一下,這時可以使用
onErrorResumeNext。比如Android在過於頻繁登錄時,系統會彈出一個dialog(彈窗),讓用戶輸入驗證碼,該邏輯就可以放在onErrorResumeNext中處理。
我們先看一段代碼:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source.map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.i("TAG", "call: " + Thread.currentThread().getName()); if (s.equals("Gamma")) throw new RuntimeException(); return s.length(); } }) .observeOn(Schedulers.newThread()) .subscribeOn(Schedulers.computation()) .onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() { @Override public Observable<? extends Integer> call(Throwable throwable) { return Observable.just(1000).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { Log.i("TAG", "call:onErrorResumeNext: " + Thread.currentThread().getName()); return integer; } }) .subscribeOn(Schedulers.computation()); } }) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.i("TAG", "onError: " + "on thread:" + Thread.currentThread().getName()); } @Override public void onNext(Integer integer) { Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName()); } });
看上段代碼中onErrorResumeNext的部分,為什麽在onErrorResumeNext可以再次使用subscribeOn(),我的猜測(並沒有看源碼)可能是該段代碼產生了新的數據源,所以可以使用subsribeOn()指定數據源發射數據的線程。
它的運行結果:
TAG: call: RxComputationScheduler-1 TAG: call: RxComputationScheduler-1 TAG: call: RxComputationScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1 TAG: call:onErrorResumeNext: RxComputationScheduler-2 TAG: onNext: on thread:RxComputationScheduler-2
看運行結果在onErrorResumeNext方法中使用了subscribeOn(),線程切換到了RxComputationScheduler-2,在之後沒有observeOn的情況下,最後一個onNext也運行在了RxComputationScheduler-2。很神奇!!!!!
Rxjava observeOn()和subscribeOn()初探