1. 程式人生 > >RxJava2原始碼分析二之just、fromArray、fromIterable

RxJava2原始碼分析二之just、fromArray、fromIterable

     Observable.just:接收1個以上,10個以下的引數,然後逐個發射。   

     Observable.fromArray:接收一個數組,從陣列中一個一個取出來發射。  

     今天從原始碼來看一下Observable的just方法和FromArray。

      

public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

public static <T> Observable<T> just(T item1) {
     //...
}

//...

public static <T> Observable<T> just(T item1, T item2, T item3, T item4) {
        ObjectHelper.requireNonNull(item1, "The first item is null");
        ObjectHelper.requireNonNull(item2, "The second item is null");
        ObjectHelper.requireNonNull(item3, "The third item is null");
        ObjectHelper.requireNonNull(item4, "The fourth item is null");

        return fromArray(item1, item2, item3, item4);
}

//...

public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10) {
    //...
}

       通過原始碼可以看到,just方法又多個過載方法,引數從1到10。由此可以知道,just接收1個以上10個以下引數。以4個引數的just方法為例。just會對所有引數逐個判空。然後呼叫fromArray方法,這裡很明顯,除了只接收1個引數的just之外,just方法最終呼叫的是。

   先看接收1個引數的just方法,這裡返回RxJavaPlugins.onAssembly(new ObservableJust<T>(item))RxJavaPlugins.onAssembly上一篇文章講過它的作用,這裡直接返回的是ObservableJust

的物件。ObservableJust它繼承自Observable類,在上一篇文章中可以知道,在ObservableObserver發生訂閱關係的使用呼叫的subscribe方法會用到Observable的subscribeActual方法。所以這裡會呼叫ObservableJustsubscribeActual方法。

@Override
protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
}

public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {

        final Observer<? super T> observer;

        final T value;

        static final int START = 0;
        static final int FUSED = 1;
        static final int ON_NEXT = 2;
        static final int ON_COMPLETE = 3;

        public ScalarDisposable(Observer<? super T> observer, T value) {
            this.observer = observer;
            this.value = value;
        }

       //...

        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
    }

      在ObservableJustsubscribeActual方法中,先建立ScalarDisposable,緊接著呼叫run方法。ScalarDisposable繼承AtomicInteger類,在run方法中呼叫get()方法返回的是記憶體中儲存著一個值o,一開始是和START一樣為0。

public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

compareAndSet方法是AtomicInteger中的方法,它的作用是:判斷expect和0是不是想等,如果相等則將0的值改成update。

    接著就是呼叫observer.onNext(value);因為引數之後一個,並且0的值改成了ON_NEXT,所以很快呼叫了observer.onComplete();。lazySet(ON_COMPLETE);的作用是延遲一會兒設定o的值為ON_COMPLETE。

       到這,只接收一個引數的just方法分析完成,接收多個引數的just會將引數轉換成陣列,呼叫fromArray方法。下面看fromArray方法。

public static <T> Observable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        } else
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

         可以看到fromArray方法中,分為陣列長度為0,1,大於1的情況。其中長度為1時,呼叫接收一個引數的just方法。為0時呼叫empty()方法。

public static <T> Observable<T> empty() {
        return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE);
}

public final class ObservableEmpty extends Observable<Object> implements ScalarCallable<Object> {
    public static final Observable<Object> INSTANCE = new ObservableEmpty();

    private ObservableEmpty() {
    }

    @Override
    protected void subscribeActual(Observer<? super Object> o) {
        EmptyDisposable.complete(o);
    }

   //...
}


public static void complete(Observer<?> s) {
        s.onSubscribe(INSTANCE);
        s.onComplete();
}

     在empty()方法中,返回的是ObservableEmpty物件。在ObservableEmpty類中可以看到subscribeActual方法呼叫EmptyDisposable.complete,在這個方法中,可以看到,只調用了Observer的onSubscribe和onComplete方法。

        當fromArray中陣列長度大於1時,  返回ObservableFromArray的類物件。

public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        s.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }

 static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

        final Observer<? super T> actual;

        final T[] array;


        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }

    //...

        void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
    }
}

        可以看到subscribeActual方法建立FromArrayDisposable物件,最終呼叫FromArrayDisposable物件的run方法。在這個方法中通過for迴圈呼叫Observer的onNext方法處理接收的資料。

        fromArray結束之後,再看一下fromIterable。

public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source));
}

public final class ObservableFromIterable<T> extends Observable<T> {
    final Iterable<? extends T> source;
    public ObservableFromIterable(Iterable<? extends T> source) {
        this.source = source;
    }

    @Override
    public void subscribeActual(Observer<? super T> s) {
        Iterator<? extends T> it;
        try {
            it = source.iterator();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            EmptyDisposable.error(e, s);
            return;
        }
        boolean hasNext;
        try {
            hasNext = it.hasNext();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            EmptyDisposable.error(e, s);
            return;
        }
        if (!hasNext) {
            EmptyDisposable.complete(s);
            return;
        }

        FromIterableDisposable<T> d = new FromIterableDisposable<T>(s, it);
        s.onSubscribe(d);

        if (!d.fusionMode) {
            d.run();
        }
    }

    static final class FromIterableDisposable<T> extends BasicQueueDisposable<T> {

        FromIterableDisposable(Observer<? super T> actual, Iterator<? extends T> it) {
            this.actual = actual;
            this.it = it;
        }

         void run() {
            boolean hasNext;

            do {
                if (isDisposed()) {
                    return;
                }
                T v;

                try {
                    v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    actual.onError(e);
                    return;
                }

                actual.onNext(v);

                if (isDisposed()) {
                    return;
                }
                try {
                    hasNext = it.hasNext();
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    actual.onError(e);
                    return;
                }
            } while (hasNext);

            if (!isDisposed()) {
                actual.onComplete();
            }
        }
     
    }

        這裡建立了ObservableFromIterable物件,可以看到,程式碼很簡單,也是通過迭代器的方式處理接收到的資料。