RxJava2: Using observeOn and subscribeOn

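The benefits of RxJava are well known: it makes the logical structure of code clearer. When work has to move between background and foreground threads, you normally call observeOn and subscribeOn. Using these two methods is not as simple as it looks, though:
observeOn: sets the thread on which the Observer runs;
subscribeOn: sets the thread on which the Observable (the code that emits the items) runs;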

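That is the most basic usage. In practice, both the order and the number of calls make a difference:
subscribeOn: acts on the Observable creation operator upstream of it, and on any doOnSubscribe operator upstream of it. If subscribeOn is called more than once, the call closest to the source decides the thread the source runs on.
observeOn: besides setting the thread on which the Observer runs, it also sets the thread for every operator after it (doOnNext, map, ...) until the next observeOn.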
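The two rules above can be illustrated with a toy model. The sketch below is not RxJava and does not reflect its internals. It uses plain java.util.concurrent executors, and every name in it (ThreadSwitchSketch, Source, named, the thread names "worker", "other", "main-like") is hypothetical. It only assumes that subscribeOn moves the upstream subscribe call to another thread, and that observeOn hands each value to another thread before passing it downstream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ThreadSwitchSketch {
    /** Toy stand-in for an Observable: subscribing pushes values to a callback. */
    interface Source {
        void subscribe(Consumer<Integer> downstream);
    }

    /** Single-thread executor whose thread carries a readable name. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    /** Models subscribeOn: the upstream subscribe() call itself is moved to the executor. */
    static Source subscribeOn(Source upstream, ExecutorService ex) {
        return downstream -> ex.execute(() -> upstream.subscribe(downstream));
    }

    /** Models observeOn: each value is handed to the executor before it goes downstream. */
    static Source observeOn(Source upstream, ExecutorService ex) {
        return downstream -> upstream.subscribe(v -> ex.execute(() -> downstream.accept(v)));
    }

    /** Models doOnNext: records the current thread, then passes the value on unchanged. */
    static Source doOnNext(Source upstream, String label, List<String> log) {
        return downstream -> upstream.subscribe(v -> {
            log.add(label + "@" + Thread.currentThread().getName());
            downstream.accept(v);
        });
    }

    /** Builds a chain shaped like the article's examples and returns the recorded threads. */
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService worker = named("worker");
        ExecutorService other = named("other");
        ExecutorService mainLike = named("main-like");
        CountDownLatch done = new CountDownLatch(1);

        Source source = downstream -> {
            log.add("source@" + Thread.currentThread().getName());
            downstream.accept(1);
        };

        Source chain = doOnNext(source, "a", log);
        chain = subscribeOn(chain, worker);   // closest to the source: decides the source thread
        chain = subscribeOn(chain, other);    // second call: does not move the source
        chain = doOnNext(chain, "b", log);    // before observeOn: still on the source thread
        chain = observeOn(chain, mainLike);
        chain = doOnNext(chain, "c", log);    // after observeOn: on the observeOn thread
        chain.subscribe(v -> {
            log.add("observer@" + Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);  // wait for the value to reach the observer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.shutdown();
        other.shutdown();
        mainLike.shutdown();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints source@worker, a@worker, b@worker, c@main-like and observer@main-like, in that order. Everything upstream of observeOn stays on the thread chosen by the subscribeOn nearest the source, and everything downstream of observeOn moves to its thread.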

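An RxJava example follows. Here doOnNext b sits after subscribeOn but before observeOn, so it still runs on the new thread together with the source, doOnNext a, and flatMap. Only the consumer runs on the main thread: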

    import android.util.Log;
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.ObservableSource;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.annotations.NonNull; // assumption: may instead be the Android support @NonNull
    import io.reactivex.functions.Consumer;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedulers;

    // The statements below belong inside a method of a class that defines a String TAG.
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
            Log.d(TAG, "emit 1");
            emitter.onNext(1);
        }
    });

    // This Consumer only receives onNext events
    Consumer<Integer> consumer = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
            Log.d(TAG, "onNext: " + integer);
        }
    };

    observable
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(10000);
                Log.d(TAG, "a thread is: " + Thread.currentThread().getName());
                Log.d(TAG, "doOnNext a: " + integer);
            }
        })
        .flatMap(new Function<Integer, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {
                Thread.sleep(10000);
                Log.d(TAG, "c thread is: " + Thread.currentThread().getName());
                Log.d(TAG, "flatMap c: " + integer);
                return Observable.just(integer);
            }
        })
        .subscribeOn(Schedulers.newThread())
        // Still upstream of observeOn, so this runs on the new thread
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(10000);
                Log.d(TAG, "b thread is : " + Thread.currentThread().getName());
                Log.d(TAG, "doOnNext b: " + integer);
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(consumer);

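If observeOn is moved ahead of the second doOnNext, the threads change. The source, doOnNext a, and flatMap still run on the new thread, but doOnNext b now runs on the main thread along with the consumer, so its 10-second sleep blocks the main thread: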

 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
                Log.d(TAG, "emit 1");
                emitter.onNext(1);

            }
        });
        // This Consumer only receives onNext events
        Consumer<Integer> consumer = new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
                Log.d(TAG, "onNext: " + integer);
            }
        };

        observable
           .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Thread.sleep(10000);
                    Log.d(TAG, "a thread is: " + Thread.currentThread().getName());
                    Log.d(TAG, "doOnNext a: " + integer);
                }
            })
           .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {
                        Thread.sleep(10000);
                        Log.d(TAG, "c thread is: " + Thread.currentThread().getName());
                        Log.d(TAG, "flatMap c: " + integer);
                        return Observable.just(integer);
                    }
                })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Thread.sleep(10000);
                    Log.d(TAG, "b thread is : " + Thread.currentThread().getName());
                    Log.d(TAG, "doOnNext b: " + integer);
                }
            })

            .subscribe(consumer);