1. 程式人生 > >RxJava2(二)五種被觀察者

RxJava2(二)五種被觀察者

五種被觀察者為Observable,Flowable,Single,Completable,Maybe。
五種被觀察者可通過toObservable,toFlowable,toSingle,toCompletable,toMaybe相互轉換

Observable

一簡介
1.Observable即被觀察者,決定什麼時候觸發事件以及觸發怎樣的事件。
2.Oberver即觀察者,他可以在不同的執行緒中執行任務,極大的簡化了併發操作,因為他建立了一個處於待命狀態的觀察者,可以在某一時刻響應Observable的通知,而不會造成阻塞。
3.ObservableEmitter資料發射器,發射Observable的onNext,onError,onComplete,onSubscribe方法。
4.subscribe() 訂閱Observable的四個方法,只有呼叫此方法才會開始發射資料。其有4個構造方法:

 subscribe(onNext())
 subscribe(onNext(),onError())
 subscribe(onNext(),onError(),onComplete())
 subscribe(onNext(),onError(),onComplete(),onSubscribe())

寫個過載:

//建立被觀察者
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe
(ObservableEmitter<String> emitter) { //發射資料,先檢查訂閱者是否取消訂閱 if (!emitter.isDisposed()){ emitter.onNext("Hello World"); } emitter.onComplete(); } //訂閱給觀察者,接收資料 }).subscribe(new Consumer<
String>
() { @Override public void accept(@NonNull String s) throws Exception { Log.d(TAG, "accept: onNext = " + s); Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show(); } //錯誤 }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "accept: onError = " + throwable.getMessage()); } //執行完成 }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "run: onComplete"); } }, new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.d(TAG, "accept: onSubscribe"); } });

5.若所有觀察者取消訂閱,則資料流停止,若重新訂閱,重新開始資料流;若部分取消訂閱,不會停止資料流,仍然繼續發射資料,當再次訂閱,不會重新開始資料流,只會收到當前發射資料。
二,HotObservable和ColdObservable
1.HotObservable:無論有無觀察者訂閱,事件始終會發生,與訂閱者們成一對多關係,共享資訊。適用於某些事件不確定何時發生和不確定發射的元素數量。
2. ColdObservable:只有訂閱者訂閱了,才開始發射資料。和訂閱者們成一 一對應關係,各自訊息是重新完整發送,彼此獨立互不干擾。
3. Observable的just,create,range,fromXX等操作符建立的是ColdObservable。
4. 相互轉換:publish 操作符 ColdObservable====>HotObservable;
呼叫connect()後才真正執行轉換。

//建立被觀察者HotObservable
        ConnectableObservable<String> connectableObservable =
                Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) {
                        //發射資料
                        emitter.onNext("Hello World");
                    }
                    //開啟一個新執行緒,將ColdObservable轉換為HotObservable
                }).observeOn(Schedulers.newThread()).publish();
        //執行轉化
        connectableObservable.connect();
        //訂閱給觀察者,接收資料
        connectableObservable.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.d(TAG, "accept: onNext = " + s);
                Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
            }
        });
  1. 相互轉換:refCount 操作符HotObservable====>ColdObservable
//建立被觀察者HotObservable
        ConnectableObservable<String> connectableObservable =
                Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) {
                        //發射資料
                        emitter.onNext("Hello World");
                    }
                    //開啟一個新執行緒,將ColdObservable轉換為HotObservable
                }).observeOn(Schedulers.newThread()).publish();
        //執行轉化
        connectableObservable.connect();
        //再次轉化為ColdObservable
        Observable observable = connectableObservable.refCount();
        //訂閱給觀察者,接收資料
        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.d(TAG, "accept: onNext = " + s);
                Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
            }
        });

Flowable

Flowable可以看成是Observable的實現,只是它支援背壓,其所有操作付強制支援背壓。
Observable:
·一般處理不超過1000條資料,幾乎不會造成記憶體溢位。
·不會背壓
·處理同步流
Flowable:
·處理超過10KB的資料元素
·檔案讀取與分析
·讀取資料庫
·處理網路I/O流
·建立一個響應式的非阻塞介面

Single

只有onSuccess可onError事件,只能用onSuccess發射一個數據或一個錯誤通知,之後再發射資料也不會做任何處理,直接忽略。

Completable

只有onComplete和onError事件,不發射資料,沒有map,flatMap操作符。常常結合andThen操作符使用。
andThen:接下來執行。

Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter emitter) throws Exception {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    //完成
                    emitter.onComplete();
                } catch (InterruptedException e) {
                	//錯誤
                    emitter.onError(e);
                }
            }
            //Completable已執行完成,接下來執行:新建Observable發射1-10這幾個資料
        }).andThen(Observable.range(1, 10))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer.toString());
                    }
                });

Maybe

沒有onNext方法,同樣需要onSuccess發射資料,且只能發射0或1個數據,多發也不再處理。