RxJava 2.x 程式設計實戰 - 02 基本模式
原文地址:樑桂釗的部落格
部落格地址:blog.720ui.com
歡迎轉載,轉載請註明作者及出處,謝謝!
RxJava 2.x 提供了五種模式,如下所示。
模式/型別 |
描述 |
介面 |
消費者 |
Observable
|
支援 0...N個數據 不支援背壓 |
io.reactivex.Observable
|
Observer
|
Flowable
|
支援 0...N個數據 支援背壓 |
io.reactivex.Flowable
|
Subscriber
|
Single
|
只支援1個數據 |
io.reactivex.Single
|
SingleObserver
|
Completable
|
不支援資料 |
io.reactivex.Completable
|
CompletableObserver
|
Maybe
|
只支援0或1個數據 |
io.reactivex.Maybe
|
MaybeObserver
|
Observable
建立 Observable 非常容易,我們首先需要建立一個 Observable 作為被觀察者,然後在建立一個 Observer 作為觀察者,然後通過 subscribe() 進行訂閱。
public class ObservableDemo { public static void main(String[] args) { Observable<String> observable = Observable.create(emitter -> { emitter.onNext("Hello World"); emitter.onNext("Hello World"); emitter.onComplete(); emitter.onNext("Hello World"); }); Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("Observer.onSubscribe"); } @Override public void onNext(String s) { System.out.println("Observer.onNext: " + s); } @Override public void onError(Throwable e) { System.out.println("Observer.onError"); } @Override public void onComplete() { System.out.println("Observer.onComplete"); } }; observable.subscribe(observer); } } 複製程式碼
我們可以使用 create 建立一個 Observable,它擁有 onNext, onError, onCompleted 方法。其中,onNext用於發射資料項,可以多次呼叫,每呼叫一次發射一條資料, onError 或 onCompleted 只能呼叫一次,onError發射錯誤事件,除非使用 retry() 操作符來截獲錯誤,否則事件流通常會終止。onCompleted 傳遞一個完成事件,表示不會再發生onNext呼叫。兩者之間互斥,此後就不能再呼叫該 Observable 的其他方法。
這裡,我們也可以改造成鏈式呼叫。
public class ObservableDemo2 { public static void main(String[] args) { Observable.<String>create(emitter -> { emitter.onNext("Hello World"); emitter.onNext("Hello World"); emitter.onComplete(); emitter.onNext("Hello World"); }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("Observer.onSubscribe"); } @Override public void onNext(String s) { System.out.println("Observer.onNext: " + s); } @Override public void onError(Throwable e) { System.out.println("Observer.onError"); } @Override public void onComplete() { System.out.println("Observer.onComplete"); } }); } } 複製程式碼
閱讀 RxJava 2.x 原始碼 io.reactivex.Observable
,我們可以知道 subscribe 具有很多過載的方法。有興趣的讀者,可以深入瞭解下。 我們 可以省略 onComplete(),只實現 onNext() 和 onError()。這將不再對 onComplete() 執行任何操作。我們甚至可以忽略 onError(),只指定 onNext()。但是,不實現 onError() 是在生產環境中應該避免的事情。在事件流的任何地方發生的錯誤都將傳播到 onError() 進行處理,然後終止事件流。如果我們沒有為 onError() 指定一個操作,那麼該錯誤將不會處理。當然,如果出現錯誤,我們可以先嚐試使用 retry() 操作符恢復並重新訂閱可觀察到的資料項。
public final Disposable subscribe() public final Disposable subscribe(Consumer<? super T> onNext) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) public final void subscribe(Observer<? super T> observer) 複製程式碼
這裡,我們簡單來了解一下 subscribe(Consumer<? super T> onNext)
的使用吧。
public class ObservableDemo3 { public static void main(String[] args) { Observable.<String>create(emitter -> { emitter.onNext("Hello World"); emitter.onNext("Hello World"); emitter.onComplete(); emitter.onNext("Hello World"); }).subscribe(System.out::println); } } 複製程式碼
注意, onNext
, onError
, onCompleted
方法不需要直接推送到最終的觀察者,它們可以通過 map() 和 filter() 等操作符建立新的 Observable 然後繼續傳送。

