1. 程式人生 > >RxJava操作符(六)——功能操作符

RxJava操作符(六)——功能操作符

一、功能操作符:輔助被觀察者(Observable) 在傳送事件時實現一些功能性需求

二、功能操作符按照使用功能,大致分類:

  • 訂閱:subscribe()
  • 執行緒排程:subscribeOn()、observeOn()
  • 延遲:delay()
  • do操作:do()
  • 錯誤處理   :onErrorReturn()  、onErrorResumeNext() 、onExceptionResumeNext()、retry()、retryUntil() 、retryWhen() 
  • 重複操作符:repeat() 、repeatWhen()

三、使用詳解

  • subscribe()
  1. 作用:訂閱觀察者和被觀察者
  2. 使用示例
        //建立被觀察者物件
        Observable observable = Observable.just(1, 2, 3);
        //建立觀察者物件
        Consumer<Integer> integerConsumer = new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        };
        //訂閱
        observable.subscribe(integerConsumer);

完成訂閱後,當被觀察者傳送事件時,觀察者會執行其實現的方法。

  • subscribeOn()/observeOn()
  1. 作用:切換執行執行緒
  2. 介紹:
  • 在 RxJava模型中,被觀察者 (Observable) / 觀察者(Observer)的工作執行緒 = 建立自身的執行緒
  • 建立被觀察者 (Observable) / 觀察者(Observer)的執行緒 = 主執行緒,所以整個過程都會執行在主執行緒
  • 所以生產事件 / 接收& 響應事件都發生在主執行緒,而我們有些耗時操作或網路請求是不需要阻塞執行緒的,所以需要在其他執行緒執行操作後,此時就用到subscribeOn()
  • 當我們在其他執行緒操作資料後需要更新UI時,此時需要切換到主執行緒使用 observeOn()

    3、執行緒型別:在 RxJava中,內建了多種用於排程的執行緒型別

型別含義應用場景
Schedulers.immediate()當前執行緒 = 不指定執行緒預設
AndroidSchedulers.mainThread()Android主執行緒操作UI
Schedulers.newThread()常規新執行緒耗時等操作
Schedulers.io()io操作執行緒網路請求、讀寫檔案等io密集型操作
Schedulers.computation()CPU計算操作執行緒大量計算操作
  • 注:RxJava內部使用 執行緒池 來維護這些執行緒,所以執行緒的排程效率非常高。

    4、使用示例:

 Observable.just(1, 2, 3)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();
   5、使用注意:
  • subscribeOn()針對的是目標過程是呼叫此方法之前的過程,observeOn()操作目標為觀察者物件或其後的執行過程
  • 對於subscribeOn()多次呼叫只有第一次起作用,observeOn()每次呼叫都會起作用
Observable.create(new ObservableOnSubscribe() {
           @Override
           public void subscribe(ObservableEmitter e) throws Exception {
               Log.e("Thread===",Thread.currentThread().getName());
               e.onNext(1);
           }
       }).subscribeOn(AndroidSchedulers.mainThread())
               .subscribeOn(Schedulers.newThread())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer() {
                   @Override
                   public void accept(Object o) throws Exception {
                       Log.e("thread===",Thread.currentThread().getName());
                   }
               });

輸出結果:

05-06 06:04:00.454 4814-4814/com.example.administrator.googleplay E/Thread===: main
05-06 06:04:00.547 4814-4814/com.example.administrator.googleplay E/thread===: main
上述過程,制定了兩次訂閱的執行緒分別為主執行緒和建立執行緒,從輸出結果上看被觀察者是在main執行緒中傳送的事件,後面的newThread並未起作用,下面看看observeOn():
      Observable.create(new ObservableOnSubscribe() {
           @Override
           public void subscribe(ObservableEmitter e) throws Exception {
               e.onNext(1);
           }
       }).subscribeOn(Schedulers.io())
              .observeOn(Schedulers.newThread())
              .doOnNext(new Consumer() {
                  @Override
                  public void accept(Object o) throws Exception {
                      Log.e("doOnNext Thread===",Thread.currentThread().getName());
                  }
              })
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer() {
                   @Override
                   public void accept(Object o) throws Exception {
                       Log.e("subscribe Thread===",Thread.currentThread().getName());
                   }
               });
