1. 程式人生 > >RxJava2.X 原始碼分析 一

RxJava2.X 原始碼分析 一

本文內容大致如下:

  1. 初步瞭解RxJava2.X的使用流程 ;
  2. 探索Observable傳送資料的流程 ;
  3. 明白Observer是如何接收資料的 ;
  4. 解析Observable與Observer的勾搭(如何關聯)過程 ;
  5. 探索RxJava執行緒切換的奧祕 ;
  6. 瞭解RxJava操作符的實現原理。

探索RxJava2分發訂閱流程

從Demo到原理

//1、觀察者建立一個Observer
Observer observer = new Observer() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "onSubscribe"
); } @Override public void onNext(@NonNull String s) { Log.d(TAG, "onNext data is :" + s); } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError data is :" + e.toString()); } @Override public
void onComplete() { Log.d(TAG, "onComplete"); } }; Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { e.onNext("hello"); e.onNext("world"
); e.onComplete(); } }); observable.subscribe(observer);

結果輸出:

onSubscribe
onNext data is :hello
onNext data is :world
onComplete

可以看到,Observer的onSubscribe是最先被呼叫的,這個回撥會有什麼用呢?我們後面會講到。

由於整個流程是從create開始的,我們就從源頭開始分析。create方法返回的是一個observable物件,也就是被觀察的物件。create方法需要傳入一個ObservableOnSubscribe來建立,我們看下ObservableOnSubscribe是什麼:

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

該介面會接收一個ObservableEmitter的一個物件,然後通過該物件我們可以傳送訊息也可以安全地取消訊息,我們繼續看ObservableEmitter這個介面類。

public interface ObservableEmitter<T> extends Emitter<T> {

    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param d the disposable, null is allowed
     */
    void setDisposable(@Nullable Disposable d);

    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(@Nullable Cancellable c);

    /**
     * Returns true if the downstream disposed the sequence or the
     * emitter was terminated via {@link #onError(Throwable)}, {@link #onComplete} or a
     * successful {@link #tryOnError(Throwable)}.
     * <p>This method is thread-safe.
     * @return true if the downstream disposed the sequence or the emitter was terminated
     */
    boolean isDisposed();

    /**
     * Ensures that calls to onNext, onError and onComplete are properly serialized.
     * @return the serialized ObservableEmitter
     */
    @NonNull
    ObservableEmitter<T> serialize();

    /**
     * Attempts to emit the specified {@code Throwable} error if the downstream
     * hasn't cancelled the sequence or is otherwise terminated, returning false
     * if the emission is not allowed to happen due to lifecycle restrictions.
     * <p>
     * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
     * if the error could not be delivered.
     * @param t the throwable error to signal if possible
     * @return true if successful, false if the downstream is not able to accept further
     * events
     * @since 2.1.1 - experimental
     */
    @Experimental
    boolean tryOnError(@NonNull Throwable t);
}

ObservableEmitter是對Emitter的擴充套件,而擴充套件的方法證實RxJava2.0之後引入的,提供了可中途取消等新能力,我們繼續看Emitter。

/**
 * Base interface for emitting signals in a push-fashion in various generator-like source
 * operators (create, generate).
 *
 * @param <T> the value type emitted
 */
public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

裡面的三個方法使用過rx的應該非常眼熟了。看到這裡,我們只是瞭解了傳遞引數的資料結構,瞭解到的資訊還是比較少的。我們繼續看下create內部做了什麼操作。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

RxJavaPlugins或許你會很陌生,其實我也很陌生,不過沒關係,我覺得後面會經常遇到RxJavaPlugins,熟悉它是必然的;

可以看到我們傳入ObservableOnSubscribe被用來建立ObservableCreate,其實ObservableCreate就是Observable的一個實現類哦。

思路梳理

OK,到這裡我們先梳理一下思路:
1、Observable通過呼叫create建立一個Observable
2、呼叫create時需要傳入一個ObservableOnSubscribe型別的例項引數
3、最終傳入的ObservableOnSubscribe型別的例項引數作為ObservableCreate建構函式的引數傳入,一個Observable就此誕生了

ObservableCreate又是個什麼東東呢?我們分步來,先看ObservableCreate的兩個方法。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ......
 }

source:Observable.createc傳入的ObservableOnSubscribe例項;

