1. 程式人生 > >RxJava中的doOnSubscribe默認運行線程分析

RxJava中的doOnSubscribe默認運行線程分析

on() orm 導致 word ann ogre ios csdn 處理

假設你對RxJava1.x還不是了解,能夠參考以下文章。

1. RxJava使用介紹 【視頻教程】
2. RxJava操作符
? Creating Observables(Observable的創建操作符) 【視頻教程】
? Transforming Observables(Observable的轉換操作符) 【視頻教程】
? Filtering Observables(Observable的過濾操作符) 【視頻教程】
? Combining Observables(Observable的組合操作符) 【視頻教程】
? Error Handling Operators(Observable的錯誤處理操作符) 【視頻教程】
? Observable Utility Operators(Observable的輔助性操作符) 【視頻教程】
? Conditional and Boolean Operators(Observable的條件和布爾操作符) 【視頻教程】
? Mathematical and Aggregate Operators(Observable數學運算及聚合操作符) 【視頻教程】
? 其它如observable.toList()、observable.connect()、observable.publish()等等。 【視頻教程】
3. RxJava Observer與Subcriber的關系 【視頻教程】
4. RxJava線程控制(Scheduler) 【視頻教程】
5. RxJava 並發之數據流發射太快怎樣辦(背壓(Backpressure)) 【視頻教程】


前言

在有心課堂《RxJava之旅》中有學員留言:map和doOnSubscribe默認調度器是IO調度器,這裏說錯了吧?

以下我們分析下。

在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 能夠用作流程開始前的初始化。然而 onStart() 由於在 subscribe() 發生時就被調用了,因此不能指定線程。而是僅僅能運行在 subscribe() 被調用時的線程。這就導致假設 onStart() 中含有對線程有要求的代碼(比如在界面上顯示一個 ProgressBar,這必須在主線程運行),將會有線程非法的風險,由於有時你無法預測 subscribe() 將會在什麽線程運行。

而與 Subscriber.onStart() 相相應的。有一個方法 Observable.doOnSubscribe() 。

它和 Subscriber.onStart() 相同是在 subscribe() 調用後並且在事件發送前運行。但差別在於它能夠指定線程。

默認情況下, doOnSubscribe() 運行在 subscribe() 發生的線程;而假設在 doOnSubscribe() 之後有 subscribeOn() 的話,它將運行在離它近期的 subscribeOn() 所指定的線程。

演示樣例代碼:

Observable.create(onSubscribe)
    .subscribeOn
(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() { progressBar.setVisibility(View.VISIBLE); // 須要在主線程運行 } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);

subscribeOn 作用於該操作符之前的 Observable 的創建操符作以及 doOnSubscribe 操作符 ,換句話說就是 doOnSubscribe 以及 Observable 的創建操作符總是被其之後近期的 subscribeOn 控制 。沒看懂不要緊,看以下代碼你就懂了。

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<?

super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }) .doOnSubscribe(new Action0() { @Override public void call() { System.out.println("00doOnSubscribe在線程" + Thread.currentThread().getName() + "中"); } }) .subscribeOn(Schedulers.newThread()) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { System.out.println("map1在線程" + Thread.currentThread().getName() + "中"); return integer + ""; } }) .doOnSubscribe(new Action0() { @Override public void call() { System.out.println("11doOnSubscribe在線程" + Thread.currentThread().getName() + "中"); } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .map(new Func1<String, String>() { @Override public String call(String s) { System.out.println("map2在線程" + Thread.currentThread().getName() + "中"); return s + "1"; } }) .doOnSubscribe(new Action0() { @Override public void call() { System.out.println("22doOnSubscribe在線程" + Thread.currentThread().getName() + "中"); } }) .subscribeOn(Schedulers.newThread()) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext在線程" + Thread.currentThread().getName() + "中"); } });

運行結果例如以下:

22doOnSubscribe在線程RxNewThreadScheduler-111doOnSubscribe在線程RxIoScheduler-300doOnSubscribe在線程RxNewThreadScheduler-2中
map1在線程RxNewThreadScheduler-2中
map2在線程RxIoScheduler-2中
onNext在線程RxIoScheduler-2

依據代碼和運行結果我總結例如以下:

  1. doOnSubscribe()與onStart()相似,均在代碼調用時就會回調。但doOnSubscribe()能夠通過subscribeOn()操作符改變運行的線程且越在後面運行越早;
  2. doOnSubscribe()後面緊跟subscribeOn(),那麽doOnSubscribe()將於subscribeOn()指定的線程保持一致。假設doOnSubscribe()在subscribeOn()之後,他的運行線程得再看情況分析;
  3. doOnSubscribe()假設在observeOn()後(註意:observeon()後沒有緊接著再調用subcribeon()方法)。那麽doOnSubscribe的運行線程就是main線程,與observeon()指定的線程沒有關系。
  4. 假設在observeOn()之前沒有調用過subcribeOn()方法,observeOn()之後subscribe面()方法之前調用subcribeOn()方法,那麽他會改變整個代碼流程中全部調用doOnSubscribe()方法所在的線程。同一時候也會改變observeOn()方法之前全部操作符所在的線程(有個重要前提:不滿足第2點的條件,也就是doOnSubscribe()後面沒有調用subscribeOn()方法)。
  5. 假設在observeOn()前後都沒有調用過subcribeOn()方法,那麽整個代碼流程中的doOnSubscribe()運行在main線程,與observeOn()指定的線程無關。同一時候observeOn()之前的操作符也將運行在main線程,observeOn()之後的操作符與observeOn()指定的線程保持一致。

今天就分析到這裏,假設有問題請大家反饋交流。

RxJava中的doOnSubscribe默認運行線程分析