上面使用了兩次observeOn切換執行緒,如果兩者都起作用,那第一個應該建立新執行緒,第二列印的應該是主執行緒,看看輸出結果:
05-06 06:10:56.608 5303-5330/com.example.administrator.googleplay E/doOnNext Thread===: RxNewThreadScheduler-1
05-06 06:10:56.624 5303-5303/com.example.administrator.googleplay E/subscribe Thread===: main

輸出結果與分析的一致,所以驗證上面的結論:對於subscribeOn()多次呼叫只有第一次起作用,observeOn()每次呼叫都會起作用

  • delay()
  1. 作用:延遲操作符可以延遲一段時間傳送事件
  2. delay()的過載
// 1. 指定延遲時間
// 引數1 = 時間;引數2 = 時間單位
delay(long delay,TimeUnit unit)

// 2. 指定延遲時間 & 排程器
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延遲時間  & 錯誤延遲
// 錯誤延遲,即:若存在Error事件,則如常執行,執行後再丟擲錯誤異常
// 引數1 = 時間;引數2 = 時間單位;引數3 = 錯誤延遲引數
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延遲時間 & 排程器 & 錯誤延遲
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器;引數4 = 錯誤延遲引數
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時間並新增排程

   3、使用示例

Observable.just(1,2,3)
              .delay(1, TimeUnit.SECONDS)
              .subscribe(new Consumer<Integer>() {
                  @Override
                  public void accept(Integer integer) throws Exception {
                      
                  }
              });
  • do()操作
  1. 作用 :在某個事件的生命週期中呼叫
  2. 型別 :do()操作符有很多個,具體如下

示意圖

    3、使用示例

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new Throwable("發生錯誤了"));
                 }
               })
                // 1. 當Observable每傳送1次資料事件就會呼叫1次
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.d(TAG, "doOnEach: " + integerNotification.getValue());
                    }
                })
                // 2. 執行Next事件前呼叫
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doOnNext: " + integer);
                    }
                })
                // 3. 執行Next事件後呼叫
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doAfterNext: " + integer);
                    }
                })
                // 4. Observable正常傳送事件完畢後呼叫
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doOnComplete: ");
                    }
                })
                // 5. Observable傳送錯誤事件時呼叫
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "doOnError: " + throwable.getMessage());
                    }
                })
                // 6. 觀察者訂閱時呼叫
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        Log.e(TAG, "doOnSubscribe: ");
                    }
                })
                // 7. Observable傳送事件完畢後呼叫,無論正常傳送完畢 / 異常終止
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doAfterTerminate: ");
                    }
                })
                // 8. 最後執行
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doFinally: ");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

輸出結果:

錯誤處理符

  • onErrorReturn() 
  1. 作用: 當發生一個錯誤時會返回一個數據,並且立即停止操作
  2. 使用示例:
Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new Throwable("傳送異常!"));
            }
        }).onErrorReturn(new Function<Throwable,Integer>() {
            @Override
            public Integer apply(Throwable o) throws Exception {
                Log.e("=====",o.getMessage());
                return 110;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Object o) throws Exception {
                        Log.e("=====",o + "");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e("=====",throwable.getMessage());
                    }
                });

輸出結果:

05-06 06:43:17.498 6446-6470/com.example.administrator.googleplay E/=====: 傳送異常!
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 1
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 2
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 3
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 110

在傳送異常事件後首先執行了onErrorReturn輸出異常資訊,然後修改並返回一個數據,此資料作為一個新的事件傳送,從而回掉onNext()方法。

  • onErrorResumeNext()
  1. 作用:當發生一個錯誤時會返回一個新的被觀察著物件
  2. 使用示例:使用方法與onErrorReturn() 相同,只是將傳送的資料換成了一個Onservable物件