subscribeActual回撥方法,它在呼叫Observable.subscribe時被呼叫,即與觀察者或則訂閱者發生聯絡時觸發。subscribeActual也是實現我們主要邏輯的地方,我們來仔細分析下subscribeActual方法:
1. 首先subscribeActual傳入的引數為Observer型別,也就是我們subscribe時傳入的觀察者,到底是不是呢?後面會分析到。
2. 傳入的Observer會被包裝成一個CreateEmitter,CreateEmitter繼承了AtomicReference提供了原子級的控制能力。RxJava2.0提供的新特性與之息息相關哦,這個我們先給它來個關鍵標籤,後面再詳細分析。
3. 觀察者(observer)呼叫自己的onSubscribe(parent);將包裝後的observer傳入。這個也是RxJava2.0的變化,真正的訂閱在source.subscribe(parent);這句程式碼被執行後開始,而在此之前先呼叫了onSubscribe方法來提供RxJava2.0後引入的新能力(如中斷能力)。從這裡我們也就知道了為何觀察者的onSubscribe最先被呼叫了。(被訂閱者說:我也很無辜,他自己呼叫了自己,我也控制不了╮(╯_╰)╭)
4. 被訂閱者或者說被觀察者(source)呼叫subscribe訂閱方法與觀察者發生聯絡。這裡進行了異常捕獲,如果subscribe丟擲了未被捕獲的異常,則呼叫 parent.onError(ex);
5. 在執行subscribe時也就對應了我們demo中的

public void subscribe(@NonNull ObservableEmitter e) throws Exception {
    e.onNext("hello");
    e.onNext("world");
    e.onComplete();

}

Ok,看來subscribeActual這個回撥確實很重要,前面我們也說了subscribeActual回撥方法在Observable.subscribe被呼叫時執行的,真的像我說的一樣麼?萬一我看走眼了

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

OK,程式碼不多,可以看到RxJavaPlugins.onSubscribe(this, observer);,我們RxJava2.0中的Hook能力就是來自這裡了。然後繼續看下面subscribeActual(observer);被呼叫了。

思路梳理
  1. 傳入的ObservableOnSubscribe最終被用來建立成ObservableCreate
  2. ObservableCreate持有我們的被觀察者物件以及訂閱時所觸發的回撥subscribeActual
  3. 在subscribeActual實現了我們的主要邏輯,包括observer.onSubscribe(parent);,source.subscribe(parent);,parent.onError(ex);的呼叫
  4. 在Observable的subscribe被呼叫時開始執行事件分發流程。

探索RxJava2神祕的隨意取消訂閱流程的原理

前面初步分析了RxJava從建立到執行的流程。
接著我們將探索RxJava2.x提供給我們的Disposable能力的來源。

先看一個demo。

//1、觀察者建立一個Observer
Observer observer = new Observer() {
    @Override
  public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
        disposable = d;
    }

    @Override
  public void onNext(@NonNull String s) {
        Log.d(TAG, "onNext data is :" + s);
        if (s.equals("hello")) {
            //執行了hello之後終止
  disposable.dispose();
        }
    }

    @Override
  public void onError(@NonNull Throwable e) {
        Log.d(TAG, "onError data is :" + e.toString());
    }

    @Override
  public void onComplete() {
        Log.d(TAG, "onComplete");
    }
};

Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
  public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        Log.i(TAG, "傳送 hello");
        e.onNext("world");
        Log.i(TAG, "傳送 world");
        e.onComplete();
        Log.i(TAG, "呼叫 onComplete");

    }

});
observable.subscribe(observer);

輸出結果如下:

  onSubscribe
  onNext data is :hello
  傳送 hello
  傳送 world
  呼叫 onComplete

在傳送玩hello之後,成功終止了後面的Reactive流。從結果我們還發現,後面的Reactive流被終止了,也就是訂閱者或者觀察者收不到後面的資訊了,但是生產者或者說被訂閱者、被觀察者的程式碼還是會繼續執行的。

Ok,我們從哪開始入手呢?我們發現,在我們執行了 disposable.dispose();後,觸發了該事件,我們通過原始碼看下 disposable.dispose();到底做了什麼。

/**
 * Represents a disposable resource.
 */
public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();

    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}

