rxjava2.x原始碼學習隨筆
ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx,最初是LINQ的一個擴充套件,由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源,Rx是一個程式設計模型,目標是提供一致的程式設計介面,幫助開發者更方便的處理非同步資料流,Rx庫支援.NET、JavaScript和C++,Rx近幾年越來越流行了,現在已經支援幾乎全部的流行程式語言了,Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET。
rxjava
rxjava是ReactiveX在java平臺的一個實現。是一個程式設計模型,以觀察者模式提供鏈式的介面呼叫,動態控制執行緒的切換,使得可以簡便的處理非同步資料流。
簡介
Github: ofollow,noindex">rxjava
中文文件: ReactiveX/RxJava文件中文版
官網: reactivex
特點
- 鏈式呼叫,使用簡單
- 簡化邏輯
- 靈活的執行緒排程
- 提供完善的資料操作符,功能強大
觀察者模式
觀察者模式定義物件間一種一對多的依賴關係,使得每當一個物件改變狀態,則所以依賴於它的物件都會得到通知並被自動更新。rxjava的核心設計就是採用觀察者模式。Observable是被觀察者,Observer是觀察者,通過subscribe方法進行訂閱。
- 優點
觀察者和被觀察者之間是抽象解耦,應對業務變化
增強系統靈活性、可擴充套件性
具體程式碼示例可參考設計模式-觀察者模式
- 缺點
在應用觀察者模式時需要考慮一下開發效率和執行效率問題,程式中包括一個被觀察者、多個觀察者、開發和除錯等內容會比較複雜,而且在Java中訊息的通知預設是順序執行,一個觀察者卡頓,會影響整體的執行效率,在這種情況下,一般考慮採用非同步的方式
rxjava怎麼用?
gradle引入版本
implementation 'io.reactivex.rxjava2:rxjava:2.2.0' implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
接著舉一個常用的rxjava使用的例子,我們在專案經常需要請求服務端介面,然後獲取資料,將資料進行快取,然後處理ui上的顯示。示例的程式碼如下:
Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(ObservableEmitter<Response> e) throws Exception { //獲取服務端的介面資料 Request.Builder builder = new Request.Builder() .url("http://xxx.com") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }).map(new Function<Response, Model>() { @Override public Model apply( Response response) throws Exception { //將json資料轉化為對應的Model if (response.isSuccessful()) { ResponseBody body = response.body(); if (body != null) { Log.e(TAG, "map:轉換前:" + response.body()); return new Gson().fromJson(body.string(), Model.class); } } return null; } }).doOnNext(new Consumer<Model>() { @Override public void accept( Model s) throws Exception { //對資料進行其他快取的處理 Log.e(TAG, "doOnNext: 儲存網路載入的資料:" + s.toString() + "\n"); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Model>() { @Override public void accept(Model model) throws Exception { //重新整理ui Log.e(TAG, "成功重新整理介面:" + data.toString() + "\n"); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //進行失敗的異常提示 Log.e(TAG, "失敗處理異常:" + throwable.getMessage() + "\n"); } });
本文主要對rxjava的原始碼進行梳理分析,關於rxjava操作符的使用,推薦參考中文的文件,以及下面的博文介紹。
rxjava核心執行流程是怎樣?
rxjava主要是採用觀察者模式進行設計,當執行相關的操作符是會生成新的Observable及Observer。Observable會持有上游被觀察者,Observer會持有下游的觀察者。當執行subscribe訂閱方法的時候,通過持有上游的被觀察者物件,會往上游逐步執行訂閱方法。當執行到起始的被觀察者回調方法時,如果執行ObservableEmitter的onNext方法時,由於Observer會持有下游的Observer物件,會逐步呼叫下游的onNext方法,直到最終subscribe傳入的觀察者例項。這是rxjava鏈式呼叫的核心執行流程。
當然rxjava還涉及到執行緒的排程及資料的背壓處理,關於這些實現的原理會再後續進行梳理。但rxjava的鏈式呼叫的核心執行流程都是一致。下面我們將通過2個部分來梳理rxjava的核心執行流程,包含一些關鍵類的說明,及通過示例的程式碼相關的執行流程圖進行梳理。
關鍵類功能說明
類 | 說明 |
---|---|
ObservableSource | 介面類,只有一個subscribe方法,引數是Observer物件 |
Observer | 介面類,觀察者。有onSubscribe、onNext、onError、onComplete方法 |
Consumer | 介面有,觀察者。只有一個accept方法,在被訂閱時最終也會轉換成Observer,設計這個類是為了簡化呼叫 |
Observable | 抽象類,繼承了ObservableSource介面,操作符的實現都是繼承與它。內部封裝了大量的操作符呼叫方法,主要是有一個核心的抽象方法abstract void subscribeActual(Observer<? super T> observer),用於實現相關的訂閱分發邏輯。 |
AbstractObservableWithUpstream | 繼承於Observable,構造方法需要傳入ObservableSource
|
ObservableCreate | 繼承於AbstractObservableWithUpstream,source為ObservableOnSubscribe。subscribeActual方法會例項化一個CreateEmitter物件,執行ObservableOnSubscribe的subscribe方法 |
ObservableMap | 繼承於AbstractObservableWithUpstream,訂閱會新生產一個觀察者MapObserver |
MapObserver | ObservableMap的內部類,onNext方法會觸發mapper.apply(t)回撥,然後執行下游觀察者的onNext方法 |
ObservableDoOnEach | 繼承於AbstractObservableWithUpstream,訂閱會新生產一個觀察者DoOnEachObserver |
DoOnEachObserver | ObservableDoOnEach的內部類,onNext會執行onNext.accept(t)方法,然後執行下游觀察者的onNext方法 |
ObservableSubscribeOn | 繼承於AbstractObservableWithUpstream,被觀察者執行緒排程控制。subscribeActual會執行scheduler.scheduleDirect(new SubscribeTask(parent)),SubscribeTask的run方法會執行source.subscribe(parent)。ObservableSubscribeOn根據執行緒排程器的策略去執行上游的訂閱方法實現。 |
ObservableObserveOn | 繼承於AbstractObservableWithUpstream,觀察者執行緒排程控制。subscribeActual方法會判斷scheduler是否為TrampolineScheduler。若是則執行下游的觀察者,否會建立新的ObserveOnObserver,並傳入schedule的work。 |
ObserveOnObserver | ObservableObserveOn內部類,onNext會觸發執行schedule()方法,根據worker去控制下游觀察者的回撥執行緒 |
程式碼執行流程
首先我們根據上面demo例子,梳理出rxjava的簡單執行流程,如下圖:

通過流程圖可知,rxjava當執行相關的操作符是會生成新的Observable及Observer。Observable會持有上游被觀察者,Observer會持有下游的觀察者。當執行subscribe訂閱方法的時候,通過持有上游的被觀察者物件,會往上游逐步執行訂閱方法。當執行到起始的被觀察者回調方法時,如果執行ObservableEmitter的onNext方法時,由於Observer會持有下游的Observer物件,會逐步呼叫下游的onNext方法,直到最終subscribe傳入的觀察者例項。
瞭解了rxjava大致的執行流程,下面我們來詳細的看看原始碼的執行流程。首先還是先上一下整體的流程圖,由於圖片較大,建議結合上述的demo及rxjava的原始碼進行檢視。
下面我們分配通過幾個操作符來看看rxjava原始碼具體的實現。
create
create的操作符會返回一個ObservableCreate的被觀察者。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
接下來看看ObservableCreate物件的關鍵實現程式碼,如下:
//構造方法會傳入ObservableOnSubscribe介面的引用,指定為該被觀察者的source。 public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } //核心的subscribeActual @Override protected void subscribeActual(Observer<? super T> observer) { //建立了CreateEmitter發射器 CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { //執行了ObservableOnSubscribe的subscribe回撥方法,傳入了CreateEmitter物件 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
當我們在業務程式碼執行了ObservableEmitter的onNext方法,我們看一下CreateEmitter的onNext的實現程式碼,如下:
//持有下游的觀察者引用 CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } //如果沒有取消訂閱,則會執行下游的觀察者的onNext方法,達到鏈式呼叫的效果 if (!isDisposed()) { observer.onNext(t); } }
map
map的操作符會返回一個ObservableMap的被觀察者。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
接下來看看ObservableMap物件的關鍵實現程式碼,如下:
@Override public void subscribeActual(Observer<? super U> t) { //將上游的被觀察者訂閱MapObserver觀察者 source.subscribe(new MapObserver<T, U>(t, function)); }
接下來主要看看MapObserver的onNext方法,該方法會在ObservableEmitter的onNext方法觸發後被呼叫,如下:
//持有下游的觀察者和回撥函式mapper MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { //map的核心執行程式碼,mapper.apply(t)會執行資料的轉換,並將轉換後的結果v繼續交由下游的觀察者執行 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } //將轉換後的結果v繼續交由下游的觀察者執行 actual.onNext(v); }
doOnNext
doOnNext的操作符會返回一個ObservableDoOnEach的被觀察者。
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null"); return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate)); }
接下來看看ObservableDoOnEach物件的關鍵實現程式碼,如下:
@Override public void subscribeActual(Observer<? super T> t) { //例項化一個DoOnEachObserver的觀察者物件 source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate)); }
這裡核心我們還是要看DoOnEachObserver的onNext對於資料的處理,如下:
@Override public void onNext(T t) { if (done) { return; } try { //回撥accept方法 onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); onError(e); return; } //繼續往下游呼叫觀察者的onNext actual.onNext(t); }
subscribeOn
subscribeOn的操作符會返回一個ObservableSubscribeOn的被觀察者,並傳入scheduler執行緒排程引數。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
接下來看看ObservableSubscribeOn物件的關鍵實現程式碼,如下:
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { //建立了SubscribeOnObserver的觀察者 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); //這個是核心方法,呼叫了執行緒排程去的scheduleDirect方法,並傳入SubscribeTask任務 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
接下來我們看看SubscribeTask的實現,如下:
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { //執行上游被觀察的訂閱方法,這裡就是subscribeOn將上游的訂閱方法控制在scheduler指定執行緒執行的核心 source.subscribe(parent); } }
最後看下SubscribeOnObserver的onNext方法,比較簡單,直接執行下游觀察者的onNext方法,如下:
@Override public void onNext(T t) { actual.onNext(t); }
關於scheduler的具體實現,在後續的執行緒原理進行分析。這裡我們只需要知道上游的被觀察者的訂閱在指定的scheduler執行緒策略中執行就可以了。
observerOn
observerOn 的操作符會返回一個ObservableObserveOn的被觀察者,並傳入scheduler執行緒排程引數。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
接下來看看ObservableObserveOn物件的關鍵實現程式碼,如下:
@Override protected void subscribeActual(Observer<? super T> observer) { //TrampolineScheduler 如果是當前的執行緒 則直接將下游的觀察者與上游的被觀察訂閱 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //其他執行緒策略 Scheduler.Worker w = scheduler.createWorker(); //將執行緒策略的worker傳入ObserveOnObserver觀察者 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
接下來關鍵還是看ObserveOnObserver的實現,如下:
@Override public void onNext(T t) { // 上一級的模式如果不是非同步的,加入佇列 if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } //進行執行緒排程 schedule(); } void schedule() { // 判斷當前正在執行的任務數目 if (getAndIncrement() == 0) { worker.schedule(this); } }
ObserveOnObserver本身繼承了Runnable介面,run方法實現如下:
@Override public void run() { //輸出結果是否融合 if (outputFused) { drainFused(); } else { drainNormal(); } }
我們先進入drainNormal方法:
void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; //第一層迴圈 for (;;) { // 檢查異常處理 if (checkTerminated(done, q.isEmpty(), a)) { return; } //第二層迴圈 for (;;) { boolean d = done; T v; //從佇列中獲取資料 v = q.poll(); boolean empty = v == null; // 檢查異常 if (checkTerminated(d, empty, a)) { return; } //如果沒有資料了,跳出 if (empty) { break; } //執行下一次操作。 a.onNext(v); } //減掉執行的次數,並獲取剩於任務數量,然後再次迴圈 //直到獲取剩餘任務量為0,跳出迴圈 missed = addAndGet(-missed); if (missed == 0) { break; } } }
關於scheduler的具體實現,在後續的執行緒原理進行分析。這裡我們只需要知道下游的觀察者的onNext在指定的scheduler執行緒策略中執行就可以了。
subscribe
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
最後的訂閱方法在做了非空檢查後,會呼叫subscribeActual方法,開始往上游逐層執行訂閱。
被觀察者Observable是如何傳送資料?
通過上面的流程分析,我們可以知道。如果使用create建立了Observable,在ObservableOnSubscribe的subscribe方法中會通過ObservableEmitter的onNext去傳送資料,onNext會觸發開始往下游觀察者傳遞資料。當然rxjava的建立型操作符還有很多,如just、from等,本質最後都是觸發下游觀察者的onNext進行資料的傳送。
觀察者Observer是如何接收到資料的?
通過原始碼分析,每一個鏈層的Observer都會持有相鄰下游的Observer物件,當開始傳送資料時,會依次鏈式執行Observer的onNext方法,最後執行到subscribe方法中建立的Observer物件。
被觀察者和觀察者之間是如何實現訂閱?
每一個鏈層的Observable 都會持有相鄰上游的Observable物件,在subscribe方法開始呼叫後,最後會執行到subscribeActual方法,在subscribeActual方法中會將觀察者與上游的被觀察執行訂閱。
rxjava是如何進行執行緒的排程?
rxjava的Scheduler有很多種實現,下面我們介紹Scheduler的相關說明,然後通過最常用的.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())來分析具體的執行緒排程流程。
Scheduler
我們在呼叫subscribeOn與observeOn時,都會傳入Scheduler物件,首先我們先看一下Scheduler的種類及其功能
Scheduler種類 | 說明 |
---|---|
Schedulers.io( ) | 用於IO密集型的操作,例如讀寫SD卡檔案,查詢資料庫,訪問網路等,具有執行緒快取機制,在此排程器接收到任務後,先檢查執行緒快取池中,是否有空閒的執行緒,如果有,則複用,如果沒有則建立新的執行緒,並加入到執行緒池中,如果每次都沒有空閒執行緒使用,可以無上限的建立新執行緒 |
Schedulers.newThread( ) | 在每執行一個任務時建立一個新的執行緒,不具有執行緒快取機制,因為建立一個新的執行緒比複用一個執行緒更耗時耗力,雖然使用Schedulers.io( )的地方,都可以使用Schedulers.newThread( ),但是,Schedulers.newThread( )的效率沒有Schedulers.io( )高 |
Schedulers.computation() | 用於CPU 密集型計算任務,即不會被 I/O 等操作限制性能的耗時操作,例如xml,json檔案的解析,Bitmap圖片的壓縮取樣等,具有固定的執行緒池,大小為CPU的核數。不可以用於I/O操作,因為I/O操作的等待時間會浪費CPU |
Schedulers.trampoline() | 在當前執行緒立即執行任務,如果當前執行緒有任務在執行,則會將其暫停,等插入進來的任務執行完之後,再將未完成的任務接著執行 |
Schedulers.single() | 擁有一個執行緒單例,所有的任務都在這一個執行緒中執行,當此執行緒中有任務執行時,其他任務將會按照先進先出的順序依次執行 |
Scheduler.from(Executor executor) | 指定一個執行緒排程器,由此排程器來控制任務的執行策略 |
AndroidSchedulers.mainThread() | 在Android UI執行緒中執行任務,為Android開發定製 |
subscribeOn(Schedulers.io())
根據上面的分析,subscribeOn()方法最後會執行到subscribeActual方法,SubscribeTask上面分析了,繼承了Runnable介面, run方法最後會執行source.subscribe(parent)方法。
@Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
這裡我們主要要分析scheduler.scheduleDirect()方法。
@NonNull public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { //建立一個Worker物件 final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //DisposeTasky也是一個包裝類 繼承了Runnable介面 DisposeTask task = new DisposeTask(decoratedRun, w); //這裡是關鍵的實現,執行了worker的schedule方法 w.schedule(task, delay, unit); return task; }
Worker的schedule是一個抽象的方法,Schedulers.io()對應的Worker實現為EventLoopWorker。我們看看EventLoopWorker的schedule實現如下:
static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } @Override public void dispose() { if (once.compareAndSet(false, true)) { tasks.dispose(); // releasing the pool should be the last action pool.release(threadWorker); } } @Override public boolean isDisposed() { return once.get(); } @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } }
這裡會執行到 threadWorker的scheduleActual方法,繼續往下看
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
在這裡會使用executor最終去執行run方法。當然看到這裡有一個疑問IoScheduler在這裡是怎麼實現執行緒的複用呢?我們看看threadWorker在IoScheduler中的執行緒的建立,如下:
EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); }
這裡會通過維護一個Worker的執行緒池來達到執行緒複用的效果,具體我們看看CachedWorkerPool的get方法,如下:
ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } //從已經release的work執行緒佇列中獲取快取 while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); //如果找到,返回複用的執行緒 if (threadWorker != null) { return threadWorker; } } // 如果沒有,則會建立一個新的ThreadWorker ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; }
observeOn(AndroidSchedulers.mainThread())
@Override protected void subscribeActual(Observer<? super T> observer) { //如果指定當前執行緒 則不進行排程 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //建立Worker Scheduler.Worker w = scheduler.createWorker(); //例項化ObserveOnObserver觀察者並傳入Worker source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
這裡我們主要需要分析ObserveOnObserver物件,onNext實現如下:
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
關鍵還是執行了worker的schedule,AndroidSchedulers的實現主要為HandlerScheduler,HandlerScheduler中關於Worker的實現為HandlerWorker,我們看下schedule的實現如下:
public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. if (async) { message.setAsynchronous(true); } //通過handler傳送訊息執行run介面 handler.sendMessageDelayed(message, unit.toMillis(delay)); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; }
關於handler的例項,我們看AndroidSchedulers中的建立如下:
private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false); }
綜上可知AndroidSchedulers.mainThread()是通過訊息將run方法的實現交由主執行緒Looper進行處理,達到將觀察者的資料處理在主執行緒中執行的效果
rxjava背壓策略實現原理是怎樣的?
背壓(backpressure)
當上下游在不同的執行緒中,通過Observable發射,處理,響應資料流時,如果上游發射資料的速度快於下游接收處理資料的速度,這樣對於那些沒來及處理的資料就會造成積壓,這些資料既不會丟失,也不會被垃圾回收機制回收,而是存放在一個非同步快取池中,如果快取池中的資料一直得不到處理,越積越多,最後就會造成記憶體溢位,這便是響應式程式設計中的背壓(backpressure)問題。
背壓處理機制
rxjava2.x使用Flowable來支援背壓的機制,呼叫create方法時需要傳入BackpressureStrategy策略。
Strategy | 作用 |
---|---|
MISSING | 此策略表示,通過Create方法建立的Flowable沒有指定背壓策略,不會對通過OnNext發射的資料做快取或丟棄處理,需要下游通過背壓操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背壓策略 |
ERROR | 在此策略下,如果放入Flowable的非同步快取池中的資料超限了,則會丟擲MissingBackpressureException異常 |
BUFFER | 此策略下,Flowable的非同步快取池同Observable的一樣,沒有固定大小,可以無限制向裡新增資料,不會丟擲MissingBackpressureException異常,但會導致OOM |
DROP | 在此策略下,如果Flowable的非同步快取池滿了,會丟掉上游傳送的資料 |
LATEST | 與Drop策略一樣,如果快取池滿了,會丟掉將要放入快取池中的資料,不同的是,不管快取池的狀態如何,LATEST都會將最後一條資料強行放入快取池中 |
實現原理
首先看看Flowable的create實現
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) { ObjectHelper.requireNonNull(source, "source is null"); ObjectHelper.requireNonNull(mode, "mode is null"); return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode)); }
這裡會建立一個FlowableCreate物件,並傳入指定的BackpressureStrategy策略。接著看看FlowableCreate的訂閱方法
@Override public void subscribeActual(Subscriber<? super T> t) { BaseEmitter<T> emitter; //根據不同的策略初始化不同的資料發射器 switch (backpressure) { case MISSING: { emitter = new MissingEmitter<T>(t); break; } case ERROR: { emitter = new ErrorAsyncEmitter<T>(t); break; } case DROP: { emitter = new DropAsyncEmitter<T>(t); break; } case LATEST: { emitter = new LatestAsyncEmitter<T>(t); break; } default: { emitter = new BufferAsyncEmitter<T>(t, bufferSize()); break; } } t.onSubscribe(emitter); try { source.subscribe(emitter); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); emitter.onError(ex); } }
BaseEmitter
abstract static class BaseEmitter<T> extends AtomicLong implements FlowableEmitter<T>, Subscription { private static final long serialVersionUID = 7326289992464377023L; final Subscriber<? super T> actual; final SequentialDisposable serial; BaseEmitter(Subscriber<? super T> actual) { this.actual = actual; this.serial = new SequentialDisposable(); } } //這裡需要注意的是,Request最終會把n負責給AtomicLong @Override public final void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(this, n); onRequested(); } } //省略其他若干方法
通過上面的結束我們知道Flowable有一個緩衝池,那個這個大小是多少,在哪裡進行復制給發射器呢?
//長度是128 static final int BUFFER_SIZE; static { BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128)); } public static int bufferSize() { return BUFFER_SIZE; } //在呼叫observeOn時,會將長度最後傳給emitter發射器,具體可以打斷的追蹤檢視呼叫鏈 public final Flowable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); }
MissingEmitter
不會對通過OnNext發射的資料做快取或丟棄處理
@Override public void onNext(T t) { if (isCancelled()) { return; } if (t != null) { actual.onNext(t); } else { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } for (;;) { long r = get(); if (r == 0L || compareAndSet(r, r - 1)) { return; } } }
NoOverflowBaseAsyncEmitter
DropAsyncEmitter和ErrorAsyncEmitter繼承了NoOverflowBaseAsyncEmitter
@Override public final void onNext(T t) { if (isCancelled()) { return; } if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } //如果數量不為0則減1,通過上面的Request,可以知道get()為Flowable的BUFFER_SIZE 128 if (get() != 0) { actual.onNext(t); BackpressureHelper.produced(this, 1); } else { //超出閾值 執行onOverflow onOverflow(); } }
DropAsyncEmitter
如果Flowable的非同步快取池滿了,會丟掉上游傳送的資料
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> { private static final long serialVersionUID = 8360058422307496563L; DropAsyncEmitter(Subscriber<? super T> actual) { super(actual); } @Override void onOverflow() { // nothing to do } }
ErrorAsyncEmitter
如果Flowable的非同步快取池滿了,會丟擲異常
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> { private static final long serialVersionUID = 338953216916120960L; ErrorAsyncEmitter(Subscriber<? super T> actual) { super(actual); } @Override void onOverflow() { onError(new MissingBackpressureException("create: could not emit value due to lack of requests")); } }
BufferAsyncEmitter
Flowable的非同步快取池同Observable的一樣,沒有固定大小,可以無限制向裡新增資料
@Override public void onNext(T t) { if (done || isCancelled()) { return; } if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } //加入佇列 queue為SpscLinkedArrayQueue佇列 queue.offer(t); //通知消費 drain(); }
LatestAsyncEmitter
Flowable的非同步快取池同Observable的一樣,沒有固定大小,可以無限制向裡新增資料
@Override public void onNext(T t) { if (done || isCancelled()) { return; } if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } //進行覆蓋 queue為AtomicReference queue.set(t); //通知消費 drain(); }