.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
            @Override
            public ObservableSource apply(Throwable throwable) throws Exception {
                return Observable.just("A","B");
            }
        }
  • onExceptionResumeNext()
  1. 作用:當發生一個異常時返回一個新的被觀察者
  2. 使用示例同上

注意:

  • onExceptionResumeNext()攔截的錯誤 = Exception;若需攔截Throwable請用onErrorResumeNext()
  • 若onExceptionResumeNext()攔截的錯誤 = Throwable,則會將錯誤傳遞給觀察者的onError方法
  • retry()
  1. 作用:當發生異常時被觀察者會重新發送事件
  2. 型別:
<-- 1. retry() -->
// 作用:出現錯誤時,讓被觀察者重新發送資料
// 注:若一直錯誤,則一直重新發送

<-- 2. retry(long time) -->
// 作用:出現錯誤時,讓被觀察者重新發送資料(具備重試次數限制
// 引數 = 重試次數

<-- 3. retry(Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送& 持續遇到錯誤,則持續重試)
// 引數 = 判斷邏輯 //返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束  //返回true = 重新發送請求(最多重新發送3次)
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送 & 持續遇到錯誤,則持續重試// 引數 = 判斷邏輯(傳入當前重試次數 & 異常錯誤資訊)//返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束  //返回true = 重新發送請求(最多重新發送3次)<-- 5. retry(long time,Predicate predicate) -->// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制// 引數 = 設定重試次數 & 判斷邏輯 //返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束  //返回true = 重新發送請求(最多重新發送3次)

使用示例:


// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制
// 引數 = 設定重試次數 & 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發生錯誤了"));
                e.onNext(3);
                 }
               })
                // 攔截錯誤後,判斷是否需要重新發送請求
                .retry(3, new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        // 捕獲異常
                        Log.e(TAG, "retry錯誤: "+throwable.toString());

                        //返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束
                        //返回true = 重新發送請求(最多重新發送3次)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
  • retryUntil()
  1. 作用:當出現錯誤時判斷是否需要重新發送資料,功能與上述類似
  2. 使用示例:
.retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                return true;
            }
        })

注意:當返回true時表示不重複傳送,其實從名字中可以看出來retryUntil ,有個Until是當達到某種條件時即可停止

  • retryWhen
  1. 作用:遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable),並決定是否需要重新訂閱原始被觀察者Observable 傳送事件
  2. 使用示例:
Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("傳送異常!"));
                e.onNext(3);
            }
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        return Observable.just(4);
                    }
                });
            }
        })

輸出結果:

/com.example.administrator.googleplay E/=====: 1
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 2
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 1
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 2
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 1

從上面可以看出當傳送異常時會重新發送,傳送資料為1,2 迴圈,

注:1. 若 新的被觀察者 Observable傳送的事件 = Error事件,那麼 原始Observable則不重新發送事件: 

      2. 若 新的被觀察者 Observable傳送的事件 = Next事件 ,那麼原始的Observable則重新發送事件:

現在修改上面的Observable.just(4)為 Observable.error(new Throwable("retryWhen終止啦"));,

Observable.error(new Throwable("retryWhen終止啦"));

再次執行程式,回掉onError()方法,輸出結果:

05-06 07:19:35.237 8015-8015/com.example.administrator.googleplay E/=====: retryWhen終止啦

重複傳送

  • repeat() 
  1. 作用:無條件不停止迴圈
  2. 使用型別
 Observable.just(1, 2, 3)
                .repeat()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Object o) throws Exception {
                        Log.e("=====", o + "");
                    }
                });

輸出結果:

05-06 07:24:51.407 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3

過載:repeat(3)引數:迴圈的次數,先修改傳入引數2,執行程式

  .repeat(2)

輸出結果:

05-06 07:28:17.482 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:28:17.509 8210-8210/com.example.administrator.googleplay E/=====: 3
  • repeatUntil () / repeat When()
  1. 作用:按照條件決定是否重複傳送事件
  2. 使用示例:二者的使用方法與上述的retryUntil()/retryWhen()相同

以上就是RxJava的功能操作符的介紹,功能強大之處在開發中會深有體會。