1. 程式人生 > >RxJava observeOn()與subscribeOn()的關係

RxJava observeOn()與subscribeOn()的關係

RxJava系列教程:

observeOn和subscribeOn都是對observable的一種操作,區別就是subscribeOn改變了observable本身產生事件的schedule以及發出事件後相關處理事件的程式所在的scheduler,而obseveron僅僅是改變了對發出事件後相關處理事件的程式所在的scheduler。

或許你會問,這有多大的區別嗎?的確是有的,比如說產生observable事件是一件費時可能會卡主執行緒的操作(比如說獲取網路資料),那麼subscribeOn就是你的選擇,這樣可以避免卡住主執行緒。

兩者最主要的差別是影響的範圍不同,observeOn is more limited,但是卻是可以多次呼叫,多次改變不同的接受者所在的scheduler,在呼叫這個函式之後的observable造成影響。而subscribeOn則是一次性的,無論在什麼地方呼叫,總是從改變最原始的observable開始影響整個observable的處理。

subscribeOn()和observeOn()的區別

  • subscribeOn()主要改變的是訂閱的執行緒,即call()執行的執行緒;
  • observeOn()主要改變的是傳送的執行緒,即onNext()執行的執行緒。

subscribeOn

我們先看一個例子。

       Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call
(Subscriber<? super String> subscriber) { subscriber.onNext("a"); subscriber.onNext("b"); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) .subscribe(new
Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String integer) { System.out.println(integer); } });

執行如下:

a
b

我們看一下subscribeOn()中,都幹了什麼

public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

很明顯,會走if之外的方法。

在這裡我們可以看到,我們又建立了一個Observable物件,但建立時傳入的引數為OperatorSubscribeOn(this,scheduler),我們看一下此物件以及其對應的構造方法

OperatorSubscribeOn程式碼:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}

可以看到,OperatorSubscribeOn實現Onsubscribe,並且由其構造方法可知,他儲存了上一個Observable物件,並儲存了Scheduler物件。

這裡做個總結。

把Observable.create()建立的稱之為Observable_1,OnSubscribe_1。把subscribeOn()建立的稱之為Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。

那麼,前兩步就是建立了兩個的observable,和OnSubscribe,並且OnSubscribe_2中儲存了Observable_1的應用,即source。

呼叫Observable_2.subscribe()方法會呼叫OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。

下面分析下call()方法。

  • inner.schedule()改變了執行緒,此時Action的call()在我們指定的執行緒中執行。
  • Subscriber被包裝了一層。
  • source.unsafeSubscribe(s);,注意source是Observable_1物件。

unsafeSubscribe方法程式碼:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

程式碼很多,關鍵程式碼:

hook.onSubscribeStart(this, onSubscribe).call(subscriber);

該方法即呼叫了OnSubscribe_1.call()方法。注意,此時的call()方法在我們指定的執行緒中執行。那麼就起到了改變執行緒的作用。

對於以上執行緒,我們可以總結,其有如下流程:

  • Observable.create() : 建立了Observable_1和OnSubscribe_1;
  • subscribeOn(): 建立Observable_2和OperatorSubscribeOn(OnSubscribe_2),同時OperatorSubscribeOn儲存了Observable_1的引用。
  • observable_2.subscribe(Observer):
    • 呼叫OperatorSubscribeOn的call()。call()改變了執行緒的執行,並且呼叫了Observable_1.unsafeSubscribe(s);
    • Observable_1.unsafeSubscribe(s);,該方法的實現中呼叫了OnSubscribe_1的call()。
      從這個可以瞭解,無論我們的subscribeOn()放在哪裡,他改變的是subscribe()的過程,而不是onNext()的過程。

那麼如果有多個subscribeOn(),那麼執行緒會怎樣執行呢。如果按照我們的邏輯,有以下程式

Observable.just("ss") 
                .subscribeOn(Schedulers.io())   // ----1---
                .subscribeOn(Schedulers.newThread()) //----2----
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {

                    }
                });

