Rxjava2原始碼學習(一)
這一篇主要看一下Rxjava的鏈式呼叫(Builder設計模式) 和 資料如何實現傳輸。
首先看一下下面這個簡單的程式碼片段:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("test");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("----", "onNext: "+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
鏈式呼叫
我們先看一下鏈式呼叫的實現:
Rxjava採用的是鏈式呼叫,比如我們看一下Observable.create()方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T >(source));
}
那返回的RxJavaPlugins.onAssembly又是什麼呢?
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
我們可以看到,返回的是一個Observable物件!
所以Observable.create()之後得到的是一個Observable物件,接著看
.subscribeOn(Schedulers.io())
通過上面我們可以知道,RxJavaPlugins.onAssembly返回的是Observable物件,所以subscribeOn方法中返回的也是一個Observable物件。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
兩者不同的是new的物件不一樣。
class ObservableCreate<T> extends Observable<T>
class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>
class AbstractObservableWithUpstream<T, U> extends Observable<U>
但是他們都是繼承自Observable。
接著我們看
.observeOn(AndroidSchedulers.mainThread())
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
不是猜,返回的也是Observable物件
class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T>
class AbstractObservableWithUpstream<T, U> extends Observable<U>
通過不斷的返回Observable物件,實現了鏈式呼叫。
資料傳輸
資料傳輸的核心思想是介面的回撥機制來實現的,通過介面的定義宣告和具體實現來完成資料的傳輸。
不妨先回憶一下RecyclerView的item點選事件。
RecyclerView本身是沒有item的點選事件的,那我們自己寫的時候是如何實現呢?
我們一般先定義一個點選事件的Listener。
interface ItemClickListener{
fun setOnItemClick(position: Int)
}
我們在Adapter中宣告這個點選事件,在item的需要點選的地方,例如整個item,對viewHolder.setOnClickListener的具體實現中,宣告我們定義的Listener,例如listener.setOnItemClick。
holder.itemView.setOnClickListener {
listener!!.setOnItemClick(position)
}
然後我們在Activity或者Fragment中,我們需要實現這個Listener,這樣我們在點選item的時候,通過viewHolder實現的點選事件,再通過自定義的Listener的宣告,我們在Activity或者Fragment中的Listener的具體實現中就能拿到我們需要的資料,例如position了。
adapter.setItemClickListener(object : ItemClickListener{
override fun setOnItemClick(position: Int) {
//介面具體的實現
}
})
Rxjava中資料傳輸的本質也是通過上述的方式實現的。
這裡的資料傳輸暫時不包括執行緒切換這一塊。
還是拿上面的例子去掉執行緒切換這塊來說:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("test");
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("----", "onNext: "+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("test");
}
})
我們以create方法為例,create方法將ObservableOnSubscribe介面的具體實現的地址作為source,
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
在create方法中,通過new ObservableCreate(source)創造一個observable物件(ObservableCreate物件繼承自Observable)。最後呼叫RxJavaPlugins.onAssembly(new ObservableCreate(source));返回該Observable物件
我們來看一下ObservableCreate類是怎麼做的:
ObservableCreate繼承自Observable,實現了它的subscribeActual(Observer
protected abstract void subscribeActual(Observer<? super T> observer);
我這裡只保留了關鍵程式碼
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
}
}
當Observable呼叫subscribe方法時,真正呼叫的是subscribeActual方法。
那我們回到ObservableCreate類中看看subscribeActual方法做了什麼:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
這裡首先new了一個CreateEmitter(資料發射器),建構函式中加入了Observable的Subscribe方法中傳入的Observer物件。然後
source.subscribe(parent);
這裡的source就是上面create的時候傳入的ObservableOnSubscribe物件。到這裡為止,有沒有聯想到之前提的recycleview點選事件的例子?
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
ObservableOnSubscribe是一個介面,它的subscribe方法中需要的是一個數據發射器,在Observable的create方法中new一個ObservableOnSubscribe物件作為source之後,這個source在ObservableCreate類中訂閱了發射器,拿到了我們剛new出來的CreateEmitter物件(類似於recycleview例子中adapter中拿到的position),在外部的ObservableOnSubscribe的具體實現中我們拿到了這個CreateEmitter物件,用於發射資料。
同理我們在反過來,在Observable中的create方法中,ObservableOnSubscribe的subscribe方法中我們拿到了CreateEmitter物件,
public interface ObservableEmitter<T> extends Emitter<T>
繼承自Emitter介面
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
在外部調宣告介面,並呼叫它的方法
emitter.onNext("test");
在內部的CreateEmitter類中有它的具體實現
static final class CreateEmitter<T> implements ObservableEmitter<T>{
@Override
public void onNext(T t) {
//關鍵程式碼
observer.onNext(t);
}
}
這裡的思想像是之前RecyclerView例子的翻轉,在外部對於介面的宣告,在內部實現它,並做下一步的處理。
到此資料就通過ObservableOnSubscribe物件拿到的ObservableEmitter發射器,呼叫發射資料的程式碼,到達了其具體實現的地方,我們可以看到,在其具體實現的地方,呼叫了
observer.onNext(t);
這裡的本質是和第一步獲取發射器是一樣的。
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("----", "onNext: "+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
在外部是Observer物件的具體實現,作為引數傳入subscribe方法中。通過
subscribeActual(observer);
進入到ObservableCreate類中,再通過
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
進入到CreateEmitter類中,在CreateEmitter的具體實現,例如onNext方法中,宣告這個介面的方法,上面已經提到過了:
static final class CreateEmitter<T> implements ObservableEmitter<T>{
@Override
public void onNext(T t) {
//關鍵程式碼
observer.onNext(t);
}
}
很熟悉是不是,這裡是內部的Observer介面方法的呼叫,外部是介面方法的具體實現。
最後附上一張畫的不是很好的原理圖: