RxJava2原始碼分析二之just、fromArray、fromIterable
Observable.just:接收1個以上,10個以下的引數,然後逐個發射。
Observable.fromArray:接收一個數組,從陣列中一個一個取出來發射。
今天從原始碼來看一下Observable的just方法和FromArray。
public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); } public static <T> Observable<T> just(T item1) { //... } //... public static <T> Observable<T> just(T item1, T item2, T item3, T item4) { ObjectHelper.requireNonNull(item1, "The first item is null"); ObjectHelper.requireNonNull(item2, "The second item is null"); ObjectHelper.requireNonNull(item3, "The third item is null"); ObjectHelper.requireNonNull(item4, "The fourth item is null"); return fromArray(item1, item2, item3, item4); } //... public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10) { //... }
通過原始碼可以看到,just方法又多個過載方法,引數從1到10。由此可以知道,just接收1個以上10個以下引數。以4個引數的just方法為例。just會對所有引數逐個判空。然後呼叫fromArray方法,這裡很明顯,除了只接收1個引數的just之外,just方法最終呼叫的是。
先看接收1個引數的just方法,這裡返回RxJavaPlugins.onAssembly(new ObservableJust<T>(item))。RxJavaPlugins.onAssembly上一篇文章講過它的作用,這裡直接返回的是ObservableJust
@Override protected void subscribeActual(Observer<? super T> s) { ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value); s.onSubscribe(sd); sd.run(); } public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable { final Observer<? super T> observer; final T value; static final int START = 0; static final int FUSED = 1; static final int ON_NEXT = 2; static final int ON_COMPLETE = 3; public ScalarDisposable(Observer<? super T> observer, T value) { this.observer = observer; this.value = value; } //... @Override public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } } }
在ObservableJust的subscribeActual方法中,先建立ScalarDisposable,緊接著呼叫run方法。ScalarDisposable繼承AtomicInteger類,在run方法中呼叫get()方法返回的是記憶體中儲存著一個值o,一開始是和START一樣為0。
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
compareAndSet方法是AtomicInteger中的方法,它的作用是:判斷expect和0是不是想等,如果相等則將0的值改成update。
接著就是呼叫observer.onNext(value);因為引數之後一個,並且0的值改成了ON_NEXT,所以很快呼叫了observer.onComplete();。lazySet(ON_COMPLETE);的作用是延遲一會兒設定o的值為ON_COMPLETE。
到這,只接收一個引數的just方法分析完成,接收多個引數的just會將引數轉換成陣列,呼叫fromArray方法。下面看fromArray方法。
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
可以看到fromArray方法中,分為陣列長度為0,1,大於1的情況。其中長度為1時,呼叫接收一個引數的just方法。為0時呼叫empty()方法。
public static <T> Observable<T> empty() {
return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE);
}
public final class ObservableEmpty extends Observable<Object> implements ScalarCallable<Object> {
public static final Observable<Object> INSTANCE = new ObservableEmpty();
private ObservableEmpty() {
}
@Override
protected void subscribeActual(Observer<? super Object> o) {
EmptyDisposable.complete(o);
}
//...
}
public static void complete(Observer<?> s) {
s.onSubscribe(INSTANCE);
s.onComplete();
}
在empty()方法中,返回的是ObservableEmpty物件。在ObservableEmpty類中可以看到subscribeActual方法呼叫EmptyDisposable.complete,在這個方法中,可以看到,只調用了Observer的onSubscribe和onComplete方法。
當fromArray中陣列長度大於1時, 返回ObservableFromArray的類物件。
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> actual;
final T[] array;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
//...
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
}
}
可以看到subscribeActual方法建立FromArrayDisposable物件,最終呼叫FromArrayDisposable物件的run方法。在這個方法中通過for迴圈呼叫Observer的onNext方法處理接收的資料。
fromArray結束之後,再看一下fromIterable。
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source));
}
public final class ObservableFromIterable<T> extends Observable<T> {
final Iterable<? extends T> source;
public ObservableFromIterable(Iterable<? extends T> source) {
this.source = source;
}
@Override
public void subscribeActual(Observer<? super T> s) {
Iterator<? extends T> it;
try {
it = source.iterator();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
boolean hasNext;
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
if (!hasNext) {
EmptyDisposable.complete(s);
return;
}
FromIterableDisposable<T> d = new FromIterableDisposable<T>(s, it);
s.onSubscribe(d);
if (!d.fusionMode) {
d.run();
}
}
static final class FromIterableDisposable<T> extends BasicQueueDisposable<T> {
FromIterableDisposable(Observer<? super T> actual, Iterator<? extends T> it) {
this.actual = actual;
this.it = it;
}
void run() {
boolean hasNext;
do {
if (isDisposed()) {
return;
}
T v;
try {
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
actual.onNext(v);
if (isDisposed()) {
return;
}
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
} while (hasNext);
if (!isDisposed()) {
actual.onComplete();
}
}
}
這裡建立了ObservableFromIterable物件,可以看到,程式碼很簡單,也是通過迭代器的方式處理接收到的資料。