那麼,我們根據之前的原始碼分析其執行邏輯。

  • Observable.just(“ss”),建立Observable_1,OnSubscribe_1
  • Observable_1.subscribeOn(Schedulers.io()):建立observable_2,OperatorSubscribeOn_2並儲存Observable_1的引用。
  • observable_2.subscribeOn(Schedulers.newThread()):建立Observable_3,OperatorSubscribeOn_3並儲存Observable_2的引用。
  • Observable_3.subscribe():
    • 呼叫OperatorSubscribeOn_3.call(),改變執行緒為Schedulers.newThread()。
    • 呼叫OperatorSubscribeOn_2.call(),改變執行緒為Schedulers.io()。
    • 呼叫OnSubscribe_1.call(),此時call()執行在Schedulers.io()。
      根據以上邏輯分析,會按照1的執行緒進行執行。

subscribeOn如何工作,關鍵程式碼其實就是一行程式碼:

source.unsafeSubscribe(s);

注意它所在的位置,是在worker的call裡面,說白了,就是把source.subscribe這一行呼叫放在指定的執行緒裡,那麼總結起來的結論就是:

subscribeOn的呼叫,改變了呼叫前序列所執行的執行緒。

observeOn

看一下observeOn()原始碼:

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, RxRingBuffer.SIZE);
    }

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
        return observeOn(scheduler, false, bufferSize);
    }

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }

這裡引出了新的操作符lift

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

在lift()中,有如下關鍵程式碼:

Subscriber<? super T> st = hook.onLift(operator).call(o);

OperatorObserveOn.call()核心程式碼:

 @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

我們看到其返回了ObserveOnSubscriber< T>,注意:此時只調用了call()方法,但call()方法中並沒有改變執行緒的操作,此時為subscribe()過程。

我們直奔重點,因為,我們瞭解到其改變的是onNext()過程,那麼我們肯定要看一下ObserveOnSubscriber.onNext()找找在哪改變執行緒

@Override
public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

這裡做了兩件事,首先把結果快取到一個佇列裡,然後呼叫schedule啟動傳入的worker

我們這裡需要注意下:

在呼叫observeOn前的序列,把結果傳入到onNext就是它的工作,它並不關心後續的流程,所以工作就到這裡就結束了,剩下的交給ObserveOnSubscriber繼續。

onNext方法最後呼叫了schedule(),從方法名可以看到,其肯定是改變執行緒用的,並且該方法經過一番迴圈之後,呼叫了該類的call()方法。

protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }

recursiveScheduler 就是之前我們傳入的Scheduler,我們一般會在observeOn傳入AndroidScheluders.mainThread()。

scheduler中呼叫的call()方法

        // only execute this from schedule()
        @Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;

            // these are accessed in a tight loop around atomics so
            // loading them into local variables avoids the mandatory re-reading
            // of the constant fields
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;

            // requested and counter are not included to avoid JIT issues with register spilling
            // and their access is is amortized because they are part of the outer loop which runs
            // less frequently (usually after each bufferSize elements)

            for (;;) {
                long requestAmount = requested.get();

                while (requestAmount != currentEmission) {
                    boolean done = finished;
                    Object v = q.poll();
                    boolean empty = v == null;

                    if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    localChild.onNext(localOn.getValue(v));

                    currentEmission++;
                    if (currentEmission == limit) {
                        requestAmount = BackpressureUtils.produced(requested, currentEmission);
                        request(currentEmission);
                        currentEmission = 0L;
                    }
                }

                if (requestAmount == currentEmission) {
                    if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                        return;
                    }
                }

                emitted = currentEmission;
                missed = counter.addAndGet(-missed);
                if (missed == 0L) {
                    break;
                }
            }
        }

call()中有localChild.onNext(localOn.getValue(v));呼叫。

在Scheduler啟動後, 我們在Observable.subscribe(a)傳入的a就是這裡的child, 我們看到,在call中終於呼叫了它的onNext方法,把真正的結果傳了出去,但是在這裡,我們是工作在observeOn的執行緒上的。