Flowable
Flowable 是唯一支援背壓的模式,它的用法與 Observable 非常相似。(關於背壓,筆者會在之後的文章中進行講解。)
public class FlowableDemo { public static void main(String[] args) { Flowable.<String>create(e -> { e.onNext("Hello world!"); e.onNext("Hello World"); e.onComplete(); e.onNext("Hello World"); }, BackpressureStrategy.MISSING).subscribe(new Subscriber<String>(){ @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscriber.onSubscribe"); } @Override public void onNext(String s) { System.out.println("Subscriber.onNext: " + s); } @Override public void onError(Throwable throwable) { System.out.println("Subscriber.onError"); } @Override public void onComplete() { System.out.println("Subscriber.onComplete"); } }); } } 複製程式碼
閱讀 RxJava 2.x 原始碼 io.reactivex.Flowable
,我們可以知道 subscribe 也具有很多過載的方法。
public final Disposable subscribe() public final Disposable subscribe(Consumer<? super T> onNext) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe) public final void subscribe(FlowableSubscriber<? super T> s) public final void subscribe(Subscriber<? super T> s) 複製程式碼
Single
Single 的工作就像 Observable 一樣,但是它只有 onSuccess
事件和 onError
事件,並且它有自己的 SingleObserver
介面。 onSuccess
整合了 onNext
和 onComplete
事件,因此,這裡 onSuccess
只能傳送一個數據,換句話說,即使多次傳送也不會產生效果。
public class SingleDemo { public static void main(String[] args) { Single.<String>create(e -> { e.onSuccess("success"); e.onSuccess("success"); }).subscribe(new SingleObserver<String>(){ @Override public void onSubscribe(Disposable d) { System.out.println("SingleObserver.onSubscribe"); } @Override public void onSuccess(String s) { System.out.println("SingleObserver.onSuccess:"+s); } @Override public void onError(Throwable e) { System.out.println("SingleObserver.onError"); } }); } } 複製程式碼
從控制檯的列印結果可以看出,即使多次傳送“success”,但是隻會消費一次。
閱讀 RxJava 2.x 原始碼 io.reactivex.Single
,我們可以知道 subscribe 也具有很多過載的方法。
public final Disposable subscribe() public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback) public final Disposable subscribe(Consumer<? super T> onSuccess) public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) public final void subscribe(SingleObserver<? super T> subscriber) 複製程式碼
這裡,我們簡單來了解一下 subscribe(Consumer<? super T> onSuccess)
的使用吧。
public class SingleDemo2 { public static void main(String[] args) { Single.<String>create(e -> { e.onSuccess("success"); }).subscribe(System.out::println); } } 複製程式碼
我們可以通過 toObservable 轉換成一個 Observable 物件。
Single.just("success").toObservable().subscribe(System.out::println); 複製程式碼
Completable
Completable 不傳送資料,只有 onComplete
事件和 onError
事件。
public class CompletableDemo { public static void main(String[] args) { Completable.create(e -> { e.onComplete(); }) .subscribe(System.out::println); } } 複製程式碼
此外,我們可以通過 complete()
快速建立一個 Completable 物件,它會立即呼叫 onComplete
事件。
Completable.complete().subscribe(System.out::println); 複製程式碼
或者,也可以通過 fromAction()
或 fromRunnable()
在呼叫 onComplete
事件之前執行指定的操作。
Completable.fromAction(System.out::println).subscribe(); 複製程式碼
Maybe
Maybe 結合了 Single 和 Completable 特性。Maybe 包含 onSuccess
、 onError
、 onComplete
事件。 這裡, onSuccess
可以傳送 0 ~ 1 個數據,換句話說,即使多次傳送也不會產生效果。如果呼叫 onComplete
事件,就會停止傳送資料。
public class MaybeDemo { public static void main(String[] args) { Maybe.<String>create(e -> { e.onComplete(); e.onSuccess("success"); e.onSuccess("success"); }).subscribe(System.out::println); } } 複製程式碼