1. 程式人生 > >RxJava2.x 萌新之路 操作符篇

RxJava2.x 萌新之路 操作符篇

操作符總覽

e9313f5b-8e66-4a16-88d6-390855bf4d24.jpg

Rxjava為函數語言程式設計提供了眾多的操作符,操作符的運用可以使得程式邏輯更為簡潔。

網上已有眾多操作符說明教學,但不親身總結和嘗試一遍,是難以體會到其中奧妙與融會貫通的,簡單記錄總結以備大家使用參考。

建立操作符

just

自動依次傳送事件序列。
例項:
Observable .just("1", "2", "3", "4", "5", "6", "7", "8", "9", "10")
依次傳送呼叫onNext(),最後預設呼叫complete()

create

手動建立事件序列,返回一個可自由操作的emitter,優點是自由控制事件流程。
emitter.onNext();
emitter.onError();
emitter.onComplete();

fromIterable

傳入陣列並按角標依次傳送事件。
Observable.fromIterable(list),每次接收單個元素。

fromArray

傳入陣列一次性發送,一次接收所有元素。

timer

延時傳送事件 Observable .timer(2, TimeUnit.SECONDS)

interval

可取代CountDownTimer、Handler,5秒傳送一次事件:
Observable .interval(5, TimeUnit.SECONDS)

例項:取代handler進行定時計劃

