1. 程式人生 > >Android進階:五、RxJava2源碼解析 2

Android進階:五、RxJava2源碼解析 2

thread 解釋 聯系 bss extend type 自己 ima class

上一篇文章Android進階:四、RxJava2 源碼解析 1裏我們講到Rxjava2 從創建一個事件到事件被觀察的過程原理,這篇文章我們講Rxjava2中鏈式調用的原理。本文不講用法,仍然需要讀者熟悉Rxjava基本的用法。

一.Rxjava2 的基本用法

Rxjava是解決異步問題的,它的鏈式調用讓代碼看起來非常流暢優。現在我們帶上線程切換以及鏈式調用來看看。下面代碼是示例:

 Observable
                .create(new ObservableOnSubscribe<String>() {

                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("a");
                    }
                })
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return 1;
                    }
                })
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Object o) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

我們創建一個事件(觀察者),想輸出一個字符串 "a"。這個事件發生在IO線程,結束也在IO線程,事件的狀態回調發生在主線程。示例的用法大家應該都能懂,我們主要討論這個鏈式的原理流程。為什麽這麽說呢?因為這個鏈式跟一般的鏈式不太一樣。

二.create方法

這個方法我們之前看過,返回一個ObservableCreate對象,ObservableCreate繼承自Observable,裏面的source存著我們創建的ObservableOnSubscribe匿名對象。

三.subscribeOn方法

這是Obserbvable的方法,先看源碼:

  public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

代碼結構跟create的差不多,在鉤子函數裏直接返回我們創建的對象ObservableSubscribeOn<T>(this, scheduler),並傳入當前的Observable也就是ObservableCreate對象。所以我們看一下這個類的代碼:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}

這個類繼承自AbstractObservableWithUpstream類,構造函數的參數是ObservableSource,所以這裏我們需要介紹兩個類:

  • ObservableSource
    ObservableSource是一個接口,所有的Observable都實現了這個接口,它裏面只有:
    void subscribe(@NonNull Observer<? super T> observer);

    這一個方法。很明顯這個方法是為了讓Observer訂閱Observable的,或者說為了Observable把事件狀態傳遞給Observer的。

  • AbstractObservableWithUpstream
    這個類繼承了Observbable

    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {
    
    protected final ObservableSource<T> source;
    
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
    }

    從源碼可以看出這個類有變量source,它在構造函數裏傳入值,存儲ObservableSource對象。

    所以當我們調用Observable的subscribeOn方法的時候會創建一個ObservableSubscribeOn對象,並用變量source存儲當前的Observable對象,然後返回ObservableSubscribeOn對象。

四.unsubscribeOn方法

 public final Observable<T> unsubscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableUnsubscribeOn<T>(this, scheduler));
    }

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

這個方法跟上面的方法是一個模子刻的。所以我們主要看ObservableUnsubscribeOn這個類就好。

public final class ObservableUnsubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableUnsubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}

這個類跟剛才的ObservableSubscribeOn也幾乎一模一樣,繼承自AbstractObservableWithUpstream類,使用source存了當前Observable對象。而此時的Observbvable對象是上一個方法創建的對象,也就是ObservableSubscribeOn對象。

五.observeOn方法和map方法

由於這些方法的內容基本一樣我就省略代碼的解釋。
observeOn方法是創建了ObservableObserveOn對象,並保存上一個方法創建的Observable。map方法是創建ObservableMap對象,並保存上一個方法創建的Observable
所以總結一下可知:鏈式調用這些方法的時候,都會創建一個相關的對象,然後用變量source存儲上一個方法創建的Observable子類對象。

六.subscribe方法

上次文章講到,這個方法內部會調用一個抽象方法,subscribeActual方法,作為真實的訂閱。而這個方法的邏輯需要看子類如何實現。
而第一次調用該這個subscribe方法的對象是ObservableMap對象。所以我們看看它內部如何實現的。
ObservableMap的subscribeActual方法實現:

  public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

內部調用了source的subscribe方法。此時ObservableMap對象裏存的source是上一個方法創建的observable,也就是ObservableObserveOn對象。所以我們要看看ObservableObserveOn是如何實現subscribeActual方法的:

protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

同理他最終也是調用了上一個Observable的subscribe。

於是我們知道當我們調用subscribe方法的時候,會遞歸式的調用source存儲的上一個方法創建的Observable的subscribeActual方法,一直到ObsservableCreate的subscribeActual的方法,把事件狀態傳遞給觀察者。這個上一篇文章已經講過。

七.總結

我們常見的普通的鏈式調用一般都會返回當前同一個對象。和普通的鏈式調用不同當我們調用Rxjava2的鏈式調用時,他們會返回自己對應的Observable子類對象,每個對象都不一樣,然後在subscribeActual方法中遞歸式的調用每個對象的subscribeActual方法,完成一個鏈式的調用。

寫在最後

在最後,我整理了一份資料,如果有需要學習的同學可以聯系我免費分享出來的,而且我們為了感謝很多支持的學者,在騰訊課堂每晚上20點有免費的直播教學,需要的同學可以來學習學習
領取方式:交流群653583088

技術分享圖片

Android進階:五、RxJava2源碼解析 2