此時我們要回憶一下前面的一段程式碼

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
  • 我們之前分析到在執行source.subscribe(parent);觸發資料分發事件之前先執行了observer.onSubscribe(parent);這句程式碼,所傳入的parent也就對應了我們的Disposable
  • parent是CreateEmitter型別的,但是CreateEmitter是實現了Disposable介面的一個類。而parent又是我們的observer的一個包裝後的物件。
  • OK,分析到這裡我們來整理下前面的環節,根據Demo來解釋下:首先在執行下面程式碼之前
 e.onNext("hello");
 Log.i(TAG, "傳送 hello");
 e.onNext("world");
 Log.i(TAG, "傳送 world");
 e.onComplete();
 Log.i(TAG, "呼叫 onComplete");
  • 先執行了observer.onSubscribe(parent);,我們在demo中也是通過傳入的parent呼叫其dispose方法來終止Reactive流,而執行分發hello等資料的e也是我們的parent,也就是他們都是同一個物件。而執行e.onNext(“hello”);的e物件也是observer的一個包裝後的ObservableEmitter型別的物件。

總結:Observer自己來控制了Reactive流狀態。

Ok,此時如果我說關鍵點應該在ObservableEmitter這個類上面,你覺得可能性有多少呢?( ̄∇ ̄)

關鍵點就是CreateEmitter parent = new CreateEmitter(observer);這個包裝的過程,我們來看下其原始碼。

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }

    @Override
    public void setCancellable(Cancellable c) {
        setDisposable(new CancellableDisposable(c));
    }

    @Override
    public ObservableEmitter<T> serialize() {
        return new SerializedEmitter<T>(this);
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }

    @Override
    public String toString() {
        return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
    }
}

CreateEmitter是ObservableCreate類的靜態內部類。CreateEmitter實現了ObservableEmitter, Disposable介面類,所以需實現其方法。這裡其實是使用了裝飾者模式,其魅力所在一會就會看到了。

我們可以看到在ObservableEmitter內部通過Observer< ? super T> observer儲存了我們的observer,這樣有什麼用呢?看Demo,我們在呼叫e.onNext(“hello”);時,呼叫的時ObservableEmitter物件的onNext方法,然後ObservableEmitter物件的onNext方法內部再通過observer呼叫onNext方法,但是從原始碼我們可以發現,其並不是簡單的呼叫哦。

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

1、先判斷傳入的資料是否為null
2、判斷isDisposed(),如果isDisposed()返回false則不執行onNext。

isDisposed()什麼時候會返回false呢?按照demo,也就是我們呼叫了disposable.dispose();後,disposable前面分析了就是CreateEmitter parent,我們看CreateEmitter.dispose()

@Override
public void dispose() {
    DisposableHelper.dispose(this);
}

裡面呼叫了DisposableHelper.dispose(this);,我們看isDisposed()

@Override
public boolean isDisposed() {
    return DisposableHelper.isDisposed(get());
}

RxJava的onComplete();與onError(t);只有一個會被執行的祕密原來是它?
再看另外兩個方法的呼叫

@Override
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}

@Override
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (!isDisposed()) {
        try {
            observer.onError(t);
        } finally {
            dispose();
        }
        return true;
    }
    return false;
}

@Override
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}

其內部也基本做了同樣的操作,先判斷!isDisposed()後再決定是否執行。

但是再這裡還有一點哦,我們應該知道onComplete();和onError(t)只有一個會發生,其實現原理也是通過isDisposed這個方法哦,我們可以看到,不關是先執行onComplete();還是先執行onError(t),最終都會呼叫dispose();,而呼叫了dispose();後,isDisposed()為false,也就不會再執行另外一個了。而且如果人為先呼叫onComplete再呼叫onError,onComplete不會被觸發,而且會丟擲NullPointerException異常。

小結:

此時我們的目的基本達到了,我們知道了Reactive流是如何被終止的以及RxJava的onComplete();與onError(t);只有一個會被執行的原因。

我們雖然知道了原因,但是秉著刨根問底的態度,抵擋不住內心的好奇,我還是決定挖一挖DisposableHelper這個類,當然如果不想了解DisposableHelper的話,看到這裡也就可以了;

Ok,前面分析到,程式碼裡呼叫了DisposableHelper類的靜態方法,我們看下其呼叫的兩個靜態方法分別做了什麼?

public enum DisposableHelper implements Disposable {
 DISPOSED;

