1. 程式人生 > >Android進階:四、RxJava2 源碼解析 1

Android進階:四、RxJava2 源碼解析 1

andro 內部 創建 protected 直接 ram 存在 代碼 原理

本文適合使用過Rxjava2或者了解Rxjava2的基本用法的同學閱讀

一.Rxjava是什麽
Rxjava在GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫)。

通俗來說,Rxjava是一個采用了觀察者模式設計處理異步的框架。鏈式調用設計讓代碼優雅易讀。
舉個例子:

   Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
            }
        });

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

這是Rxjava2最簡單的用法:
1.創建一個Observable,重寫subscribe方法,這裏主要處理被觀察的事件。
2.訂閱這個Observable,事件會回調observer的方法,我們可以對事件做響應的處理
二.Rxjava源碼解析

2.1. 創建Observable:
創建Observable用的是Observable.create(ObservableOnSubscribe<T> source)方法。這個方法的參數是ObservableOnSubscribe:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

ObservableEmitter對接口Emitter進行擴展,增加了setDisposable、setCancellable等方法
基本參數了解了,現在看看create方法裏面做了什麽,代碼如下:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

調用了RxJavaPlugins的onAssembly方法。又有一個新參數ObservableCreate<T>(source),我們看看它是什麽:

final class ObservableCreate<T> extends Observable<T> {

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

}

繼承了Observable,所以也是個被觀察對象,在構造函數中我們看到我們new的ObservableOnSubscribe對象,被存在了ObservableCreate的source裏面
那我們繼續看看onAssembly方法做什麽:

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

一個Hook方法。onObservableAssembly是一個靜態變量,我們沒有設置,默認為空,所以直接返回source對象。也就是說,Observable的create方法其實就是把我們ObservableOnSubscribe對象,存儲在ObservableCreate對象的source裏面,然後返回ObservableCreate對象。
我們知道ObservableCreate是繼承Observable的,所以創建了ObservableCreate對象,我們的Observable也就創建完了。

2.2 訂閱事件(被觀察者)
訂閱被觀察者的操作是observable.subscribe(new Observer<String>())。這個操作符其實是個“被動”,就是事件被觀察者觀察。因為subscribe方法裏的參數Observer才是觀察者。我們也會在Observer裏的各個會調方法裏接收到事件相關的返回值。
我們看看subscribe方法的源碼:

 public final void subscribe(Observer<? super T> observer) {
        try {
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            RxJavaPlugins.onError(e);
        }
    }

看代碼我們知道最主要調用的方法是:subscribeActual(observer);,這個方法是Observable裏的抽象方法,而此時我們的Observable是一個ObservableCreate對象(前面create方法返回的對象)。所以我們去看一下ObservableCreate裏面是如何重寫這個方法的。代碼如下:

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

我們一看到這個方法主要做了三件事:
①創建一個CreateEmitter對象parent。
②把parent傳給source的subscribe方法。上面我們知道source就是剛才存的ObservableOnSubscribe對象,subscribe也就是我們重寫的方法:

          @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
            }

所以我們在這個方法裏就能收到一個CreateEmmiter,通過CreateEmitter可以回調相應的方法。CreateEmitter是實現ObservableEmitter接口,我們看看它內部實現,如:onNext源碼如下:

@Override
public void onNext(T t) {
    observer.onNext(t);
}

也就是說,當我們在ObservableOnSubscribe的subscribe方法裏調用ObservableEmitter的onNext方法的時候,它裏面會調用observer的onNext。於是通過這樣的傳遞,我們就能在observer裏響應的回調方法裏收到事件的相關狀態。

至此一個簡單Rxjava流式傳遞原理已經講完了,總結流程如下:

  • 使用Observbable.create方法,產生一個ObservableCreate對象,對象裏存著ObservableOnSubscribe對象source。
  • 調用ObservableCreate.subscribe方法,實際調用的是subscribeActual方法,傳入一個Observer對象。
  • subscribeActual方法中創建一個CreateEmmiter對象,調用source.subscribe方法,傳入CreateEmmiter對象。
  • 於是我們在ObservableOnSubscribe中就接收到了一個CreateEmmiter,CreateEmmiter是ObservableEmmiter的子類。我們可以在這裏調用CreateEmmiter的方法進行事件回調。
  • 調用CreateEmmiter方法,實際上會調用Observer的響應的方法。也就是CreateEmmiter把事件狀態傳遞給觀察者。

Android進階:四、RxJava2 源碼解析 1