1. 程式人生 > >RxJava 2.x 之圖解建立、訂閱、發射流程

RxJava 2.x 之圖解建立、訂閱、發射流程

從一個例子開始

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 3; i++) {
                    emitter.
onNext(i); } emitter.onComplete(); Log.d(TAG, "subscribe " + Thread.currentThread().getName()); } }).subscribeOn(Schedulers.newThread()) .map(new Function<Integer, String>() { @Override public
String apply(Integer value) throws Exception { Log.d(TAG, "apply " + Thread.currentThread().getName()); return "apply " + value; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith
(new ResourceObserver<String>() { @Override public void onNext(String value) { Log.d(TAG, "onNext " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete " + Thread.currentThread().getName()); } });

來看看輸出:

10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: create RxNewThreadScheduler-1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 0
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 2
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onComplete main

可以看到建立傳送轉換過程都在子執行緒中,而最後的回撥是在主執行緒中

整個過程筆者整理成一張圖,一步一步來跟進分析

建立過程

  • 第一步:通過create操作符建立了一個ObservableCreate型別的Observable,由於是基於匿名內部類建立的,因此持有的是實現了ObservableOnSubscribe介面的HomeActivity例項

  • 第二步:通過subscribeOn操作符建立了一個ObservableSubscribeOn型別的Observable,且其內部的source持有上個步驟的ObservableCreate例項

  • 第三步:通過map操作符建立了一個ObservableMap型別的Observable,且其內部持有上個步驟傳入的ObservableSubscribeOn例項

  • 第四步:通過observeOn操作符建立了一個ObservableObserveOn型別的Observable,且其內部持有上個步驟的ObservableMap例項

  • 第五步:通過subscribeWith方法完成訂閱,由於是基於匿名內部類建立的,因此傳入的實際上是實現了ResourceObserverHomeActivity例項

訂閱過程

上述的幾個步驟其實已經完成的基本的建立過程了,最後我們拿到的實際是ObservableObserveOn的例項,下面開始訂閱流程。

  • 第一步:subscribeWith方法,傳入的observer是實現了ResourceObserver介面HomeActivity例項,通過subscribeActual發起訂閱,內部實際呼叫的是source.subscribe方法,由於source持有的是上面傳入的ObservableMap例項,因此這一步驟實際呼叫的是,ObservableMap例項中的subscribe方法,傳入的引數就是ObserveOnObserver例項(構造引數主要是實現了ResourceObserver的例項即:HomeActivity)

  • 第二步:進入ObservableMap例項subscribe方法中,通過subscribeActual發起訂閱,實際呼叫的是source.subscribe方法,傳入的是MapObserver例項(構造引數為之前傳遞的ObserveOnObserver例項),由於source持有的是ObservableSubscribeOn的例項,因此最終呼叫的其實是ObservableSubscribeOn例項中的subscribe方法

  • 第三步:進入ObservableSubscribeOn例項subscribe方法中,通過subscribeActual發起訂閱,完成MapObserver例項對SubscribeOnObserver的訂閱,接著由NewThreadScheduler執行緒排程器完成對應的任務(該任務的執行是線上程中執行的),SubscribeTask實現了Runnable介面,最終會回撥run方法,執行source.subscribe方法,這裡的source持有的就是最開始的ObservableCreate例項

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //這裡的s就是上個步驟的MapObserver例項
        s.onSubscribe(parent);
        //這裡的scheduler就是我們最開始指定的Schedulers.newThread 即NewThreadScheduler執行緒排程器
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

  • 第四步:進入ObservableCreate例項subscribe方法中,通過subscribeActual發起訂閱,這裡的source持有的是HomeActivity例項,直接呼叫subscribe方法,傳入引數是構建的最頂層的發射器CreateEmitter例項

  • 第五步:上述的幾個過程實際已經完成了訂閱的過程,最後經過層層傳遞,持有的最頂層的是CreateEmitter例項,即我們最終的被觀察者

發射過程

上述的過程已經完成了訂閱過程,在最後訂閱完成之後,最終會通過source.subscribe方法,其實就是呼叫HomeActivity例項的subscribe方法,完成元素髮射

@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    for (int i = 0; i < 3; i++) {
        emitter.onNext(i);
    }
    emitter.onComplete();
    Log.d(TAG, "subscribe " + Thread.currentThread().getName());
}

我們在最頂層的被觀察者裡通過ObservableEmitter例項onNext方法完成元素的發射,最終又會通過一層一層的Observer轉發到最原始的實現了ResourceObserver介面觀察者中來

注意:

  • 這裡的被觀察者裡的所有發射過程實際上都是在NewThreadScheduler執行緒排程器分配的執行緒裡完成的
  • 發射的元素被傳遞到下層的ObservableObserveOn類中的ObserveOnObserver例項onNext方法,實際執行的是HandlerScheduler.HandlerWorkerschedule方法,最終就是通過我們持有的主執行緒的handler切換到主執行緒中

小結

整個建立過程訂閱過程發射過程看起來山路十八彎,但是如果你一步一步跟進檢視,會發現整個流程實際上是很清晰的,整個過程起點終點很明確,
而中間產生的一系列ObservableObserver你都可以看作是代理類,用來轉發訂閱以及最終的元素髮射