拆輪子系列:RxJava
目錄


整體思路
根據對RxJava使用的基本認識,個人覺得解析RxJava關鍵在於抓住以下幾個問題:
- 事件流源頭(observable)怎麼發出資料
- 響應者(subscriber)怎麼收到資料
- 操作符如何運作(operator/transformer)
- 整個過程的排程(scheduler)
需要說明的一點是,本文基於RxJava1.3.0,RxJava當前最新版本已經升級到了 2.2.4,後續會單開文章講述版本之間的變化。
在具體講述之前,先來介紹RxJava核心的三個類:
Observable
先來看一下原始碼中的說明:The Observable class that implements the Reactive Pattern.
它其實是一次觀察者模式實現的排程者。所謂一個觀察者模式在RxJava中指的是一次subscribe。
一次subscribe的實質可以抽象成下述程式碼,這個抽象很重要,後續的一系列變換都是基於這個抽象來做的:
public class Observable<T> { final OnSubscribe<T> onSubscribe; protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; } public final Subscription subscribe(Subscriber<? super T> subscriber) { this.onSubscribe.call(subscriber); } }
OnSubscribe
同樣的,先來看一下OnSubscribe的官方說明:
/** * Invoked when Observable.subscribe is called. * @param <T> the output value type */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity } public interface Action1<T> extends Action { void call(T t); }
Subscriber
Subscriber是介面Observer的抽象子類,
public interface Observer<T> { void onCompleted(); void onError(Throwable e); void onNext(T t); }
RxJava應用及一次訂閱的流程分析
我們先來看一下RxJava的一個基本示例,然後以此為引子,進行整個流程的追蹤和分析.
這個過程很簡單,通過Observable.just發射資料,經過一次map轉換,經過subscribeOn、observeOn切換執行緒,最後通過subscribe實現訂閱。
Observable .just("Observable.create! User Observable.just!") .map(new Func1<String, String>() { @Override public String call(String s) { return "Observable.create! User Observable.map!"; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i("RxJava", "print message: " + s); } });
簡單梳理一下整個過程的物件轉換關係如下:
just
先來看一下just的呼叫過程
public static <T> Observable<T> just(final T value) { return ScalarSynchronousObservable.create(value); } public static <T> ScalarSynchronousObservable<T> create(T t) { return new ScalarSynchronousObservable<T>(t); } protected ScalarSynchronousObservable(final T t) { super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t))); this.t = t; }
從程式碼中可以看出,其核心過程是:
- 我們建立的是 ScalarSynchronousObservable,一個 Observable 的子類;
- ScalarSynchronousObservable的建構函式中傳入了一個JustOnSubscribe類,這是一個OnSubscribe的實現類。
這裡我們可以這麼理解,Observable的建構函式傳入了一個OnSubscribe,這是一個回撥,它有一個回撥方法void call(T t); 這裡我們先記住這個call回撥,後面再把整個過程串起來。
看一下JustOnSubscribe的具體實現:
static final class JustOnSubscribe<T> implements OnSubscribe<T> { final T value; JustOnSubscribe(T value) { this.value = value; } @Override public void call(Subscriber<? super T> s) { s.setProducer(createProducer(s, value)); } } static <T> Producer createProducer(Subscriber<? super T> s, T v) { // ... return new WeakSingleProducer<T>(s, v); }
我們再來看WeakSingleProducer的原始碼,在request方法中,可以看到呼叫了onNext() 和 onComplete(),這樣,just中的資料就被創造並傳遞出來了。
static final class WeakSingleProducer<T> implements Producer { // ... @Override public void request(long n) { // 省略狀態檢查程式碼 Subscriber<? super T> a = actual; if (a.isUnsubscribed()) { return; } T v = value; try { a.onNext(v); } catch (Throwable e) { Exceptions.throwOrReport(e, a, v); return; } if (a.isUnsubscribed()) { return; } a.onCompleted(); } }
map
map它是一種轉換,將上游輸入的資料轉換之後,傳遞到下游。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return unsafeCreate(new OnSubscribeMap<T, R>(this, func)); } public final class OnSubscribeMap<T, R> implements OnSubscribe<R> { ... @Override public void call(final Subscriber<? super R> o) { MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); o.add(parent); source.unsafeSubscribe(parent); } }
OnSubscribeMap類是OnSubscribe的子類,unsafeCreate()方法就是通過傳入的OnSubscribe構造一個Observable。這一點和just方法本質上是一樣的,通過OnSubscribe構造一個Observable例項。所以Map的本質就是將一個Observable轉換成另外一個Observable,期間會回撥call方法。
那麼,map的call方法具體做了什麼呢?
- 建立了一個MapSubscriber;
- 將MapSubscriber加入到Subscriber的父鏈中;
- 修正訂閱關係,source Observable訂閱的是MapSubscriber,意思是在map之前訂閱的是subscriberA,此時訂閱的就是新的MapSubscriber,而MapSubscriber是subscriberA的parent,它們會有一個巢狀關係
MapSubscriber的原始碼如下,其過程還是比較直接的:
- 上游每新來一個數據,就用我們給的 mapper 進行資料轉換。
- 再把轉換之後的資料傳送給下游。
static final class MapSubscriber<T, R> extends Subscriber<T> { ... public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } @Override public void onNext(T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; } actual.onNext(result); } }
subscribe
下面我們再來看subscribe的過程,這是Subscriber對OnSubscribe的訂閱過程。
public final Subscription subscribe(final Action1<? super T> onNext) { // 省略引數檢查程式碼 Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED; Action0 onCompleted = Actions.empty(); return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));// 1 } public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // 省略引數檢查程式碼 subscriber.onStart();// 2 if (!(subscriber instanceof SafeSubscriber)) { subscriber = new SafeSubscriber<T>(subscriber);// 3 } try { RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);// 4 return RxJavaHooks.onObservableReturn(subscriber);// 5 } catch (Throwable e) { // 省略錯誤處理程式碼 } }
- 我們首先對傳入的 Action 進行包裝,包裝為 ActionSubscriber,一個 Subscriber 的實現類。
- 呼叫 subscriber.onStart() 通知 subscriber 它已經和 observable 連線起來了。這裡我們就知道,onStart() 就是在我們呼叫 subscribe() 的執行緒執行的。
- 如果傳入的 subscriber 不是 SafeSubscriber,那就把它包裝為一個SafeSubscriber。
- 我們跳過 hook,認為它什麼也沒做,那這裡我們呼叫的其實是observable.onSubscribe.call(subscriber),這裡我們就看到了前面提到的 onSubscribe 的使用程式碼,在我們呼叫 subscribe() 的執行緒執行這個回撥。
- 跳過 hook,那麼這裡就是直接返回了subscriber, Subscriber繼承了Subscription,用於取消訂閱。
我們應該還記得OnSubscribeMap中的call方法吧,這裡的observable.onSubscribe.call(subscriber)呼叫的就是OnSubscribeMap.call()方法。
在OnSubscribeMap.call()之中,有一段程式碼:source.unsafeSubscribe(parent);它會繼續回溯去呼叫上一個observable.onSubscribe.call()的call方法,而這個call方法就是JustOnSubscribe中的call方法
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // 省略錯誤處理程式碼 } return Subscriptions.unsubscribed(); } }
整個過程如下:

RxJava順序流程.png
這裡我們可以看到RxJava中存在這樣一種巢狀關係:

RxJava回溯執行的過程.png
執行緒排程
前面的過程都是通過函式呼叫來完成的,都在subscribe所在的執行緒執行,RxJava進行非同步非常簡單,只需要使用 subscribeOn 和 observeOn 這兩個操作符即可。既然它倆都是操作符,那流程上就是和 map 差不多的,這裡我們主要關注執行緒排程的實現原理。subscribeOn和observeOn操作符的呼叫者是Observable<T>,方法引數是Scheduler,它們的區別是subscribeOn決定的是上游Observable的執行執行緒,observeOn決定的是下游的Subscriber回撥執行的執行緒,下面我們來看具體是怎麼實現的。
subscribeOn
追蹤subscribeOn的呼叫過程,其呼叫過程通過OperatorSubscribeOn進行了一次轉換。過程如下:
- 獲取Scheduler中的Worker物件inner;
- 將Subscriber包裝成SubscribeOnSubscriber,這個是parentSubcriber;
- inner.schedule(parent) 執行具體過程
- SubscribeOnSubscriber中的setProducer方法中,做了進一步的執行緒排程
- 如果當前是在同一個執行緒中,直接request;
- 如果不在同一個執行緒中,發生一次執行緒排程
那麼,這兩次排程有什麼區別呢?簡單的說:
inner.schedule(parent)排程影響的是Subscriber的回撥,也就是下游的監聽;
setProducer排程影響的是上游資料的request;
所以subscribeOn影響的是上下游的執行執行緒,下游如果要切換執行緒,需要通過observeOn進行切換
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { final Scheduler scheduler; final Observable<T> source; final boolean requestOn; public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) { this.scheduler = scheduler; this.source = source; this.requestOn = requestOn; } @Override public void call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source); subscriber.add(parent); subscriber.add(inner); inner.schedule(parent); } ... }
SubscribeOnSubscriber
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 { ... @Override public void setProducer(final Producer p) { actual.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread() || !requestOn) { p.request(n); } else { worker.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }
observeOn
同樣的,我們追蹤observeOn。過程如下:
- 建立OperatorObserveOn,繼承自Operator;
- 通過lift操作符進行切換
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, RxRingBuffer.SIZE); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); }
OnSubscribeLift
它的邏輯是先對下游 subscriber 用操作符進行處理,處理會返回一個新的subscriber,然後通知處理後的 subscriber,它將要和 observable 連線起來了,最後把它和上游連線起來。
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> { @Override public void call(Subscriber<? super R> o) { ... Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o); st.onStart(); parent.call(st); ... } }
OperatorObserveOn
作為操作符的邏輯,也比較簡單,如果 scheduler 是 ImmediateScheduler/TrampolineScheduler,就什麼也不做,否則就把 subscriber 包裝為 ObserveOnSubscriber,看來髒活累活都是 ObserveOnSubscriber 乾的了。
public final class OperatorObserveOn<T> implements Operator<T, T> { // ... @Override public Subscriber<? super T> call(Subscriber<? super T> child) { if (scheduler instanceof ImmediateScheduler) { // avoid overhead, execute directly return child; } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>( scheduler, child, delayError, bufferSize); parent.init(); return parent; } } // ... }
ObserveOnSubscriber
繼續看ObserveOnSubscriber,它是observeOn只有生成的新的subscriber, 下面的原始碼是簡化之後的實現。我們可以看到它排程了每個單獨的subscriber.onXXX() 方法。所以這就是observeOn排程隻影響subscriber的原因了!!!!
Observable.create(subscriber -> { Worker worker = scheduler.createWorker(); subscriber.add(worker); source.unsafeSubscribe(new Subscriber<T>(subscriber) { @Override public void onNext(T t) { worker.schedule(() -> subscriber.onNext(t)); } @Override public void onError(Throwable e) { worker.schedule(() -> subscriber.onError(e)); } @Override public void onCompleted() { worker.schedule(() -> subscriber.onCompleted()); } }); });
RxJava應用舉例
使用RxJava實現從DB load 資料
通過Observable 提供的系列create方法建立, create系列方法有:
-
Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure)
-
Observable<T> unsafeCreate(OnSubscribe<T> f)
-
Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe)
-
Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe)
這裡採用第二個方法建立:
Observable.unsafeCreate(new rx.Observable.OnSubscribe<Data>() { @Override public void call(Subscriber<? super Data> subscriber) { Data data = null; // sql操作,loadFromDB subscriber.onNext(data); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) .subscribe(new Action1<Data>() { @Override public void call(Data data) { // handle data } });
使用RxJava實現分頁資料載入
開發中,我們會遇到這樣的場景,某個介面採用分頁拉取方式,初始化時我們可能需要迴圈去拉,一次性把資料全部拉取到,假定你不能通過limit設定成無限大的方法拉取一次。這種場景,一般處理可能是迴圈迭代拉,如果採用RxJava則會非常方便。
protected void fetchPatients() { Observable observable = Observable.range(0, Integer.MAX_VALUE) .concatMap(new Func1<Integer, Observable<List<Data>>>() { @Override public Observable<List<Data>> call(Integer page) { return getPageObservable(page); } }) .takeWhile(new Func1<List<Data>, Boolean>() { @Override public Boolean call(List<Data> data) { return data.size() < FETCH_LIMIT; } }).reduce(new ArrayList<Data>(), new Func2<ArrayList<Data>, List<Data>, ArrayList<Data>>() { @Override public ArrayList<Data> call(ArrayList<Data> datas, List<Data> datas2) { datas.addAll(datas2); return datas; } }) .map(new Func1<List<Data>, List<Data>>() { @Override public List<Data> call(List<Data> datas) { // do some last handle return datas; } }).subscribeOn(Schedulers.io()) .observeOn(Schedulers.trampoline()) .subscribe(new Action1() { @Override public void call(List<Data> datas) { } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { } }); } protected Observable getPageObservable(int page) { Observable observable = apiService.getPager(page, FETCH_LIMIT) .map(new Func1<List<Data>, List<Data>>() { @Override public List<Data> call(List<Data> datas) { // do some pre handle return datas; } }); return observable; }
總結
本文從最簡單的用例出發,追蹤了RxJava的完整過程,也響應了文章開頭所提的四個步驟:
- 事件流源頭(observable)怎麼發出資料
- 響應者(subscriber)怎麼收到資料
- 操作符如何運作(operator/transformer)
- 整個過程的排程(scheduler)
關於RxJava,還有兩個核心的問題:
- RxJava排程器Scheduler
- RxJava中的背壓概念
這兩個問題我會在後續的文章中繼續論述