Rxjava2原始碼學習(二)
上一篇中看了Rxjava的通過鏈式呼叫來實現資料的傳輸,這一篇接著看加上執行緒切換之後,整個流程是怎樣的。
還是以下面這個流程為例:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("資料");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
通過上一篇我們可以知道,Observable.create()之後返回的是一個ObservableCreate物件,而ObservableCreate物件是繼承自Observable物件。所以create之後,實際上是Observable.subscribeOn(Schedulers.io()),同樣的它返回一個繼承了Observable的物件。關於鏈式呼叫這一塊這裡就不再做詳細的說明了。
這裡的重點是包括執行緒切換在內的整個的流程是怎樣的,以上面的程式碼塊為例,今天我們要搞懂,資料在傳輸過程中到底經歷了什麼。
這裡我們先記錄一下每個操作返回的物件
操作 | 返回的Observable物件 |
---|---|
Observable.create() | ObservableCreate物件 |
Observable.subscribeOn(Schedulers.io()) | ObservableSubscribeOn物件 |
Observable.observeOn(AndroidSchedulers.mainThread()) | ObservableObserveOn物件 |
Observable. subscribe(Observer物件) |
ObservableCreate、ObservableSubscribeOn、ObservableObserveOn都是繼承自Observable,並且實現了subscribeActual(Observer observer) 方法(呼叫Observable.subscribe(Observer) 最終都會呼叫該方法)
通過斷點除錯,我們可以發現上面的這幾個物件最終都呼叫了各自的subscribeActual,而且呼叫的順序是從下到上的(它們的建立順序是從上到下的)。這裡我就直接上圖了,貼上我自己的理解:
對照上圖我們具體來看一下,當走到Observable. subscribe(Observer物件)這一步的時候,實際上是D:ObservableObserveOn.subscribe(Observer物件),而且會把上一層的Observable(這裡是C:ObservableSubscribeOn物件)作為source(建構函式裡的引數)傳入D中,之前說過,呼叫subscribe方法,都會呼叫各自的subscribeActual方法,
在ObservableObserveOn的subscribeActual方法中:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
如果在同一個執行緒,就可以直接訂閱observer,如果不在同一個執行緒,就需要用之前傳入的指定執行緒,並將訂閱的操作指定在該執行緒中。這裡的source就是作為建構函式引數傳入D的C。這裡相當於又呼叫了C.subscribe,經歷的流程和上面的過程差不多。
這裡的核心思想都是上一個Observable作為下一個的Observable的一部分,當下層的Observable開始呼叫subscribe方法之後,其實是會呼叫上一層的Observable的subscribe方法,經過這樣一層一層的遞迴,最終會到最初create這裡。
同理,一開始訂閱的時候,傳入的是一個Observer物件的引用,經過一層一層的將這個物件傳入或者封裝之後再傳入。這樣就實現了資料上下游的連通。
最後看一下執行緒切換
subscribeOn(Schedulers.io())
最終進入的是ObservableSubscribeOn物件中
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
這邊scheduler是之前傳入的Schedulers.io(),新建了一個訂閱的執行緒:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
這裡另起了一個執行緒執行訂閱操作
再來看看observeOn(AndroidSchedulers.mainThread())
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
這裡如果是同一個執行緒,就不用再切換了,如果不是同一個執行緒,需要建立一個新的。
到這裡Rxjava基本流程就看的差不多了,其他的就算換一些操作符,本質上的流程還是一樣的(Flowable可能有點區別)