1. 程式人生 > >Android RxJava操作符的學習---功能性操作符

Android RxJava操作符的學習---功能性操作符

3.4 功能性操作符

  • 作用
    輔助被觀察者(Observable) 在傳送事件時實現一些功能性需求

  • 實際應用場景

  1. 連線(訂閱) 觀察者 & 被觀察者
  2. 執行緒排程(切換)
  3. 錯誤處理
  4. 事件生命週期操作
  5. 延時操作
  6. 重複傳送操作
  • 型別
    根據上述應用場景,常見的功能性操作符 主要有:

3.4.3. 應用場景 & 對應操作符詳解

注:在使用RxJava 2操作符前,記得在專案的Gradle中新增依賴:

dependencies {
      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
      compile 'io.reactivex.rxjava2:rxjava:2.0.7'
      // 注:RxJava2 與 RxJava1 不能共存,即依賴不能同時存在
}

3.4.3.1 連線被觀察者 & 觀察者

  • 需求場景
    即使得被觀察者 & 觀察者 形成訂閱關係

  • 對應操作符

subscribe()

  • 作用
    訂閱,即連線觀察者 & 被觀察者

  • 具體使用

observable.subscribe(observer);
// 前者 = 被觀察者(observable);後者 = 觀察者(observer 或 subscriber)


<-- 1. 分步驟的完整呼叫 -->
//  步驟1: 建立被觀察者 Observable 物件
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });

// 步驟2:建立觀察者 Observer 並 定義響應事件行為
        Observer<Integer> observer = new Observer<Integer>() {
            // 通過複寫對應方法來 響應 被觀察者
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始採用subscribe連線");
            }
            // 預設最先呼叫複寫的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件"+ value +"作出響應"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }
        };

        
        // 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者
        observable.subscribe(observer);


<-- 2. 基於事件流的鏈式呼叫 -->
        Observable.create(new ObservableOnSubscribe<Integer>() {
        // 1. 建立被觀察者 & 生產事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            // 2. 通過通過訂閱(subscribe)連線觀察者和被觀察者
            // 3. 建立觀察者 & 定義響應事件的行為
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始採用subscribe連線");
            }
            // 預設最先呼叫複寫的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件"+ value +"作出響應"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }

        });
    }
}

  • 測試結果

  • 擴充套件說明
<-- Observable.subscribe(Subscriber) 的內部實現 -->

public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    // 在觀察者 subscriber抽象類複寫的方法 onSubscribe.call(subscriber),用於初始化工作
    // 通過該呼叫,從而回調觀察者中的對應方法從而響應被觀察者生產的事件
    // 從而實現被觀察者呼叫了觀察者的回撥方法 & 由被觀察者向觀察者的事件傳遞,即觀察者模式
    // 同時也看出:Observable只是生產事件,真正的傳送事件是在它被訂閱的時候,即當 subscribe() 方法執行時
}

3.4.3.2 執行緒排程

  • 需求場景
    快速、方便指定 & 控制被觀察者 & 觀察者 的工作執行緒

1). RxJava執行緒控制(排程 / 切換)的作用是什麼?

指定 被觀察者 (Observable) / 觀察者(Observer) 的工作執行緒型別。


2). 為什麼要進行RxJava執行緒控制(排程 / 切換)?

2).1 背景

  • RxJava模型中,被觀察者 (Observable) / 觀察者(Observer)的工作執行緒 = 建立自身的執行緒

即,若被觀察者 (Observable) / 觀察者(Observer)在主執行緒被建立,那麼他們的工作(生產事件 / 接收& 響應事件)就會發生在主執行緒

  • 因為建立被觀察者 (Observable) / 觀察者(Observer)的執行緒 = 主執行緒
  • 所以生產事件 / 接收& 響應事件都發生在主執行緒

下面請看1個RxJava的基礎使用

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "Rxjava";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // 步驟1:建立被觀察者 Observable & 傳送事件
        // 在主執行緒建立被觀察者 Observable 物件
        // 所以生產事件的執行緒是:主執行緒

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                Log.d(TAG, " 被觀察者 Observable的工作執行緒是: " + Thread.currentThread().getName());
                // 列印驗證
                emitter.onNext(1);
                emitter.onComplete();
            }
        });