總結起來的結論就是:

  1. observeOn 對呼叫之前的序列默不關心,也不會要求之前的序列執行在指定的執行緒上
  2. observeOn 對之前的序列產生的結果先快取起來,然後再在指定的執行緒上,推送給最終的subscriber

observeOn改變的是onNext()呼叫

subcribeOn和observeOn 對比分析

Observable
.map                    // 操作1
.flatMap                // 操作2
.subscribeOn(io)
.map                    //操作3
.flatMap                //操作4
.observeOn(main)
.map                    //操作5
.flatMap                //操作6
.subscribeOn(io)        //!!特別注意
.subscribe(handleData)

有如上邏輯,則我們對其執行進行分析。

首先,我們需要先明白其內部執行的邏輯。

在呼叫subscribe之後,邏輯開始執行。分別呼叫每一步OnSubscribe.call(),注意:自下往上。當執行到最上,即Observable.create()後,我們在其中呼叫了subscriber.onNext(),於是程式開始自上往下執行每一個物件的subscriber.onNext()方法。最終,直到subscribe()中的回撥。

其次,從上面對subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()是在onNext()中作用。

那麼對於以上的邏輯,我們可以得出如下結論:

  • 操作1,2,3,4在io執行緒中,因為在如果沒有observeOn()影響,他們的回撥操作預設在訂閱的執行緒中。而我們的訂閱執行緒在subscribeOn(io)發生了改變。注意他們執行的先後順序。
  • 操作5,6在main執行緒中執行。因為observeOn()改變了onNext().
  • 特別注意那一個邏輯沒起到作用

再簡單點總結就是

  1. subscribeOn的呼叫切換之前的執行緒。
  2. observeOn的呼叫切換之後的執行緒。
  3. observeOn之後,不可再呼叫subscribeOn 切換執行緒

複雜情況

我們經常多次使用subscribeOn切換執行緒,那麼以後是否可以組合observeOn和subscribeOn達到自由切換的目的呢?

組合是可以的,但是他們的執行順序是有條件的,如果仔細分析的話,可以知道observeOn呼叫之後,再呼叫subscribeOn是無效的,原因是什麼?

因為subscribeOn改變的是subscribe這句呼叫所在的執行緒,大多數情況,產生內容和消費內容是在同一執行緒的,所以改變了產生內容所在的執行緒,就改變了消費內容所在的執行緒。

經過上面的闡述,我們知道,observeOn的工作原理是把消費結果先快取,再切換到新執行緒上讓原始消費者消費,它和生產者是沒有一點關係的,就算subscribeOn呼叫了,也只是改變observeOn這個消費者所在的執行緒,和OperatorObserveOn中儲存的原始消費者一點關係都沒有,它還是由observeOn控制。

@扔物線 大神給的總結:

  1. 下面提到的“操作”包括產生事件、用操作符操作事件以及最終的通過 subscriber 消費事件;
  2. 只有第一subscribeOn() 起作用(所以多個 subscribeOn() 無意義);
  3. 這個 subscribeOn() 控制從流程開始的第一個操作,直到遇到第一個 observeOn();
  4. observeOn() 可以使用多次,每個 observeOn() 將導致一次執行緒切換(),這次切換開始於這次 observeOn() 的下一個操作;
  5. 不論是 subscribeOn() 還是 observeOn(),每次執行緒切換如果不受到下一個 observeOn() 的干預,執行緒將不再改變,不會自動切換到其他執行緒。

最後需要注意的是:

預設情況下, doOnSubscribe() 執行在 subscribe() 發生的執行緒;而如果在 doOnSubscribe() 之後有subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的執行緒。
想要了解更多請參考部落格:RxJava中的doOnSubscribe預設執行執行緒分析

這個(學習總結)部落格花了3個小時多,看原始碼真的很頭疼,希望以後能有所提高。