 public static boolean isDisposed(Disposable d) {
   // 判斷上次記錄的終點標識的是否是 當前執行的Observer,如果是返回true
    return d == DISPOSED;
}
....
public static boolean dispose(AtomicReference field) {
    //1、current為我們當前的observer的Disposable的值,第一次呼叫時current是null
    Disposable current = field.get();
    //2、終止標識
    Disposable d = DISPOSED;
    //3、兩次不相同,說明observer未呼叫過dispose,
    if (current != d) {
        //4、將終止標識的值設定給當前的observer的Disposable,並返回設定前的observer的Disposable的值,此時如果呼叫isDisposed(Disposable d)返回的就是ture了
        current = field.getAndSet(d);
        if (current != d) {
            //第一次呼叫時會走到這裡,此時current==null,返回true,
            //current不為null時說明當前的observer呼叫了多次dispose(),而如果多次呼叫了Disposable的值還不是DISPOSED,說明之前設定失敗,所以再次呼叫dispose();
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}
....
}    

1、DISPOSED:作為是否要終止的列舉型別的標識
2、isDisposed:判斷上次記錄的終點標識的是否是 當前執行的Observer,如果是返回true
3、dispose:採用了原子性引用類AtomicReference,目的是防止多執行緒操作出現的錯誤。

總結:
  • 我們瞭解了RxJava的隨意終止Reactive流的能力的來源;
  • 過程中也明白了RxJava的onComplete();與onError(t);只有一個會被執行的祕密。
  • 實現該能力的主要方式還是利用了裝飾者模式
  • 從中體會了設計模式的魅力所在,當然我們還接觸了AtomicReference這個類,在平時估計很少接觸到。

探索RxJava2之訂閱執行緒切換原理

本次我們將探索RxJava2.x執行緒切換的實現原理。

先看一個demo。

 //1、觀察者建立一個Observer
  Observer observer = new Observer() {
      @Override
    public void onSubscribe(@NonNull Disposable d) {
          Log.d(TAG, "onSubscribe");
          Log.d(TAG,"work thread is"+Thread.currentThread().getName());
          disposable = d;
      }

      @Override
    public void onNext(@NonNull String s) {
          Log.d(TAG, "onNext data is :" + s);
          Log.d(TAG,"work thread is"+Thread.currentThread().getName());
          if (s.equals("hello")) {
              //執行了hello之後終止
              disposable.dispose();
              disposable.dispose();
          }
          CompositeDisposable compositeDisposable = new CompositeDisposable();
          compositeDisposable.dispose();

      }

      @Override
    public void onError(@NonNull Throwable e) {
          Log.d(TAG, "onError data is :" + e.toString());
      }

      @Override
    public void onComplete() {
          Log.d(TAG, "onComplete");
      }
  };

  Observable observable = Observable.create(new ObservableOnSubscribe() {
      @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
          Log.d(TAG,"work thread is"+Thread.currentThread().getName());
          e.onNext("hello");
          Log.i(TAG, "傳送 hello");
          e.onNext("world");
          Log.i(TAG, "傳送 world");
          e.onComplete();
          Log.i(TAG, "呼叫 onComplete");

      }

  });

版本1:不存線上程切換

observable.subscribe(observer);

輸出結果:

07-13 14:58:13.173 818-865/? D/RxJavaDemo2: onSubscribe
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: onNext data is :hello
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:58:13.173 818-865/? I/RxJavaDemo2: 傳送 hello
07-13 14:58:13.173 818-865/? I/RxJavaDemo2: 傳送 world
07-13 14:58:13.173 818-865/? I/RxJavaDemo2: 呼叫 onComplete
07-13 14:58:13.173 818-865/? I/TestRunner: finished: testRx(com.guanaj.rxdemo.RxJavaDemo2)

版本2:切換執行緒(切換操作是如此的瀟灑)

observable
        .subscribeOn(Schedulers.io())//切換到io執行緒
        .observeOn(AndroidSchedulers.mainThread())//切換到主執行緒
        .subscribe(observer);

輸出結果:

07-13 14:43:56.699 29727-29749/? D/RxJavaDemo2: onSubscribe
07-13 14:43:56.699 29727-29749/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:43:56.699 29727-29749/? I/TestRunner: finished: testRx(com.guanaj.rxdemo.RxJavaDemo2)
07-13 14:43:56.699 29727-29753/? D/RxJavaDemo2: work thread isRxCachedThreadScheduler-1
07-13 14:43:56.699 29727-29753/? I/RxJavaDemo2: 傳送 hello
07-13 14:43:56.699 29727-29753/? I/RxJavaDemo2: 傳送 world
07-13 14:43:56.699 29727-29753/? I/RxJavaDemo2: 呼叫 onComplete
07-13 14:43:56.699 29727-29727/? D/RxJavaDemo2: onNext data is :hello
07-13 14:43:56.699 29727-29727/? D/RxJavaDemo2: work thread ismain

結果分析(因為我用的是@RunWith(AndroidJUnit4.class)執行程式碼,所以在工作執行緒是AndroidJUnitRunner):
現在我們現象,後面根據現象分析原因。

沒執行緒切換的版本:

無論在哪裡呼叫subscribe,都在當前執行緒執行。

存在版本切換的版本:

1、被觀察者的onSubscribe在呼叫subscribe的執行緒中執行,
2、被觀察者的subscribe在RxJava2的RxCachedThreadScheduler-1中執行。
3、onNext工作在主執行緒

OK,現象看完了,我們開始看本質吧!但是,從哪入手呢?還是老辦法,哪裡觸發的行為就哪裡下手( ̄∇ ̄)

我們先來探索切換Observable工作執行緒的subscribeOn方法入手。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

看到了RxJavaPlugins.onAssembly,前面分析過,為hook服務,現在看成是返回傳入的Obserable即可。這裡的this為我們的observable,scheduler就是我們傳入的Schedulers.io();我們繼續看ObservableSubscribeOn;

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {....}

其繼承AbstractObservableWithUpstream

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

AbstractObservableWithUpstream繼承自Observable,其作用是通過source欄位儲存上游的Observable,因為Observable是ObservableSource介面的實現類,所以我們可以認為Observable和ObservableSource在本文中是相等的:,

也就是說ObservableSubscribeOn是對Observble進行了一次wrapper操作

我們繼續回來看ObservableSubscribeOn的原始碼

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
  //1、執行緒排程器
  final Scheduler scheduler;
  public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
      //2、儲存上游的obserble
      super(source);
      this.scheduler = scheduler;
  }

  @Override
