1. 程式人生 > >Rxjava2原始碼學習(一)

Rxjava2原始碼學習(一)

這一篇主要看一下Rxjava的鏈式呼叫(Builder設計模式) 和 資料如何實現傳輸。

首先看一下下面這個簡單的程式碼片段:

Observable.create(new ObservableOnSubscribe<String>() {
       @Override
       public void subscribe(ObservableEmitter<String> emitter) throws Exception {
           emitter.onNext("test");
       }
   })
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(new
Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.d("----", "onNext: "+s); } @Override public void onError(Throwable e) { } @Override public
void onComplete() { } });

鏈式呼叫

我們先看一下鏈式呼叫的實現:
Rxjava採用的是鏈式呼叫,比如我們看一下Observable.create()方法

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T
>(source)); }

那返回的RxJavaPlugins.onAssembly又是什麼呢?

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

我們可以看到,返回的是一個Observable物件!

所以Observable.create()之後得到的是一個Observable物件,接著看

.subscribeOn(Schedulers.io())

通過上面我們可以知道,RxJavaPlugins.onAssembly返回的是Observable物件,所以subscribeOn方法中返回的也是一個Observable物件。

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

兩者不同的是new的物件不一樣。

class ObservableCreate<T> extends Observable<T>

class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>
class AbstractObservableWithUpstream<T, U> extends Observable<U>

但是他們都是繼承自Observable。

接著我們看

.observeOn(AndroidSchedulers.mainThread())
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

不是猜,返回的也是Observable物件

class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T>
class AbstractObservableWithUpstream<T, U> extends Observable<U>

通過不斷的返回Observable物件,實現了鏈式呼叫。

資料傳輸

資料傳輸的核心思想是介面的回撥機制來實現的,通過介面的定義宣告和具體實現來完成資料的傳輸。
不妨先回憶一下RecyclerView的item點選事件。
RecyclerView本身是沒有item的點選事件的,那我們自己寫的時候是如何實現呢?
我們一般先定義一個點選事件的Listener。

interface ItemClickListener{
    fun setOnItemClick(position: Int)
}

我們在Adapter中宣告這個點選事件,在item的需要點選的地方,例如整個item,對viewHolder.setOnClickListener的具體實現中,宣告我們定義的Listener,例如listener.setOnItemClick。

holder.itemView.setOnClickListener {
   listener!!.setOnItemClick(position)
}

然後我們在Activity或者Fragment中,我們需要實現這個Listener,這樣我們在點選item的時候,通過viewHolder實現的點選事件,再通過自定義的Listener的宣告,我們在Activity或者Fragment中的Listener的具體實現中就能拿到我們需要的資料,例如position了。

adapter.setItemClickListener(object : ItemClickListener{
    override fun setOnItemClick(position: Int) {
        //介面具體的實現
    }

})

Rxjava中資料傳輸的本質也是通過上述的方式實現的。
這裡的資料傳輸暫時不包括執行緒切換這一塊。

還是拿上面的例子去掉執行緒切換這塊來說:

 Observable.create(new ObservableOnSubscribe<String>() {
   @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("test");
    }
})
.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d("----", "onNext: "+s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
Observable.create(new ObservableOnSubscribe<String>() {
   @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("test");
    }
})

我們以create方法為例,create方法將ObservableOnSubscribe介面的具體實現的地址作為source,

return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));

在create方法中,通過new ObservableCreate(source)創造一個observable物件(ObservableCreate物件繼承自Observable)。最後呼叫RxJavaPlugins.onAssembly(new ObservableCreate(source));返回該Observable物件

我們來看一下ObservableCreate類是怎麼做的:
ObservableCreate繼承自Observable,實現了它的subscribeActual(Observer

protected abstract void subscribeActual(Observer<? super T> observer);

我這裡只保留了關鍵程式碼

public final void subscribe(Observer<? super T> observer) {

        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {

        }
    }

當Observable呼叫subscribe方法時,真正呼叫的是subscribeActual方法。
那我們回到ObservableCreate類中看看subscribeActual方法做了什麼:

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

這裡首先new了一個CreateEmitter(資料發射器),建構函式中加入了Observable的Subscribe方法中傳入的Observer物件。然後

source.subscribe(parent);

這裡的source就是上面create的時候傳入的ObservableOnSubscribe物件。到這裡為止,有沒有聯想到之前提的recycleview點選事件的例子?

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

ObservableOnSubscribe是一個介面,它的subscribe方法中需要的是一個數據發射器,在Observable的create方法中new一個ObservableOnSubscribe物件作為source之後,這個source在ObservableCreate類中訂閱了發射器,拿到了我們剛new出來的CreateEmitter物件(類似於recycleview例子中adapter中拿到的position),在外部的ObservableOnSubscribe的具體實現中我們拿到了這個CreateEmitter物件,用於發射資料。

同理我們在反過來,在Observable中的create方法中,ObservableOnSubscribe的subscribe方法中我們拿到了CreateEmitter物件,

public interface ObservableEmitter<T> extends Emitter<T> 

繼承自Emitter介面

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

在外部調宣告介面,並呼叫它的方法

emitter.onNext("test");

在內部的CreateEmitter類中有它的具體實現

static final class CreateEmitter<T> implements ObservableEmitter<T>{
    @Override
    public void onNext(T t) {

        //關鍵程式碼
        observer.onNext(t);

    }
}

這裡的思想像是之前RecyclerView例子的翻轉,在外部對於介面的宣告,在內部實現它,並做下一步的處理。

到此資料就通過ObservableOnSubscribe物件拿到的ObservableEmitter發射器,呼叫發射資料的程式碼,到達了其具體實現的地方,我們可以看到,在其具體實現的地方,呼叫了

observer.onNext(t);

這裡的本質是和第一步獲取發射器是一樣的。

.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.d("----", "onNext: "+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

在外部是Observer物件的具體實現,作為引數傳入subscribe方法中。通過

subscribeActual(observer);

進入到ObservableCreate類中,再通過

CreateEmitter<T> parent = new CreateEmitter<T>(observer);

進入到CreateEmitter類中,在CreateEmitter的具體實現,例如onNext方法中,宣告這個介面的方法,上面已經提到過了:

static final class CreateEmitter<T> implements ObservableEmitter<T>{
    @Override
    public void onNext(T t) {

        //關鍵程式碼
        observer.onNext(t);

    }
}

很熟悉是不是,這裡是內部的Observer介面方法的呼叫,外部是介面方法的具體實現。

最後附上一張畫的不是很好的原理圖:
這裡寫圖片描述