1. 程式人生 > >RxJava之三—— observeOn()與subscribeOn()的詳解

RxJava之三—— observeOn()與subscribeOn()的詳解

你也可以檢視我的其他同類文章,也會讓你有一定的收貨!

為什麼多次呼叫subscribeOn()卻只有第一個起作用? 
為什麼多次呼叫observeOn()卻可以切換到不同執行緒 
observeOn()後能不能再次呼叫subscribeOn()?

如果你有這些疑問,那接下來的內容必定能解決你心頭的疑惑

subscribeOn()和observeOn()的區別

subscribeOn()和observeOn()都是用來切換執行緒用的

  • subscribeOn()改變呼叫它之前程式碼的執行緒
  • observeOn()改變呼叫它之後程式碼的執行緒

一、subscribeOn()

在講解他的原理之前,先來一個簡單的例子,有個感性認識,學起來更輕鬆

先說結論:subscribeOn 作用於該操作符之前的 Observable 的建立操符作以及 doOnSubscribe 操作符 ,換句話說就是 doOnSubscribe 以及 Observable 的建立操作符總是被其之後最近的 subscribeOn 控制 。沒看懂不要緊,看下面程式碼和圖你就懂了。

這裡寫圖片描述

Observable
        .create(new Observable.OnSubscribe<String>() {
            @Override
            public void
call(Subscriber<? super String> subscriber) { threadInfo("OnSubscribe.call()"); subscriber.onNext("RxJava"); } }) .subscribeOn(getNamedScheduler("create之後的subscribeOn")) .doOnSubscribe(() -> threadInfo(".doOnSubscribe()-1"
))
.subscribeOn(getNamedScheduler("doOnSubscribe1之後的subscribeOn")) .doOnSubscribe(() -> threadInfo(".doOnSubscribe()-2")) .subscribe(s -> { threadInfo(".onNext()"); System.out.println(s + "-onNext"); });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

結果如下:

.doOnSubscribe()-2 => main
.doOnSubscribe()-1 => doOnSubscribe1之後的subscribeOn
OnSubscribe.call() => create之後的subscribeOn
.onNext() => create之後的subscribeOn
RxJava-onNext
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

3號框中的.doOnSubscribe(() -> threadInfo(“.doOnSubscribe()-2”)) 的之後由於沒有subscribeOn操作符所以回撥到該段程式碼被呼叫的執行緒(即主執行緒)

由於 subscribe 之前 沒有 使用observeOn 指定Scheduler,所以.onNext()的執行緒是和OnSubscribe.call()使用相同的Scheduler 。

下面通過原始碼來分析一下:

1、示例程式碼:

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("a");
                        subscriber.onNext("b");

                        subscriber.onCompleted();
                    }
                })
                .subscribeOn(Schedulers.io())

                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String integer) {
                        System.out.println(integer);
                    }
                });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

執行如下:

a
b
  • 1
  • 2
  • 1
  • 2

2、subscribeOn()原始碼

public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

很明顯,會走if之外的方法。

在這裡我們可以看到,又建立了一個OperatorSubscribeOn物件,但建立時傳入的引數為OperatorSubscribeOn(this,scheduler),我們看一下此物件以及其對應的構造方法

3、create()的原始碼:

public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

我們看到這個方法,使用OperatorSubscribeOn這個類,來建立一個新的Observable,那就把它叫做Observable_2,把原來的Observable叫做Observable_1

4、OperatorSubscribeOn類的原始碼:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  1. OperatorSubscribeOn類implements 了Onsubscribe介面,並實現call()方法
  2. OperatorSubscribeOn的構造方法, 
    • 儲存了Observable物件,就是呼叫了subscribeOn()方法的Observable物件
    • 並儲存了Scheduler物件。

這裡做個總結。

把Observable.create()建立的稱之為Observable_1,OnSubscribe_1。 
把subscribeOn()建立的稱之為Observable_2,OnSubscribe_2

  • Observable_1, OnSubscribe_1是由示例程式碼的第1、2行建立的

  • OperatorSubscribeOn類implements Onsubscribe類,所以可以當做Onsubscribe類使用。(OnSubscribe_2)

  • 並且OnSubscribe_2中儲存了Observable_1的應用,即source。(在OperatorSubscribeOn原始碼的第8行)

  • subscribeOn()原始碼的倒數第二行,create(new OperatorSubscribeOn<T>(this, scheduler))返回新建立的Observable_2物件。

4.1、分析call()方法。

  • inner.schedule()改變了執行緒,此時Action的call()在我們指定的執行緒中執行。
  • 示例程式碼的Subscriber被包裝了一層,賦給物件S(Subscriber_2)。見上面程式碼21行。
  • source.unsafeSubscribe(s);, 
    • 注意:source是Observable_1物件,這裡的s就是Subscriber_2
    • 因為呼叫過subscribeOn(Schedulers.io())後,返回Observable_2物件,所以示例程式碼第13行程式碼的subscribe()就是Observable_2.subscribe(),也就是執行OnSubscribe_2的call()方法(即OperatorSubscribeOn類的原始碼的第12行)。

4.2 看一下source.unsafeSubscribe(s);(第65行)程式碼都做了什麼

這裡的source就是Observable_1,s是Subscriber_2

unsafeSubscribe()原始碼:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

關鍵程式碼:

hook.onSubscribeStart(this, onSubscribe).call(subscriber);
  • 1
  • 1

該方法即呼叫了OnSubscribe_1.call()方法。注意,此時的call()方法在我們指定的執行緒中執行。那麼就起到了改變執行緒的作用。

對於以上執行緒,我們可以總結,其有如下流程:

  • Observable.create() : 建立了Observable_1和OnSubscribe_1;

  • subscribeOn(): 建立Observable_2和OperatorSubscribeOn(OnSubscribe_2),同時OperatorSubscribeOn儲存了Observable_1的引用。

  • 示例程式碼中的subscribe(Observer) 實際上就是呼叫Observable_2.subscribe(Observer):

    • 呼叫OperatorSubscribeOn的call()。call()改變了執行緒的執行,並且呼叫了Observable_1.unsafeSubscribe(s);
    • Observable_1.unsafeSubscribe(s);,該方法的實現中呼叫了OnSubscribe_1的call()。

這樣就實現了在指定執行緒執行OnSubscribe的call()函式,無論我們的subscribeOn()放在哪裡,他改變的是subscribe()的過程,而不是onNext()的過程。

那麼如果有多個subscribeOn(),那麼執行緒會怎樣執行呢。如果按照我們的邏輯,有以下程式

Observable.just("ss") 
                .subscribeOn(Schedulers.io())   // ----1---
                .subscribeOn(Schedulers.newThread()) //----2----
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {

                    }
                });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

那麼,我們根據之前的原始碼分析其執行邏輯。

  • Observable.just(“ss”),建立Observable,OnSubscribe