// 步驟2:建立觀察者 Observer 並 定義響應事件行為
        // 在主執行緒建立觀察者 Observer 物件
        // 所以接收 & 響應事件的執行緒是:主執行緒
        Observer<Integer> observer = new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始採用subscribe連線");
                Log.d(TAG, " 觀察者 Observer的工作執行緒是: " + Thread.currentThread().getName());
                // 列印驗證

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件"+ value +"作出響應"  );
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }
        };

        // 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者
        observable.subscribe(observer);
    }
}
  • 測試結果

2).2 衝突

  • 對於一般的需求場景,需要在子執行緒中實現耗時的操作;然後回到主執行緒實現 UI操作
  • 應用到 RxJava模型中,可理解為:
    1. 被觀察者 (Observable)子執行緒 中生產事件(如實現耗時操作等等)
    2. 觀察者(Observer)主執行緒 接收 & 響應事件(即實現UI操作)

2).3 解決方案

所以,為了解決上述衝突,即實現 真正的非同步操作,我們需要對RxJava進行 執行緒控制(也稱為排程 / 切換)


3). 實現方式

採用 RxJava內建的執行緒排程器Scheduler ),即通過 功能性操作符subscribeOn() & observeOn()實現

3).1 功能性操作符subscribeOn() & observeOn()簡介

  • 作用
    執行緒控制,即指定 被觀察者 (Observable) / 觀察者(Observer) 的工作執行緒型別
  • 執行緒型別
    RxJava中,內建了多種用於排程的執行緒型別
型別 含義 應用場景
Schedulers.immediate() 當前執行緒 = 不指定執行緒 預設
AndroidSchedulers.mainThread() Android主執行緒 操作UI
Schedulers.newThread() 常規新執行緒 耗時等操作
Schedulers.io() io操作執行緒 網路請求、讀寫檔案等io密集型操作
Schedulers.computation() CPU計算操作執行緒 大量計算操作
  • 注:RxJava內部使用 執行緒池 來維護這些執行緒,所以執行緒的排程效率非常高。

3).2 具體使用

  • 具體是在 (上述步驟3)通過訂閱(subscribe)連線觀察者和被觀察者中實現

<-- 使用說明 -->
  // Observable.subscribeOn(Schedulers.Thread):指定被觀察者 傳送事件的執行緒(傳入RxJava內建的執行緒型別)
  // Observable.observeOn(Schedulers.Thread):指定觀察者 接收 & 響應事件的執行緒(傳入RxJava內建的執行緒型別)

<-- 例項使用 -->
// 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者
        observable.subscribeOn(Schedulers.newThread()) // 1. 指定被觀察者 生產事件的執行緒
                  .observeOn(AndroidSchedulers.mainThread())  // 2. 指定觀察者 接收 & 響應事件的執行緒
                  .subscribe(observer); // 3. 最後再通過訂閱(subscribe)連線觀察者和被觀察者

  • 測試結果

  • 特別注意

1. 若Observable.subscribeOn()多次指定被觀察者 生產事件的執行緒,則只有第一次指定有效,其餘的指定執行緒無效

// 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者
        observable.subscribeOn(Schedulers.newThread()) // 第一次指定被觀察者執行緒 = 新執行緒
                  .subscribeOn(AndroidSchedulers.mainThread()) // 第二次指定被觀察者執行緒 = 主執行緒
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(observer);
  • 測試結果:被觀察者的執行緒 = 第一次指定的執行緒 = 新的工作執行緒,第二次指定的執行緒(主執行緒)無效

2. 若Observable.observeOn()多次指定觀察者 接收 & 響應事件的執行緒,則每次指定均有效,即每指定一次,就會進行一次執行緒的切換

// 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者
        observable.subscribeOn(Schedulers.newThread())
                  .observeOn(AndroidSchedulers.mainThread()) // 第一次指定觀察者執行緒 = 主執行緒
                  .doOnNext(new Consumer<Integer>() { // 生產事件
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "第一次觀察者Observer的工作執行緒是: " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.newThread()) // 第二次指定觀察者執行緒 = 新的工作執行緒
                .doOnNext(new Consumer<Integer>() { // 生產事件
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "第二次觀察者Observer的工作執行緒是: " + Thread.currentThread().getName());
                    }
                })
                .subscribe(observer); // 生產事件


// 注:
// 1. 整體方法呼叫順序:觀察者.onSubscribe()> 被觀察者.subscribe()> 觀察者.doOnNext()>觀察者.onNext()>觀察者.onComplete() 
// 2. 觀察者.onSubscribe()固定在主執行緒進行
  • 測試結果:每呼叫一次observeOn(),觀察者的執行緒就會切換一次

4). 具體例項

