1. 程式人生 > >筆記之RxJava原理

筆記之RxJava原理

RxJava也用了一段時間了,操作符看了一遍又一遍,熟悉了很多,記錄下RxJava原理和自己的理解

RxJava最基本,先貼程式碼

  Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }                        
                });

上門程式碼是不是很簡單呀,看下操作,首先看追create

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        // 非null判斷
        ObjectHelper.requireNonNull(source, "source is null");
       // 建立了ObservableCreate物件,onAssembly沒必要追, 就是判斷了onObservableAssembly是否為null,不為null執行其的apply方法,都返回ObservableCreate物件
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

那麼接下來看下ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ......
}

ObservableCreate集成了Observable,這裡就可以衍生出一個問題,所有的建立觀察者的操作符其實都是集成了Observable的,接下來追subscribe方法

   public final void subscribe(Observer<? super T> observer) {
        // 非null判斷
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // 還是返回當前的observer
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            // 注意關鍵在這裡,也就是呼叫了實現類的subscribeActual也就是
            //ObservableCreate的subscribeActual方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

接下來閱讀ObservableCreate的subscribeActual方法

 protected void subscribeActual(Observer<? super T> observer) {
        // 建立了一個CreateEmitter物件包裹了下,然後傳遞給了
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 呼叫了observer的onSubscribe方法,也就是訂閱開始方法
        observer.onSubscribe(parent);

        try {
        // 大家還記的source是誰嗎?就是ObservableOnSubscribe物件,在建立的時候建立的物件
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

讀到這裡就是最後,拿著包裹著的observer的CreateEmitter物件執行了onext或onError方法,彈射資料,這就是RxJava最基本的實現邏輯