1. 程式人生 > >RxJava(十五)RxJava執行緒的自由切換

RxJava(十五)RxJava執行緒的自由切換

RxJava系列文章目錄導讀:

在Android使用RxJava的時候可能需要頻繁的進行執行緒的切換,如耗時操作放在子執行緒中執行,執行完後在主執行緒渲染介面。如下面示例程式碼:

deferObservable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //執行耗時任務
                return "task result";
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new
Action1<String>() { @Override public void call(String s) { //渲染View } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { throwable.printStackTrace(); } });

這是最簡單的邏輯:子執行緒處理耗時任務,然後處理結果。
但是在實際的開發當中可能比這個更加複雜,比如有這樣的邏輯,先從本地載入資料(子執行緒),然後介面展示本地資料(主執行緒),接著載入線上資料(子執行緒),然後渲染(主執行緒)。這就需要頻繁的切換執行緒。

RxJava中通過subscribeOnobserveOn兩個操作符進行執行緒切換。subscribeOn()主要改變的是訂閱的執行緒,即call()執行的執行緒,observeOn()主要改變的是傳送的執行緒,即onNext()執行的執行緒。

為了實現上面自由切換的邏輯(子執行緒->主執行緒->子執行緒->->主執行緒)

deferObservable(new Callable<String>() { //defer observable
            @Override
            public String call() throws Exception {
                Log.d("RxThreadFragment", "defer " + Thread.currentThread().getName());
                return "task result";
            }
        })
        .observeOn(AndroidSchedulers.mainThread())//指定下面的 call 在主執行緒中執行
        .flatMap(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                Log.d("RxThreadFragment", "flatMap1 " + Thread.currentThread().getName());
                return Observable.just(s);
            }
        })
        .observeOn(Schedulers.io())//指定下面的 call  在子執行緒中執行
        .flatMap(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                Log.d("RxThreadFragment", "flatMap2 " + Thread.currentThread().getName());
                return Observable.just(s);
            }
        })
        .subscribeOn(Schedulers.io())//指定上面沒有指定所線上程的Observable在IO執行緒執行
        .observeOn(AndroidSchedulers.mainThread())//指定下面的 call  在主執行緒中執行
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                //etc
                Log.d("RxThreadFragment", s + Thread.currentThread().getName());
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                throwable.printStackTrace();
            }
        });

輸出結果:

defer RxIoScheduler-2
flatMap2 main
flatMap3 RxIoScheduler-3
task result main

從上面的程式碼可以看出,observeOn 指定該操作符下面相鄰的Observable 發射資料所在的執行緒。
subscribeOn 指定該操作符上面所有沒有指定執行緒的Observable 所在的執行緒。
例如在剛剛的例子中,subscribeOn操作符上面有3個observable(”defer”,”flatMap1”,”flatMap2”)
由於”flatMap1”,”flatMap2”已經分別被observeOn指定了schedule了,所以呢,該subscribeOn只會對”defer”有效。

下面我們再來看一個例子

final Observable<String> observable1 = RxUtils.deferObservable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Log.e("RxThreadFragment", "observable1 thread name : " + Thread.currentThread().getName());
                return "observable1 Schedulers.io()";
            }
        }).subscribeOn(Schedulers.io());//指定上面call方法所在的執行緒

        final Observable<String> observable2 = RxUtils.deferObservable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Log.e("RxThreadFragment", "observable2 thread name : " + Thread.currentThread().getName());
                return "observable2 AndroidSchedulers.mainThread()";
            }
        }).subscribeOn(Schedulers.io());//指定上面call方法所在的執行緒

        final Observable<String> observable3 = RxUtils.deferObservable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Log.e("RxThreadFragment", "observable3 thread name : " + Thread.currentThread().getName());
                return "observable3 Schedulers.io()";
            }
        }).subscribeOn(Schedulers.io());//指定上面call方法所在的執行緒

        RxUtils.deferObservable(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        Log.e("RxThreadFragment", "test thread name : " + Thread.currentThread().getName());
                        return "test thread";
                    }
                })
                .subscribeOn(Schedulers.io())//修改上面Observable call所在的執行緒
                .observeOn(AndroidSchedulers.mainThread())//修改下面flatMap1 call所在的執行緒
                .flatMap(new Func1<String, Observable<String>>() {//flatMap1
                    @Override
                    public Observable<String> call(String s) {
                        Log.e("RxThreadFragment", "flatMap1 thread name : " + Thread.currentThread().getName());
                        return observable1;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())//修改下面flatMap2 call所在的執行緒
                .flatMap(new Func1<String, Observable<String>>() {//flatMap2
                    @Override
                    public Observable<String> call(String s) {
                        Log.e("RxThreadFragment", "flatMap2 thread name : " + Thread.currentThread().getName());
                        return observable2;
                    }
                })
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        Log.e("RxThreadFragment", "flatMap3 thread name : " + Thread.currentThread().getName());
                        return observable3;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())//修改下面subscribe call所在的執行緒
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("RxThreadFragment", "subscribe Action1 thread name : " + Thread.currentThread().getName());
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });

輸出結果:

test thread name : RxIoScheduler-2              手動設定為後臺執行緒
flatMap1 thread name : main                     手動設定為主執行緒
observable1 thread name : RxIoScheduler-3       手動設定為後臺執行緒
flatMap2 thread name : main                     手動設定為主執行緒
observable2 thread name : RxIoScheduler-2       手動設定為後臺執行緒
flatMap3 thread name : RxIoScheduler-2          後臺執行緒
observable3 thread name : RxIoScheduler-3       手動設定為後臺執行緒
subscribe Action1 thread name : main            手動設定為主執行緒

從這個例子中可以看出,flatMap3沒有設定所在的執行緒,會預設使用上一個observable的執行緒模式, flatMap3 就是使用它上面的 observable2 的執行緒模式
如果上一個操作符不是flatMap,而是使用map(這樣就不是返回observable2),這個時候使用的就是map call所在的執行緒。

通過上面兩個例子我相信對RxJava執行緒切換應該差不多了,需要注意的是我們上面基本上都是一個subscribeOn和多個observeOn組合實現執行緒的自由切換的。

如果使用多個subscribeOn沒有意思,只有第一個subscribeOn有效。

原始碼