下面,我將採用最常見的 Retrofit + RxJava 實現 網路請求 的功能,從而說明 RxJava的執行緒控制的具體應用

4).1 功能說明

  • 實現功能:將中文翻譯成英文 - > 顯示到介面
  • 實現方案:採用Get方法對 金山詞霸API 傳送網路請求
  1. 先切換到工作執行緒 傳送網路請求
  2. 再切換到主執行緒進行 UI更新

4).2 步驟說明

  1. 新增依賴
  2. 建立 接收伺服器返回資料 的類
  3. 建立 用於描述網路請求 的介面(區別於傳統形式)
  4. 建立 Retrofit 例項
  5. 建立 網路請求介面例項 並 配置網路請求引數(區別於傳統形式)
  6. 傳送網路請求(區別於傳統形式)
  7. 傳送網路請求
  8. 對返回的資料進行處理

4).3 步驟實現

步驟1: 新增依賴

a. 在 Gradle加入Retrofit庫的依賴

build.gradle

dependencies {

// Android 支援 Rxjava
// 此處一定要注意使用RxJava2的版本
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

// Android 支援 Retrofit
compile 'com.squareup.retrofit2:retrofit:2.1.0'

// 銜接 Retrofit & RxJava
// 此處一定要注意使用RxJava2的版本
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

// 支援Gson解析
compile 'com.squareup.retrofit2:converter-gson:2.1.0'

}

b. 新增 網路許可權
AndroidManifest.xml

<uses-permission android:name="android.permission.INTERNET"/>

步驟2:建立 接收伺服器返回資料 的類

  • 金山詞霸API 的資料格式說明如下:
// URL模板
http://fy.iciba.com/ajax.php

// URL例項
http://fy.iciba.com/ajax.php?a=fy&f=auto&t=auto&w=hello%20world

// 引數說明:
// a:固定值 fy
// f:原文內容型別,日語取 ja,中文取 zh,英語取 en,韓語取 ko,德語取 de,西班牙語取 es,法語取 fr,自動則取 auto
// t:譯文內容型別,日語取 ja,中文取 zh,英語取 en,韓語取 ko,德語取 de,西班牙語取 es,法語取 fr,自動則取 auto
// w:查詢內容
  • 根據 金山詞霸API 的資料格式,建立 接收伺服器返回資料 的類:

Translation.java

public class Translation {
    private int status;

    private content content;
    private static class content {
        private String from;
        private String to;
        private String vendor;
        private String out;
        private int errNo;
    }

    //定義 輸出返回資料 的方法
    public void show() {
        System.out.println( "Rxjava翻譯結果:" + status);
        System.out.println("Rxjava翻譯結果:" + content.from);
        System.out.println("Rxjava翻譯結果:" + content.to);
        System.out.println("Rxjava翻譯結果:" + content.vendor);
        System.out.println("Rxjava翻譯結果:" + content.out);
        System.out.println("Rxjava翻譯結果:" + content.errNo);
    }
}

步驟3:建立 用於描述網路請求 的介面

採用 註解 + Observable<...>介面描述 網路請求引數

GetRequestthread_Interface.java

public interface GetRequest_Interface {

    @GET("ajax.php?a=fy&f=auto&t=auto&w=hi%20world")
    Observable<Translation> getCall();
     // 註解裡傳入 網路請求 的部分URL地址
    // Retrofit把網路請求的URL分成了兩部分:一部分放在Retrofit物件裡,另一部分放在網路請求接口裡
    // 如果接口裡的url是一個完整的網址,那麼放在Retrofit物件裡的URL可以忽略
    // 採用Observable<...>介面 
    // getCall()是接受網路請求資料的方法
}

接下來的步驟均在MainActivity.java內實現(請看註釋)

MainActivity.java

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "Rxjava";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        //步驟4:建立Retrofit物件
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://fy.iciba.com/") // 設定 網路請求 Url
                .addConverterFactory(GsonConverterFactory.create()) //設定使用Gson解析(記得加入依賴)
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支援RxJava
                .build();

        // 步驟5:建立 網路請求介面 的例項
        GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);

        // 步驟6:採用Observable<...>形式 對 網路請求 進行封裝
        Observable<Translation> observable = request.getCall();

        // 步驟7:傳送網路請求
        observable.subscribeOn(Schedulers.io())               // 在IO執行緒進行網路請求
                  .observeOn(AndroidSchedulers.mainThread())  // 回到主執行緒 處理請求結果
                  .subscribe(new Observer<Translation>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }

                    @Override
                    public void onNext(Translation result) {
                        // 步驟8:對返回的資料進行處理
                        result.show() ;
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "請求失敗");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "請求成功");
                    }
                });
    }
}