public void subscribeActual(final Observer<? super T> s) {
      //以下為關鍵部分
      //3、對我們下游的observer進行一次wrapper
      final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
      //4、同樣,先自己呼叫自己的onSubscribe
      s.onSubscribe(parent);
      //5、(高能量聚集了)將排程的執行緒的Disposable賦值給當前的Disposable。scheduler可以看成是某個執行緒上的排程器。new SubscribeTask(parent)工作在其指定的執行緒裡面。SubscribeTask是一個Runnable,也就是說排程器觸發Runnable的run()執行,
      //***是不是恍然大悟,那麼run()裡面的程式碼就是執行在scheduler的執行緒上了。這樣也就實現了執行緒的切換了。
      parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
  }
  static final class SubscribeOnObserver<T> extends AtomicReference implements Observer<T>, Disposable {....}
  ...
  }

我們開看下SubscribeTask

final class SubscribeTask implements Runnable {
  private final SubscribeOnObserver<T> parent;

  SubscribeTask(SubscribeOnObserver<T> parent) {
      this.parent = parent;
  }

  @Override
  public void run() {
      source.subscribe(parent);
  }
}

1、parent就是我們包裝後的observe,其內部儲存了下游的observer
2、source即通過ObservableSubscribeOnwrapper後儲存我們上游的observable

所以run裡面的source.subscribe(parent);即為wrapper的observer訂閱了上游的observable,觸發了上游observable的subscribeActual,開始執行資料的分發
上層obserable -》wrapper產生的observer -》真實的observser

思路梳理(重要)

Ok,分析到這裡思路基本清晰了
1、在執行subscribeOn時,在Observable和observer中插入了一個(wrapper)obserabler和(wrapper)Observer

原observable->【(Wrapper)observable||(Wrapper)Observer】->(原)Observer

2、observable.subscribe觸發->(Wrapper)Observable.subscribeActual()內部呼叫->parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));,->scheduler在指定執行緒呼叫(完成執行緒切換)->SubscribeTask.run,run內部呼叫->原Observable.subscribe((Wrapper)Observer)觸發->(原)Observable.subscribeActual()開始資料分發

