1. 程式人生 > >RxJava2原始碼分析一

RxJava2原始碼分析一

       RxJava 在最近兩年迅速火爆起來,最近學習RxJava2,免不了需要學習它的原始碼,寫下部落格記錄學習結果。

       RxJava 的設計理念基於觀察者模式,這裡就需要先了解一下它所涉及的東西。Observable,稱為被觀察者,由它產生一系列的事件。Observer,稱為觀察者。Observer和Observable之間通過subscribe方法發生訂閱關係。這樣Observer就可以 ”觀察“ Observable發生的事件,並根據這些事件做出相應的動作。

        先看示例程式碼:

  Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext( 1);
                emitter.onNext( 2); 
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("FirstMain.onSubscribe d="+d.isDisposed());
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("FirstMain.onNext i="+integer);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("FirstMain.onError");
            }

            @Override
            public void onComplete() {
                System.out.println("FirstMain.onComplete");
            }
        });

       可以看到如上程式碼,首先呼叫Observablecreate方法建立一個Observable物件,實際建立的是ObservableCreate的物件,ObservableCreate是Observable的子類。在create方法中首先進行判空。然後建立並返回ObservableCreate物件。

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");//判空
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

返回ObservableCreate物件時先呼叫了RxJavaPlugins.onAssembly方法,下面看一下這個方法:

 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
}

 static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
        try {
            return f.apply(t);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
}

    可以看到RxJavaPlugins.onAssembly方法是對Observable物件經行一步操作,就是使用Function物件變數onObservableAssembly給Observable物件經行操作。這裡onObservableAssembly物件變數為null,所以實際上onAssembly方法返回的時原來的Observable物件。如果有需要對Observable物件進行什麼操作的話,可以給onObservableAssembly物件變數賦值。

       接著呼叫了Observable的subscribe方法,是的Observer和Observable發生訂閱關係。

 public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
           //...
        }
    }

可以看到在subscribe方法中,先對傳進來的引數判空,然後RxJavaPlugins.onSubscribe也是根據onObservableSubscribe是否為空對observer經行操作。原始碼如下:

public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        if (f != null) {
            return apply(f, source, observer);
}
        return observer;
    }

static <T, U, R> R apply(@NonNull BiFunction<T, U, R> f, @NonNull T t, @NonNull U u) {
        try {
            return f.apply(t, u);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
}

接著呼叫subscribeActual方法,這裡可以要知道,因為create方法返回的是ObservableCreate物件,所以呼叫的是ObservableCreatesubscribeActual方法。

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
}

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

       //...
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
       //...
}

      subscribeActual方法中,先建立了CreateEmitter物件,它是用來發射事件的。然後呼叫了ObserveronSubscribe方法。

public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);
 
    void onError(@NonNull Throwable e);
 
    void onComplete();
}

          可以看到Observer是一個介面,它是在Observable呼叫subscribe函式的時候建立的,並實現了Observer的四個方法,而ObservableCreatesubscribeActual方法中observer.onSubscribe()這一句呼叫的就是此時實現的onSubscribe方法。

         緊接著呼叫了source.subscribe(parent);這裡的source是在crete方法中建立ObservableCreate物件的時候傳進去的。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}

所以source.subscribe呼叫的是ObservableOnSubscribesubscribe方法,而這個方法就是在create方法的引數實現的回撥函式。

總結

     上面程式碼調整幅度太大,這裡簡單總結方法呼叫過程(有部分同名方法,所以帶上了類名):

        Observable.create實現ObservableOnSubscribe.subscribe
        Observable.subscribe(攜帶Observer)呼叫ObservableCreate.subscribeActual(攜帶Observer)
        ObservableCreate.subscribeActual(攜帶Observer)呼叫ObservableOnSubscribe.subscribe
     ObservableOnSubscribe.subscribe又呼叫CreateEmitter物件(攜帶Observer)的方法(onNext、onComplete、onError)發射事件
        CreateEmitter的方法中最終呼叫到了Observer的方法(onNext、onComplete、onError)

        到這,RxJava分析的整個流程就結束了,就是資料從Observable中流出,通過subscribe方法關聯Observer,然後Observer接收資料,並對資料經行處理。