4).4 測試結果

5). 特別注意

5).1 依賴包問題

  • 問題說明

示意圖

  • 解決方法
    通過在Gradle使用packageOptions解決

build.gradle

android {
    ...
    packagingOptions {
        exclude 'META-INF/rxjava.properties'
    }
}

5).2 應用程式崩潰問題

  • 背景:在傳送網路請求時 退出當前Activity
  • 衝突:此時如果回到主執行緒更新 UIApp會崩潰
  • 解決方案:當 Activity退出時,呼叫 Disposable.dispose()切斷觀察者和被觀察者的連線,使得觀察者無法收到事件 & 響應事件

當出現多個Disposable時,可採用RxJava內建容器CompositeDisposable進行統一管理

// 新增Disposable到CompositeDisposable容器
CompositeDisposable.add()

// 清空CompositeDisposable容器
CompositeDisposable.clear() 

 

3.4.3.3 延遲操作

  • 需求場景
    即在被觀察者傳送事件前進行一些延遲的操作

  • 對應操作符使用

delay()

  • 作用
    使得被觀察者延遲一段時間再發送事件

  • 方法介紹
    delay() 具備多個過載方法,具體如下:

// 1. 指定延遲時間
// 引數1 = 時間;引數2 = 時間單位
delay(long delay,TimeUnit unit)

// 2. 指定延遲時間 & 排程器
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延遲時間  & 錯誤延遲
// 錯誤延遲,即:若存在Error事件,則如常執行,執行後再丟擲錯誤異常
// 引數1 = 時間;引數2 = 時間單位;引數3 = 錯誤延遲引數
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延遲時間 & 排程器 & 錯誤延遲
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器;引數4 = 錯誤延遲引數
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時間並新增排程器,錯誤通知可以設定是否延遲
  • 具體使用
Observable.just(1, 2, 3)
                .delay(3, TimeUnit.SECONDS) // 延遲3s再發送,由於使用類似,所以此處不作全部展示
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
  • 測試結果

3.4.3.4 在事件的生命週期中操作

  • 需求場景
    在事件傳送 & 接收的整個生命週期過程中進行操作

如傳送事件前的初始化、傳送事件後的回撥請求等

  • 對應操作符使用

do()

  • 作用
    在某個事件的生命週期中呼叫
  • 型別
    do()操作符有很多個,具體如下:

 

  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new Throwable("發生錯誤了"));
                 }
               })
                // 1. 當Observable每傳送1次資料事件就會呼叫1次
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.d(TAG, "doOnEach: " + integerNotification.getValue());
                    }
                })
                // 2. 執行Next事件前呼叫
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doOnNext: " + integer);
                    }
                })
                // 3. 執行Next事件後呼叫
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doAfterNext: " + integer);
                    }
                })
                // 4. Observable正常傳送事件完畢後呼叫
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doOnComplete: ");
                    }
                })
                // 5. Observable傳送錯誤事件時呼叫
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "doOnError: " + throwable.getMessage());
                    }
                })
                // 6. 觀察者訂閱時呼叫
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        Log.e(TAG, "doOnSubscribe: ");
                    }
                })
                // 7. Observable傳送事件完畢後呼叫,無論正常傳送完畢 / 異常終止
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doAfterTerminate: ");
                    }
                })
                // 8. 最後執行
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doFinally: ");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
  • 測試結果

3.4.3.5 錯誤處理

  • 需求場景
    傳送事件過程中,遇到錯誤時的處理機制

  • 對應操作符型別

 

  • 對應操作符使用

onErrorReturn()

  • 作用
    遇到錯誤時,傳送1個特殊事件 & 正常終止

