1. 程式人生 > >Rxjava2原始碼學習(二)

Rxjava2原始碼學習(二)

上一篇中看了Rxjava的通過鏈式呼叫來實現資料的傳輸,這一篇接著看加上執行緒切換之後,整個流程是怎樣的。

還是以下面這個流程為例:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("資料");
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new
Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override
public void onComplete() { } });

通過上一篇我們可以知道,Observable.create()之後返回的是一個ObservableCreate物件,而ObservableCreate物件是繼承自Observable物件。所以create之後,實際上是Observable.subscribeOn(Schedulers.io()),同樣的它返回一個繼承了Observable的物件。關於鏈式呼叫這一塊這裡就不再做詳細的說明了。

這裡的重點是包括執行緒切換在內的整個的流程是怎樣的,以上面的程式碼塊為例,今天我們要搞懂,資料在傳輸過程中到底經歷了什麼。
這裡我們先記錄一下每個操作返回的物件

操作 返回的Observable物件
Observable.create() ObservableCreate物件
Observable.subscribeOn(Schedulers.io()) ObservableSubscribeOn物件
Observable.observeOn(AndroidSchedulers.mainThread()) ObservableObserveOn物件
Observable. subscribe(Observer物件)

ObservableCreate、ObservableSubscribeOn、ObservableObserveOn都是繼承自Observable,並且實現了subscribeActual(Observer observer) 方法(呼叫Observable.subscribe(Observer) 最終都會呼叫該方法)
通過斷點除錯,我們可以發現上面的這幾個物件最終都呼叫了各自的subscribeActual,而且呼叫的順序是從下到上的(它們的建立順序是從上到下的)。這裡我就直接上圖了,貼上我自己的理解:
這裡寫圖片描述

對照上圖我們具體來看一下,當走到Observable. subscribe(Observer物件)這一步的時候,實際上是D:ObservableObserveOn.subscribe(Observer物件),而且會把上一層的Observable(這裡是C:ObservableSubscribeOn物件)作為source(建構函式裡的引數)傳入D中,之前說過,呼叫subscribe方法,都會呼叫各自的subscribeActual方法,
在ObservableObserveOn的subscribeActual方法中:

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

如果在同一個執行緒,就可以直接訂閱observer,如果不在同一個執行緒,就需要用之前傳入的指定執行緒,並將訂閱的操作指定在該執行緒中。這裡的source就是作為建構函式引數傳入D的C。這裡相當於又呼叫了C.subscribe,經歷的流程和上面的過程差不多。
這裡的核心思想都是上一個Observable作為下一個的Observable的一部分,當下層的Observable開始呼叫subscribe方法之後,其實是會呼叫上一層的Observable的subscribe方法,經過這樣一層一層的遞迴,最終會到最初create這裡。

同理,一開始訂閱的時候,傳入的是一個Observer物件的引用,經過一層一層的將這個物件傳入或者封裝之後再傳入。這樣就實現了資料上下游的連通。

最後看一下執行緒切換
subscribeOn(Schedulers.io())
最終進入的是ObservableSubscribeOn物件中

public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

這邊scheduler是之前傳入的Schedulers.io(),新建了一個訂閱的執行緒:

final class SubscribeTask implements Runnable {
   private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

這裡另起了一個執行緒執行訂閱操作

再來看看observeOn(AndroidSchedulers.mainThread())

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

這裡如果是同一個執行緒,就不用再切換了,如果不是同一個執行緒,需要建立一個新的。

到這裡Rxjava基本流程就看的差不多了,其他的就算換一些操作符,本質上的流程還是一樣的(Flowable可能有點區別)