1. 程式人生 > >RxJava 2.X 中的observable鏈是怎樣形成的?

RxJava 2.X 中的observable鏈是怎樣形成的?

要理解RxJava框架,就需要理清楚其鏈路是怎樣形成的。

先看一段簡單的程式碼:

       Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("result:"+s);
            }
        });

打印出來的結果:

result:hello

我們先來分析一下其中的observable鏈:

public abstract class Observable<T> implements ObservableSource<T> {

Observable是一個抽象類,它實現了ObservableSource介面,也就說我們後面如果看到ObservableSource介面的例項,可以看作是Observable向上轉型的例項。

public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

在Observable create的過程:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ...
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

先不看RxJavaPlugins中的內容,呼叫者只需要傳入一個ObservableOnSubscribe的實現類,RxJava會將其以建構函式引數的形式傳入,並封裝構造一個ObservableCreate物件。ObservableCreate不再是一個抽象類,而且繼承了Observable物件。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
...

於是這個時候,我們構造出了一個具體的Observable例項。

Observable抽象類,中只有一個方法是抽象的。

protected abstract void subscribeActual(Observer<? super T> observer);

所以,每個Observable子類例項化過程中,都需要實現subscribeActual方法。

回到之前的問題,Observable在鏈式呼叫過程中是怎樣形成Observable鏈的呢?

我們接著看原始碼,得到ObservableCreate例項後,構建了一個單一入參的Consumer物件,並呼叫了subcribe方法,跟進去看一下:

public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION,         
                Functions.emptyConsumer());
}

繼續跟:

 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ...
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

LambdaObserver同樣地,以建構函式的方式將Consumer物件封裝到了Observer中,這樣讓呼叫者用很簡潔的程式碼構造出一個Observer

public final void subscribe(Observer<? super T> observer) {
        ...
        try {
            ...
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            ...
        }
    }

最後,發現subscribe呼叫的是Observable的實現類中的subscribeActual()方法。我們來看看之前的Observable子類ObservableCreate是如何實現subscribeActual方法的:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

看到這裡有個source.subscribe(),於是直接呼叫了我們傳入的介面ObservableOnSubscribe例項中的subscribe方法。

因為我們這裡只有一個Observable,所以還沒有看到鏈,沒關係,先走一遍,後面再加入幾個Observable再瞧瞧。

source.subscribe()後,由於我們在subscribe方法中執行了emitter的onNext()方法,所以,consumer收到了具體的訊息。

Emitter是個什麼東東?什麼時候傳給subscribe()方法?

之前講到Consumer被封裝到了一個lambdaObserver中,現在傳了進來,並且以建構函式的方式傳到CreateEmitter中,這種構造類的方式好眼熟啊,在哪兒看到過?

        try {
            InputStream stream = new BufferedInputStream(new FileInputStream(new             
            File("//abc")));
            ...
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } finally{
            ...
        }

裝飾者模式。ContextWrapper和ContextImpl也比較類似。

這裡Emitter把Observer作為入參傳進來

 static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            ...
        }

        @Override
        public boolean tryOnError(Throwable t) {
            ...
        }

        @Override
        public void onComplete() {
            ...
        }

        @Override
        public void setDisposable(Disposable d) {
            ...
        }

        ...

    }

可以看到起對onNext()方法進行了增強,添加了異常處理,disposable中斷,最後還是呼叫了observer的onNext()方法。然而CreateEmitter與Observer之間並沒有相同的父類,雖然有同樣名稱的方法。所以此處並非裝飾者模式。

在emitter中呼叫了observer的OnNext(),於是lambdaObserver收到了onNext()回來的資料,

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }

lambdaObserver收到資料後,呼叫了Consumer的accept()方法,於是乎,我們最後成功在Consumer中列印到最後的字串。

流程走了一遍。接下來,我們看,如果再加入幾個Observable的轉換後,多個Observable是如何形成鏈式關係了。

比如,我們把最開頭的test1原始碼做一些修改:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello");
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return "abc-"+s;
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s+"-def";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("result:"+s);
            }
        });

再次插入兩條map操作。

看看observable的靜態方法map()都做了些什麼:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

似乎看到了一些細微的區別,我們來對比一下:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

create()的時候只有一個入參,而現在new ObservableMap()的時候有兩個入參,除了Function()用來構造一個新的Observable外,還傳入了一個this,把當前這個observable例項也傳了進去。

傳進去幹嘛?

莫非這裡就與鏈有關了?

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
...

只看到source傳了進來,沒看到哪兒用到了啊?

只看到被父類的建構函式拿去用來。

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

我們來分析一下父類,ObservableSource是幹嘛?之間好像有討論過,對不對?

抽象類Observable實現了ObservableSource,所以這裡相當於向上轉型了,也就說我在構造下一個mapObservable的時候,把上一個Observable存了起來,這不正是一個連結串列嗎?

明白了吧。

接著分析下,加入map後,後面的subscribe()發生了一些怎樣的變化,observer的鏈又是怎樣形成的。

欲知後事如何,且看下回分解。