此時分發給的是(Wrapper)Observer,那應該是(Wrapper)Observer分發給了(原)Observer,我們看下是不是。

OK,(Wrapper)Observer對(原)Observer進行了wrapper,我們看下原始碼:

static final class SubscribeOnObserver<T> extends AtomicReference implements Observer<T>, Disposable {
     //6、儲存下游的observer
     final Observer<? super T> actual;
     //7、儲存下游observer的Disposable,下游的Disposable交由其管理
     final AtomicReference<Disposable> s;

     SubscribeOnObserver(Observersuper T> actual) {
         this.actual = actual;
         this.s = new AtomicReference<Disposable>();
     }

     @Override
   public void onSubscribe(Disposable s) {
   //8、onSubscribe()方法在observer呼叫subscribe時觸發,Observer傳入自己的Disposable,賦值給this.s,交由當前的包裝的Observer管理。同樣是裝飾者模式的魅力所在。
         DisposableHelper.setOnce(this.s, s);
     }
     //當前Observer可以理解為下游observer和上游obserable的一箇中間observer。
     //9、這裡直接呼叫下游observer的對應方法。
     @Override
   public void onNext(T t) {
         actual.onNext(t);
     }

     @Override
   public void onError(Throwable t) {
         actual.onError(t);
     }

     @Override
   public void onComplete() {
         actual.onComplete();
     }
   //10、取消訂閱時,要同時取消下游的observer和當前的observer,因為上游obserable分發訂閱資料判斷是否需要派發時判斷的是與之最近的observer。
   //上層obserable-》wrapper產生的observer》真實的observser
     @Override
   public void dispose() {
         DisposableHelper.dispose(s);
         DisposableHelper.dispose(this);
     }

     @Override
   public boolean isDisposed() {
         return DisposableHelper.isDisposed(get());
     }
     //11、subscribeActual()中被呼叫,目的是將Schedulers返回的Worker加入管理
     void setDisposable(Disposable d) {
         DisposableHelper.setOnce(this, d);
     }
 }

確實是(Wrapper)Observer分發給了(原)Observer。
到這裡的時候,整個流程基本OK了,但是,我們在5和11處說了,排程Worker也會加入Disposable進行管理,我還是要一探究竟( ̄∇ ̄)。

有了對SubscribeOnObserver分析的鋪墊,我們現在可以分析第5處parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));的程式碼了,我們先看scheduler.scheduleDirect()這句

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    //12、以毫秒為單位,無延遲排程
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

其返回一個Disposable,我們看下這個Disposable是否真的是排程的執行緒的。

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    //13、Worker實現了Disposable的一個排程工作者類
    final Worker w = createWorker();
    //14、hook,排除hook干擾,可以理解為decoratedRun==run
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //15、DisposeTask同樣是實現了Disposable的Task
    DisposeTask task = new DisposeTask(decoratedRun, w);
    //16、開始執行
    w.schedule(task, delay, unit);
    //17、確實是返回了管理run的worker
    return task;
}

現在重點轉移到DisposeTask,我們把run給了DisposeTask,然後worker排程task開始執行run

那麼我們可以猜測w.schedule(task, delay, unit)執行後應該是排程了task的某個方法,然後task開始執行Runnable的run

是不是真的呢?我們來看下new DisposeTask(decoratedRun, w)做了什麼

static final class DisposeTask implements Runnable, Disposable {
      //18、我們外部傳入的runnable
      final Runnable decoratedRun;
      //19、排程工作者
      final Worker w;
      //20、當前執行緒
      Thread runner;
      DisposeTask(Runnable decoratedRun, Worker w) {
          this.decoratedRun = decoratedRun;
          this.w = w;
      }

      @Override
    public void run() {
          //21、獲取執decoratedRun的執行緒
          runner = Thread.currentThread();
          try {
            //22、OK,高能來了。decoratedRun的run被執行
              decoratedRun.run();
          } finally {
              dispose();
              runner = null;
          }
      }

      @Override
    public void dispose() {

          if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
              ((NewThreadWorker)w).shutdown();
          } else {
              //在DisposeTask被取消時,告訴Worker取消,因為DisposeTask是Worker進行管理的
              w.dispose();
          }
      }

      @Override
    public boolean isDisposed() {
          return w.isDisposed();
      }
  }