小白讀原始碼 | RxJava2 入門篇(一)
轉載請註明出處:ofollow,noindex">juejin.im/user/590207…
題記: RxJava2 想必很多人都用過,擴充套件的觀察者模式,簡潔的鏈式呼叫,通過簡單的API呼叫就可以滿足我們的各種需求,讓人不禁感嘆這玩意兒真爽。當然在我們用著很爽的時候,不禁也會對它產生一些好奇,這玩意兒到底長是個啥模樣,嗯,想看看,那就看看吧。花了些時間看了看它的部分原始碼,作此記錄。
引子
既然我是隻小白,還挑什麼呢,撿最容易的上手噻,Flowable (帶背壓模式的被觀察者),我還沒有看,這裡僅記錄普通的 Observable 原始碼閱讀過程 。下面程式碼即為眾所周知的入門用法,本篇文章就圍繞它來闡述。這是我使用的版本:
compile 'io.reactivex.rxjava2:rxjava:2.1.1' compile 'io.reactivex.rxjava2:rxandroid:2.1.0' 複製程式碼
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "Observable emit 1 "); emitter.onNext(1); Log.d(TAG, "Observable emit 2"); emitter.onNext(2); emitter.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe: isDisposable " + d.isDisposed()); } @Override public void onNext(@NonNull Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }); 複製程式碼
一 、關鍵概念
Observable
(被觀察者):這是一個抽象類,裡面方法眾多,就不列舉了,讀的時候遇到哪個看哪個。Observer
(觀察者):這是個介面,裡面有 4 個方法,是必須都要知道的。
public interface Observer<T> { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); } 複製程式碼
ObservableOnSubscribe
(事件發射器的載體):
public interface ObservableOnSubscribe<T> { void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; } 複製程式碼
ObservableEmitter
(事件發射器):這是個介面,繼承了 Emitter 介面,用於傳送事件。
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); @NonNull ObservableEmitter<T> serialize(); boolean tryOnError(@NonNull Throwable t); } public interface Emitter<T> { void onNext(@NonNull T value); void onError(@NonNull Throwable error); void onComplete(); } 複製程式碼
這些關鍵概念必須要記住,至少大體知道都是什麼,裡面都有些什麼方法。
二、直奔核心
既然這是個鏈式呼叫,我們不妨從頭到尾過一遍。Observable.create(new ObservableOnSubscribe<Integer>(){...})
建立了一個Observable物件,那就進create()
這個靜態方法看一看
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { // 判空程式碼,不重要,不看也罷 ObjectHelper.requireNonNull(source, "source is null"); // 建立 Observable 物件的關鍵程式碼 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 複製程式碼
create()
方法裡只有兩行程式碼,我們重點來看看RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
這行程式碼 。很明顯分為兩部分,onAssembly()
和new ObservableCreate<T>(source)
,我們先看onAssembly()
,點進去發現是
@SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } // 上面兩行程式碼是和使用 map 操作符相關的,我們這裡還沒用 map 操作符呢,對我們沒啥卵用,當它不存在, // 那麼方法引數裡傳進來一個 Observable 型別的 source, 現在被原原本本當做返回值返回回去 // 意思就很明顯,Observable 物件是由 new ObservableCreate<T>(source) 生成的 return source; } 複製程式碼
既然onAssembly()
把引數原樣作為返回值返回,那Observable.create(new ObservableOnSubscribe<Integer>(){...})
建立的Observable
物件就是new ObservableCreate<T>(source)
了,那我們就來看new ObservableCreate<T>(source)
,點進去看關鍵程式碼
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } // 方法名暴露了真相 "實際訂閱,真實訂閱" @Override protected void subscribeActual(Observer<? super T> observer) { // 1.建立 CreateEmitter 物件,引數傳的是 observer, 這個 observer 從哪冒出來的,待會兒就知道了 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 2.並且這個 observer 還訂閱了 CreateEmitter 物件 observer.onSubscribe(parent); try { // 3.source 也訂閱了 CreateEmitter 物件 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } ... } 複製程式碼
首先ObservableCreate
繼承Observable
,那ObservableCreate
就是個Observable
了,對,就是這麼簡單,ObservableCreate
就是我們要找的Observable
物件 。然後上面subscribeActual()
方法裡添加註釋的那 3 行程式碼講的很清楚,一個是CreateEmitter
(發射器),一個是observer
(觀察者),一個是source
(這個source
就是ObservableOnSubscribe
,下面我就以ObservableOnSubscribe
指代source
),看到這 3 行程式碼,我們就敢假設整個觀察者模式的訊息訂閱與釋出就是由這 3 行程式碼控制的,要驗證假設,我們還需再往下讀原始碼。既然observer
和ObservableOnSubscribe
都與CreateEmitter
有關,我們就來看看CreateEmitter
,
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } // 每次一定會先判斷連線有沒有切斷(就是有沒有 dispose),沒有切斷才接收事件 // 這這個判斷就保證了一旦切斷肯定就收不到事件了 if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { // 如果連線已經切斷,還呼叫 Observer 的 onError() 方法,那就拋異常了 if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } // 如果連線沒有切斷,就呼叫 Observer 的 onError() 方法 if (!isDisposed()) { try { observer.onError(t); } finally { // 如果連線沒有切斷,在呼叫 Observer 的 onError() 方法後,一定會呼叫 dispose() 切斷連線 dispose(); } return true; } return false; } @Override public void onComplete() { // // 如果連線沒有切斷,就呼叫 Observer 的 onComplete() 方法 if (!isDisposed()) { try { observer.onComplete(); } finally { // 如果連線沒有切斷,在呼叫 Observer 的 onComplete() 方法後,一定會呼叫 dispose() 切斷連線 dispose(); } } } ... } 複製程式碼
可以看到,
CreateEmitter
繼承ObservableEmitter<T>
和Disposable
,那它就既是個ObservableEmitter
,又是個Disposable
,那它什麼時候是ObservableEmitter
,又什麼時候是Disposable
呢,當然是
在observer.onSubscribe(parent)
裡它是Disposable
,在source.subscribe(parent)
裡它是ObservableEmitter
。
為什麼這麼說呢,我們再接著看你就明白了。這裡我先講講source.subscribe(parent)
,其實這句程式碼就是ObservableOnSubscribe.subscribe(ObservableEmitter)
,再看清楚些就是
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception { ... } }).subscribe(new Observer<Integer>() { ... }); 複製程式碼
上面幾行程式碼就能解釋
“ 在source.subscribe(parent)
裡它是ObservableEmitter
”
這句話 。
接下來看連線Observable
和Observer
的subscribe()
方法,
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { // 判空,不用看 ObjectHelper.requireNonNull(observer, "observer is null"); try { // 1.這個其實沒啥用,我們用的是最簡單的用法,所以引數傳的是什麼,返回值將它原樣返回 observer = RxJavaPlugins.onSubscribe(this, observer); // 判空,不用看 ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); // 2.關鍵,進入這個方法發現它是 Observale 類裡的一個抽象方法,這個抽象方法在哪裡實現呢, // 就在建立 Observable 物件的 create() 方法裡的 new ObservableCreate<T>(source) 裡 // 不信可以翻看上面介紹 new ObservableCreate<T>(source) 的程式碼 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { ... } } 複製程式碼
subscribe()
方法裡的subscribeActual(observer)
方法在new ObservableCreate<T>(source)
裡重寫了,翻看new ObservableCreate<T>(source)
的程式碼後,你會發現它裡面的subscribeActual
方法裡的observer
就是subscribe()
方法裡的observer
。
// 方法名暴露了真相 "實際訂閱,真實訂閱" @Override protected void subscribeActual(Observer<? super T> observer) { // 1.建立 CreateEmitter 物件,引數傳的是 observer, 這個 observer 從哪冒出來的,待會兒就知道了 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 2.並且這個 observer 還訂閱了 CreateEmitter 物件 observer.onSubscribe(parent); try { // 3.source 也訂閱了 CreateEmitter 物件 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 複製程式碼
既然subscribe()
方法裡的observer
在subscribeActual
方法裡執行了observer.onSubscribe(parent)
,那我們就來看下subscribe()
方法的引數Observer
物件吧,
...subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe: isDisposable " + d.isDisposed()); } @Override public void onNext(@NonNull Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } } ... 複製程式碼
看沒看到public void onSubscribe(@NonNull Disposable d)
, 在這裡就可以解釋
“ 在observer.onSubscribe(parent)
裡它是Disposable
”
這句話 。
既然
“ 在source.subscribe(parent)
裡它是ObservableEmitter
”
和
“ 在observer.onSubscribe(parent)
裡它是Disposable
”
都解釋清楚了,那Observable
和Observer
之間千絲萬縷的聯絡也就全在上述 2 句話裡了,Observable
和Observer
的事件釋出和接收就是這 2 行程式碼起的作用。
這裡還可以解釋一個問題,為什麼在列印日誌時我們發現
// Observer 裡的 onSubscribe(@NonNull Disposable d) 方法先執行 12-15 19:05:39.665 18795-18795/com.persist.rxjava D/MainActivity: onSubscribe: isDisposable false // Observable 裡的 subscribe(@NonNull ObservableEmitter<Integer> emitter) 方法後執行 12-15 19:05:39.665 18795-18795/com.persist.rxjava D/MainActivity: Observable emit 1 12-15 19:05:39.665 18795-18795/com.persist.rxjava D/MainActivity: onNext: 1 12-15 19:05:39.665 18795-18795/com.persist.rxjava D/MainActivity: Observable emit 2 12-15 19:05:39.666 18795-18795/com.persist.rxjava D/MainActivity: onNext: 2 12-15 19:05:39.666 18795-18795/com.persist.rxjava D/MainActivity: onComplete: 複製程式碼
因為在subscribeActual()
方法裡它們的先後順序已經定了,註釋 2 和註釋 3就是它們先後順序了,可以看下面程式碼。而且這也就是說Observable
和Observer
建立連線後,ObservableEmitter
才開始傳送事件。
// 方法名暴露了真相 "實際訂閱,真實訂閱" @Override protected void subscribeActual(Observer<? super T> observer) { // 1.建立 CreateEmitter 物件,引數傳的是 observer, 這個 observer 從哪冒出來的,待會兒就知道了 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 2.並且這個 observer 還訂閱了 CreateEmitter 物件 observer.onSubscribe(parent); try { // 3.source 也訂閱了 CreateEmitter 物件 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 複製程式碼