可捕獲在它之前發生的異常

  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("發生錯誤了"));
                 }
               })
                .onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(@NonNull Throwable throwable) throws Exception {
                        // 捕捉錯誤異常
                        Log.e(TAG, "在onErrorReturn處理了錯誤: "+throwable.toString() );

                        return 666;
                        // 發生錯誤事件後,傳送一個"666"事件,最終正常結束
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
  • 測試結果

onErrorResumeNext()

  • 作用
    遇到錯誤時,傳送1個新的Observable

注:

  1. onErrorResumeNext()攔截的錯誤 = Throwable;若需攔截Exception請用onExceptionResumeNext()
  2. onErrorResumeNext()攔截的錯誤 = Exception,則會將錯誤傳遞給觀察者的onError方法
  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("發生錯誤了"));
                 }
               })
                .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                    @Override
                    public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {

                        // 1. 捕捉錯誤異常
                        Log.e(TAG, "在onErrorReturn處理了錯誤: "+throwable.toString() );

                        // 2. 發生錯誤事件後,傳送一個新的被觀察者 & 傳送事件序列
                        return Observable.just(11,22);
                        
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
  • 測試結果

onExceptionResumeNext()

  • 作用
    遇到錯誤時,傳送1個新的Observable

注:

  1. onExceptionResumeNext()攔截的錯誤 = Exception;若需攔截Throwable請用onErrorResumeNext()
  2. onExceptionResumeNext()攔截的錯誤 = Throwable,則會將錯誤傳遞給觀察者的onError方法
  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發生錯誤了"));
                 }
               })
                .onExceptionResumeNext(new Observable<Integer>() {
                    @Override
                    protected void subscribeActual(Observer<? super Integer> observer) {
                        observer.onNext(11);
                        observer.onNext(22);
                        observer.onComplete();
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
  • 測試結果

retry()

  • 作用
    重試,即當出現錯誤時,讓被觀察者(Observable)重新發射資料
  1. 接收到 onError()時,重新訂閱 & 傳送事件
  2. ThrowableException都可攔截
  • 型別

共有5種過載方法

<-- 1. retry() -->
// 作用:出現錯誤時,讓被觀察者重新發送資料
// 注:若一直錯誤,則一直重新發送

<-- 2. retry(long time) -->
// 作用:出現錯誤時,讓被觀察者重新發送資料(具備重試次數限制
// 引數 = 重試次數
 
<-- 3. retry(Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送& 持續遇到錯誤,則持續重試)
// 引數 = 判斷邏輯

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送 & 持續遇到錯誤,則持續重試
// 引數 =  判斷邏輯(傳入當前重試次數 & 異常錯誤資訊)

<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制
// 引數 = 設定重試次數 & 判斷邏輯

  • 具體使用
<-- 1. retry() -->
// 作用:出現錯誤時,讓被觀察者重新發送資料
// 注:若一直錯誤,則一直重新發送

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發生錯誤了"));
                e.onNext(3);
                 }
               })
                .retry() // 遇到錯誤時,讓被觀察者重新發射資料(若一直錯誤,則一直重新發送
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });


<-- 2. retry(long time) -->
// 作用:出現錯誤時,讓被觀察者重新發送資料(具備重試次數限制
// 引數 = 重試次數
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發生錯誤了"));
                e.onNext(3);
                 }
               })
                .retry(3) // 設定重試次數 = 3次
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

<-- 3. retry(Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送& 持續遇到錯誤,則持續重試)
// 引數 = 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發生錯誤了"));
                e.onNext(3);
                 }
               })
                // 攔截錯誤後,判斷是否需要重新發送請求
                .retry(new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        // 捕獲異常
                        Log.e(TAG, "retry錯誤: "+throwable.toString());

                        //返回false = 不重新重新發送資料 & 呼叫觀察者的onError結束
                        //返回true = 重新發送請求(若持續遇到錯誤,就持續重新發送)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送 & 持續遇到錯誤,則持續重試
// 引數 =  判斷邏輯(傳入當前重試次數 & 異常錯誤資訊)
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發生錯誤了"));
                e.onNext(3);
                 }
               })

                // 攔截錯誤後,判斷是否需要重新發送請求
                .retry(new BiPredicate<Integer, Throwable>() {
                    @Override
                    public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
                        // 捕獲異常
                        Log.e(TAG, "異常錯誤 =  "+throwable.toString());

                        // 獲取當前重試次數
                        Log.e(TAG, "當前重試次數 =  "+integer);

                        //返回false = 不重新重新發送資料 & 呼叫觀察者的onError結束
                        //返回true = 重新發送請求(若持續遇到錯誤,就持續重新發送)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });


<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制
// 引數 = 設定重試次數 & 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發生錯誤了"));
                e.onNext(3);
                 }
               })
                // 攔截錯誤後,判斷是否需要重新發送請求
                .retry(3, new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        // 捕獲異常
                        Log.e(TAG, "retry錯誤: "+throwable.toString());

                        //返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束
                        //返回true = 重新發送請求(最多重新發送3次)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

retryUntil()

  • 作用
    出現錯誤後,判斷是否需要重新發送資料
  1. 若需要重新發送 & 持續遇到錯誤,則持續重試
  2. 作用類似於retry(Predicate predicate)
  • 具體使用
    具體使用類似於retry(Predicate predicate),唯一區別:返回 true 則不重新發送資料事件。此處不作過多描述

retryWhen()

  • 作用
    遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable),並決定是否需要重新訂閱原始被觀察者(Observable)& 傳送事件
  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發生錯誤了"));
                e.onNext(3);
            }
        })
                // 遇到error事件才會回撥
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                        // 引數Observable<Throwable>中的泛型 = 上游操作符丟擲的異常,可通過該條件來判斷異常的型別
                        // 返回Observable<?> = 新的被觀察者 Observable(任意型別)
                        // 此處有兩種情況:
                            // 1. 若 新的被觀察者 Observable傳送的事件 = Error事件,那麼 原始Observable則不重新發送事件:
                            // 2. 若 新的被觀察者 Observable傳送的事件 = Next事件 ,那麼原始的Observable則重新發送事件:
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {

                                // 1. 若返回的Observable傳送的事件 = Error事件,則原始的Observable不重新發送事件
                                // 該異常錯誤資訊可在觀察者中的onError()中獲得
                                 return Observable.error(new Throwable("retryWhen終止啦"));
                                
                                // 2. 若返回的Observable傳送的事件 = Next事件,則原始的Observable重新發送事件(若持續遇到錯誤,則持續重試)
                                 // return Observable.just(1);
                            }
                        });

                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應" + e.toString());
                        // 獲取異常錯誤資訊
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
  • 測試結果

 

