RxJava2 中 doFinally 和 doAfterTerminate 的比較

鄰家小妹.jpg
在 RxJava 中 doFinally 和 doAfterTerminate 這兩個操作符很類似,都會在 Observable 的 onComplete 或 onError 呼叫之後進行呼叫。
使用了這兩個操作符在 Observable 結束後,會呼叫 doFinally、doAfterTerminate 所提供的 Action。
這兩個操作符雖然有一定的相似度,但他們依然有差別。並且兩者在使用時,會存在呼叫的先後順序。
doAfterTerminate
從 doAfterTerminate 操作符的原始碼來看
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable<T> doAfterTerminate(Action onFinally) { ObjectHelper.requireNonNull(onFinally, "onFinally is null"); return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, onFinally); }
它呼叫的是 doOnEach() 方法。
其實,doOnNext、doOnError、doOnComplete、doOnTerminate 等這些操作符也是呼叫 doOnEach() 方法。
doOnEach() 實際上呼叫的是 ObservableDoOnEach 類。( RxJavaPlugins.onAssembly 本身是一個 hook 方法,會返回一個 Observable 物件。)
doOnEach() 需要四個引數:onNext、onError、onComplete、onAfterTerminate。doAfterTerminate 操作符的引數 onFinally 對應的是 onAfterTerminate。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null"); return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate)); }
ObservableDoOnEach 繼承了 Observable 類,並實現了它的抽象方法 subscribeActual()。該方法是 Observable 和 Observer 連線的紐帶。其中,source 代表了被觀察者 Observable 本身,而 DoOnEachObserver 是實際的觀察者。
@Override public void subscribeActual(Observer<? super T> t) { source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate)); }
從 DoOnEachObserver 中的 onError()、onComplete() 方法中可以看到 onAfterTerminate 是在 downstream.onError(t) 或者 downstream.onComplete() 之後,才執行 run()。這也符合我們最初對它的認識。
@Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } done = true; try { onError.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); t = new CompositeException(t, e); } downstream.onError(t); try { onAfterTerminate.run(); } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); } } @Override public void onComplete() { if (done) { return; } try { onComplete.run(); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } done = true; downstream.onComplete(); try { onAfterTerminate.run(); } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); } }
doFinally
doFinally 是 RxJava 在 2.0.1 版本新增的操作符。
doFinally 除了擁有 doAfterTerminate 的特性之外,還會在下游(downstream)取消時被呼叫。這是 doFinally 和 doAfterTerminate 最大的區別。
同樣,看一下 doFinally 的原始碼。它呼叫的是 ObservableDoFinally 類。
public final Observable<T> doFinally(Action onFinally) { ObjectHelper.requireNonNull(onFinally, "onFinally is null"); return RxJavaPlugins.onAssembly(new ObservableDoFinally<T>(this, onFinally)); }
在 ObservableDoFinally 類中,從 subscribeActual() 可以看出它的實際觀察者是 DoFinallyObserver 類。其中,onFinally 是 doFinally 操作符所傳遞的引數。
@Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(new DoFinallyObserver<T>(observer, onFinally)); }
在 DoFinallyObserver 類中的 onError、onComplete、dispose 方法中都會呼叫 runFinally() 方法。而 runFinally() 執行的正是 onFinally 的 run()。
@Override public void onError(Throwable t) { downstream.onError(t); runFinally(); } @Override public void onComplete() { downstream.onComplete(); runFinally(); } @Override public void dispose() { upstream.dispose(); runFinally(); } ...... void runFinally() { if (compareAndSet(0, 1)) { try { onFinally.run(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); RxJavaPlugins.onError(ex); } } }
所以,從原始碼可以得出在 doFinally 是在觀察者執行完 onError、onComplete 或取消時執行的操作。
二者的順序
doFinally 和 doAfterTerminate 都會在 onComplete 之後才執行,那麼它們二者的順序是如何呢?
不妨寫一段程式碼測試一下:
Observable.just("Hello World") .doOnNext(s -> System.out.println("doOnNext:")) .doFinally(() -> System.out.println("doFinally:")) .doAfterTerminate(() -> System.out.println("doAfterTerminate:")) .subscribe( s -> System.out.println("onNext:" + s), throwable -> System.out.println("onError:"), () -> System.out.println("onComplete:"));
執行結果:
doOnNext: onNext:Hello World onComplete: doAfterTerminate: doFinally:
發現 doFinally 是在 doAfterTerminate 之後呼叫。
那麼交換一下它們的順序會如何呢?
Observable.just("Hello World") .doOnNext(s -> System.out.println("doOnNext:")) .doAfterTerminate(() -> System.out.println("doAfterTerminate:")) .doFinally(() -> System.out.println("doFinally:")) .subscribe( s -> System.out.println("onNext:" + s), throwable -> System.out.println("onError:"), () -> System.out.println("onComplete:"));
執行結果:
doOnNext: onNext:Hello World onComplete: doFinally: doAfterTerminate:
這一次,doFinally 先執行而 doAfterTerminate 後執行。
因為,它們都需要在 downstream.onComplete() 執行之後,才會執行。而 downstream 對應的下游是觀察者。下流的資料流向跟上游的資料流向是相反的,從下向上的。所以,離觀察者越近,就越先執行。這就是兩段程式碼執行順序不同的緣故。
最後,寫一個極端一點的例子,先後呼叫 doFinally、doAfterTerminate、doFinally、doAfterTerminate:
Observable.just("Hello World") .doOnNext(s -> System.out.println("doOnNext:")) .doFinally(() -> System.out.println("doFinally1:")) .doAfterTerminate(() -> System.out.println("doAfterTerminate1:")) .doFinally(() -> System.out.println("doFinally2:")) .doAfterTerminate(() -> System.out.println("doAfterTerminate2:")) .subscribe( s -> System.out.println("onNext:" + s), throwable -> System.out.println("onError:"), () -> System.out.println("onComplete:"));
執行結果:
doOnNext: onNext:Hello World onComplete: doAfterTerminate2: doFinally2: doAfterTerminate1: doFinally1:
在 onComplete 呼叫之後,先列印了"doAfterTerminate2:",再列印"doFinally2:",然後列印"doAfterTerminate1:",最後列印"doFinally1:"。這正好符合剛才的分析。
總結
本文是對 doFinally 和 doAfterTerminate 兩個操作符的總結。也是對《RxJava 2.x 實戰》一書中,第二章第一節最後一部分內容do操作符的補充。
只有瞭解原始碼,才能更踏實地去寫我們的程式。