private Disposable mDisposable;
    @Override
    protected void doSomething() {
        mDisposable = Flowable.interval(20, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        doTask();
                    }
                });
    }

    /**
     * 銷燬時停止計劃
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null){
            mDisposable.dispose();
        }
    }
intervalRange

給事件更多的時間控制:
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
引數1:起始傳送值
引數2:傳送數量
引數3:首次傳送延遲事件
引數4:每次傳送事件間隔
引數5:時間單位

Range

依次傳送範圍內的事件
Observable.range(2, 6),接收型別Integer

轉換操作符

map

實現單個數據的轉換

例項:把網路中ResponseBody用Gson轉換為相對應的資料實體再下發給子類。

  .map(new Function<Response, Number>() {
           @Override
           public MobileAddress apply(@NonNull Response response) throws Exception {
            if (response.isSuccessful()) {
                ResponseBody body = response.body();
                if (body != null) {
                    Log.e(TAG, "map:轉換前:" + response.body());
                    return new Gson().fromJson(body.string(), MobileAddress.class);
                }
            }
                    return null;
                }
            }).observeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<MobileAddress>() {
             @Override
             public void accept(@NonNull MobileAddress s) throws Exception {
                 Log.e(TAG, "doOnNext: Number:" + s.getNumbser() + "\n");
             }
         })
flatMap和concatMap

兩者都可以實現資料集合中一對多事件的轉換,後者會按傳送的順序獲取接收結果,前者可能是亂序接收(不確定哪個事件先完成)。

一對多事件轉換:在flatMap集合中例如可以操作一個公司實體,並轉換為單個部門實體,返回後在後續的accept中,又可以使用單個部門實體對每個成員進行邏輯處理。

例項:

Observable.fromArray(1,2,3,4,5)
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {

                        int delay = 0;
                        if(integer == 3){
                            delay = 500;//延遲500ms
                        }
                        return Observable.just(integer *10).delay(delay, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e("tag","accept:"+integer);
            }
        });

使用flatMap結果:10,20,40,30,50
使用contactMap結果:10,20,30,40,50

buffer

分批發送事件

例項:
Observable .just(1, 2, 3, 4, 5, 6) .buffer(2)
傳送1,2;傳送3,4;在傳送5,6

合併操作符

merge和contat

兩者都可以合併多個Observable事件,前者傳送順序不確定(並行無序),後者按順序傳送(序列有序)。
mergeArray和concatArray效果相同,適用於大於4個事件的情況。

例項:
定義cache和network兩個事件,先檢視快取是否有資料,有即onNext去重新整理頁面,沒有則onComplete讀取網路資料。

 Observable.concat(cache,network)
concatDelayError和 mergeDelayError

兩者都可以在merge和contat操作中出現錯誤時停止傳送當前事件集合,但不影響合併中的另一個事件集合傳送

zip

zip 操作符可以將多個 Observable 的資料結合為一個數據源再發射出去

例項:分別請求生日、地址、性別等資訊後,將多個請求結果合成一個,再進行UI更新。

....分別請求生日、地址...
Observable.zip(observable1, observable2, new BiFunction<Birth, Address, String>() {
            @Override
            public String apply(@NonNull Birth birth, @NonNull Address address) throws Exception {
                return "合併後的資料為 Birth:"+birth.getResult()+" Address:"+address.getResult();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: 成功:" + s+"\n");
                    }
                });
過濾操作符
操作符 說明
filter 自定義篩選條件,返回boolean
distinct 去重
distinctUntilChanged 過濾連續相同事件
skip,skipLast 跳過前n個事件或最後n個
take和takeLast 只接收前n個事件或最後n個
elementAt和elementAtOrError 前者只發送第n個,可設定預設值,不拋異常;後者越界拋異常。
ignoreElements 只接收完成和報錯資訊
distinct 去重
ofType 指定接收資料型別
throttleFirst/throttleLast 只接收指定時間內第一個或最後一個事件

其他操作符

do

doOnEach() :當Observable每傳送一次事件就會呼叫一次(包含onNext(),onError(),onComplete())
doOnNext(): 執行 onNext()前呼叫
doAfterNext(): 執行onNext()後呼叫
doOnComplete():執行onComplete()前呼叫
doOnError():執行 onError()前呼叫
doOnTerminate(): 執行終止(無論正常傳送完畢/異常終止)
doFinally(): 最後執行
doOnSubscribe() :觀察者訂閱是呼叫
doOnUnScbscribe(): 觀察者取消訂閱時呼叫

onErrorReturn

捕獲錯誤並返回,不傳送後續事件。

onExceptionResumeNext/onErrorResumeNext

捕獲錯誤跳過當前事件同時不中斷髮送後續事件。

retry

retry() : 出現錯誤時,讓被觀察者重新發送資料。若錯誤一直髮生,則一直重新發送
retry(long time):與retry不同的書,若錯誤一直髮生,被觀察者則一直重新發送資料,但這持續重新發送有次數限制
retry(Predicate predicate) : 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料
retry(new BiPredicate<Integer, Throwable>):出現錯誤時,根據指定邏輯(可以捕獲重發的次數和發生的錯誤)決定是否讓被觀察者重新發送資料
retry(long time,Predicate predicate) : 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料。並且有持續重發的次數限制

retryUntil

遇到錯誤時根據制定規則選擇是否重發

retryWhen

遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable),並決定是否需要重新訂閱原始被觀察者(Observable)

repeat和repeatWhen

repeat重複發射 observable的資料序列,可以使無限次也可以是指定次數.不傳時為重複無限次。
repeatWhen遇到錯誤選擇返回object給新觀察者或中止事件

返回引數選擇:
Observable.empty();
傳送Complete事件,但不會回撥觀察者的Complete()

onComplete()
直接完成。

Observable.error(new Throwable("不再重新訂閱事件"));

Observable.just(1);
繼續傳送事件。

debounce

一定的時間內沒有操作就會發送事件(只會傳送最後一次操作的事件)

例項:
Observable.intervalRange(1, 2, 3, 4, TimeUnit.SECONDS)
.debounce(2, TimeUnit.SECONDS)
只有最後一個4的事件會被髮送(2秒後)

條件操作符

操作符 說明
all 判斷被觀察者所有事件是否滿足某個事件,如果全部滿足則返回true,都在返回false
takeUntil 當事件滿足設定的條件時,該事件的下一個事件不會被髮送了。包含超過臨界條件的第一個事件
takeWhile 當事件滿足設定的條件時,傳送事件
skipUntil 直到設定的條件事件發出之後,開始傳送原始事件。
skipWhile 跳過while範圍內事件
amb 多個Observable序列中,只發送第一個
contains 是否存在特定元素
exists 是否滿足特定條件
DefaultIfEmpty 如果沒有正常結束事件(onComlete執行),返回預設值
SequenceEqual 判斷兩個事件序列是否是相同的資料,相同的順序,相同的終止狀態

相似操作符對比

timer():用於建立Observable,延遲傳送一次。
interval():用於建立Observable,跟TimerTask類似,用於週期性傳送。
delay():用於事件流中,可以延遲傳送事件流中的某一次傳送。