3.4.3.6 重複傳送

  • 需求場景
    重複不斷地傳送被觀察者事件

  • 對應操作符型別
    repeat() & repeatWhen()

repeat()

  • 作用
    無條件地、重複傳送 被觀察者事件

具備過載方法,可設定重複建立次數

  • 具體使用
// 不傳入引數 = 重複傳送次數 = 無限次
        repeat();
        // 傳入引數 = 重複傳送次數有限
        repeatWhen(Integer int );

// 注:
  // 1. 接收到.onCompleted()事件後,觸發重新訂閱 & 傳送
  // 2. 預設執行在一個新的執行緒上

        // 具體使用
        Observable.just(1, 2, 3, 4)
                .repeat(3) // 重複建立次數 =- 3次
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }
                    
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }

                });



  • 測試結果

repeatWhen()

  • 作用
    有條件地、重複傳送 被觀察者事件

  • 原理
    將原始 Observable 停止傳送事件的標識(Complete() / Error())轉換成1個 Object 型別資料傳遞給1個新被觀察者(Observable),以此決定是否重新訂閱 & 傳送原來的 Observable

  1. 若新被觀察者(Observable)返回1個Complete / Error事件,則不重新訂閱 & 傳送原來的 Observable
  2. 若新被觀察者(Observable)返回其餘事件時,則重新訂閱 & 傳送原來的 Observable
  • 具體使用
 Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            // 在Function函式中,必須對輸入的 Observable<Object>進行處理,這裡我們使用的是flatMap操作符接收上游的資料
            public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                // 將原始 Observable 停止傳送事件的標識(Complete() /  Error())轉換成1個 Object 型別資料傳遞給1個新被觀察者(Observable)
                // 以此決定是否重新訂閱 & 傳送原來的 Observable
                // 此處有2種情況:
                // 1. 若新被觀察者(Observable)返回1個Complete() /  Error()事件,則不重新訂閱 & 傳送原來的 Observable
                // 2. 若新被觀察者(Observable)返回其餘事件,則重新訂閱 & 傳送原來的 Observable
                return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {

                        // 情況1:若新被觀察者(Observable)返回1個Complete() /  Error()事件,則不重新訂閱 & 傳送原來的 Observable
                        return Observable.empty();
                        // Observable.empty() = 傳送Complete事件,但不會回撥觀察者的onComplete()

                        // return Observable.error(new Throwable("不再重新訂閱事件"));
                        // 返回Error事件 = 回撥onError()事件,並接收傳過去的錯誤資訊。

                        // 情況2:若新被觀察者(Observable)返回其餘事件,則重新訂閱 & 傳送原來的 Observable
                        // return Observable.just(1);
                       // 僅僅是作為1個觸發重新訂閱被觀察者的通知,傳送的是什麼資料並不重要,只要不是Complete() /  Error()事件
                    }
                });

            }
        })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應:" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }

                });
  • 測試結果