1. 程式人生 > >RxJava 錯誤處理操作符(Error Handling Operators)

RxJava 錯誤處理操作符(Error Handling Operators)

RxJava系列教程:

一般來說,Observable不會拋異常。它會呼叫 onError 終止Observable序列,以此通知所有的觀察者發生了一個不可恢復的錯誤。 但是,也存在一些異常。例如,如果 onError 呼叫失敗了,Observable不會嘗試再次呼叫 onError 去通知觀察者,它會丟擲 RuntimeException,OnErrorFailedException 或者 OnErrorNotImplementedException。

有時我們希望觀察者或者操作符應該對異常發生時的 onError 通知做出合適的響應,而不是捕獲(catch)異常。很多操作符可用於對Observable發射的onError通知做出響應或者從錯誤中恢復,例如,你可以:

  • 吞掉這個錯誤,切換到一個備用的Observable繼續發射資料
  • 吞掉這個錯誤然後發射預設值
  • 吞掉這個錯誤並立即嘗試重啟這個Observable
  • 吞掉這個錯誤,在一些回退間隔後重啟這個Observable

我們可以使用Error handling相關的操作符來集中統一地處理錯誤。RxJava中錯誤處理的操作符為 Catch和 Retry。

Catch

  Catch操作符能夠攔截原始Observable的onError通知,不讓Observable因為產生錯誤而終止。相當於Java中try/catch操作,不能因為拋異常而導致程式崩潰。
  
這裡寫圖片描述

RxJava將Catch實現為三個不同的操作符:

onErrorReturn:讓Observable遇到錯誤時發射一個特殊的項並且正常終止。
onErrorResumeNext:讓Observable在遇到錯誤時開始發射第二個Observable的資料序列。
onExceptionResumeNext:讓Observable在遇到錯誤時繼續發射後面的資料項。

onErrorReturn

這裡寫圖片描述

onErrorReturn方法 返回一個映象原有Observable行為的新Observable
會忽略前者的onError呼叫,不會將錯誤傳遞給觀察者,而是發射一個特殊的項並呼叫觀察者的onCompleted方法。

API

Javadoc: onErrorReturn(Func1))

示例程式碼

/*
 * onErrorReturn:
 * 返回一個原有Observable行為的新Observable映象,
 * 後者會忽略前者的onError呼叫,不會將錯誤傳遞給觀察者,
 * 作為替代,它會發發射一個特殊的項並呼叫觀察者的onCompleted方法
 */
createObserver()
        //作為替代,它會發發射一個特殊的項並呼叫觀察者的onCompleted方法。
        .onErrorReturn(new Func1<Throwable, String>() {

            @Override
            public String call(Throwable throwable) {

                return "do something";
            }
        })
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

這裡建立Observable的方法(2個)如下:

private static Observable<String> createObserver() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 1; i <= 6; i++) {
                    if (i < 3) {
                        subscriber.onNext(i+"");
                    } else {
                        //會忽略onError呼叫,不會將錯誤傳遞給觀察者
                        subscriber.onError(new Throwable("Throw error"));
                    }
                }
            }
        });
    }

    private static Observable<String> createObserver2() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 1; i <= 6; i++) {
                    if (i < 3) {
                        subscriber.onNext("onNext:" + i);
                    } else {
                        subscriber.onError(new Exception("the nubmer is greater than 3"));
                        //下面寫法也是可以的
                        /*try {
                            throw new Exception("the nubmer is greater than 3");

                        } catch (Exception e) {
                            subscriber.onError(e);
                        }*/
                    }
                }
            }
        });
    }

輸出結果如下:

onSuccess value = 1
onSuccess value = 2
onSuccess value = do something
onCompleted

在手動建立Observale時,當Observable傳送了第二個資料後,Observable傳送了onError通知,然後又傳送了1個數據。而在onErrorReturn方法處理中,其引數函式中,建立並返回了一個特殊項( do something).

從Log列印可以看出,觀察者並沒有執行onError方法,意味著Observale並沒有接收到onError通知,而是接收到了一個特殊項後,呼叫了onCompleted方法,結束了此次訂閱。而這個特殊項,正是在onErrorReturn中引數函式中,建立的特殊項。

onErrorResumeNext

這裡寫圖片描述

onErrorResumeNext方法與onErrorReturn()方法類似,都是攔截原Observable的onError通知,不同的是攔截後的處理方式,onErrorReturn建立並返回一個特殊項,而onErrorResumeNext建立並返回一個新的Observabl,觀察者會訂閱它,並接收其發射的資料。

API

Javadoc: onErrorResumeNext(Func1))
Javadoc: onErrorResumeNext(Observable))

示例程式碼

createObserver()
        .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {

            @Override
            public Observable<String> call(Throwable t) {
                return Observable.just("a","b","c");
            }
        })
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

輸出結果如下:

onSuccess value = 1
onSuccess value = 2
onSuccess value = a
onSuccess value = b
onSuccess value = c
onCompleted

在手動建立Observale時,當Observable傳送了第二個資料後,Observable傳送了onError通知,然後又傳送了3個數據。在onErrorResumeNext方法中的引數函式中,建立了一個新的Observable。

從Log列印可以看出,觀察者並沒有執行onError方法,意味著Observale並沒有接收到onError通知,而是接收到了新建的建立了一個新的Observable發射的出具。在新Observable發射完資料後,呼叫了onCompleted方法,結束了此次訂閱。

onExceptionResumeNext

這裡寫圖片描述

onExceptionResumeNext方法與onErrorResumeNext方法類似建立並返回一個擁有類似原Observable的新Observable,,也使用這個備用的Observable。不同的是,如果onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable。

這裡要普及一個概念,Java的異常分為錯誤(error)和異常(exception)兩種,它們都是繼承於Throwable類。

  • 錯誤(error)一般是比較嚴重的系統問題,比如我們經常遇到的OutOfMemoryError、StackOverflowError等都是錯誤。錯誤一般繼承於Error類,而Error類又繼承於Throwable類,如果需要捕獲錯誤,需要使用try..catch(Error e)或者try..catch(Throwable e)句式。使用try..catch(Exception e)句式無法捕獲錯誤

  • 異常(Exception)也是繼承於Throwable類,一般是根據實際處理業務丟擲的異常,分為執行時異常(RuntimeException)和普通異常。普通異常直接繼承於Exception類,如果方法內部沒有通過try..catch句式進行處理,必須通過throws關鍵字把異常丟擲外部進行處理(即checked異常);而執行時異常繼承於RuntimeException類,如果方法內部沒有通過try..catch句式進行處理,不需要顯式通過throws關鍵字丟擲外部,如IndexOutOfBoundsException、NullPointerException、ClassCastException等都是執行時異常,當然RuntimeException也是繼承於Exception類,因此是可以通過try..catch(Exception e)句式進行捕獲處理的。

API

Javadoc: onExceptionResumeNext(Observable))

示例程式碼

/*
 * onExceptionResumeNext:
 * 和onErrorResumeNext類似,可以說是onErrorResumeNext的特例,
 * 區別是如果onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable。
 */
createObserver()
        .onExceptionResumeNext(Observable.just("www.stay4it.com"))
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

輸出結果如下:

onSuccess value = 1
onSuccess value = 2
onError error = java.lang.Throwable: Throw error

從Log列印可以看出,沒有使用備用的Observable,這是因為onError收到的Throwable不是一個Exception,所以將錯誤傳遞給觀察者的onError方法。那麼怎麼才能使用備用的Observable呢?程式碼如下:

createObserver2()// 注意這裡
        .onExceptionResumeNext(Observable.just("www.stay4it.com"))
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

輸出結果如下:

onSuccess value = onNext:1
onSuccess value = onNext:2
onSuccess value = www.stay4it.com
onCompleted

Retry

顧名思義,retry的意思就是試著重來,當原始Observable發射onError通知時,retry操作符不會讓onError通知傳遞給觀察者,它會重新訂閱這個Observable一次或者多次(意味著重新從頭髮射資料),所以可能造成資料項重複傳送的情況。

如果重新訂閱了指定的次數還是發射了onError通知,將不再嘗試重新訂閱,它會把最新的一個onError通知傳遞給觀察者。

這裡寫圖片描述

RxJava中將Retry操作符的實現為retry和retryWhen兩種。

retry操作符預設在trampoline排程器上執行。

  • Javadoc: retry():無論收到多少次onError通知,都會繼續訂閱並重發原始Observable,直到onCompleted。

  • Javadoc: retry(long):接受count引數的retry會最多重新訂閱count次,如果次數超過了就不會嘗試再次訂閱,它會把最新的一個onError通知傳遞給他的觀察者。

  • Javadoc: retry(Func2): 這個版本的retry接受一個謂詞函式作為引數,這個函式的兩個引數是:重試次數和導致發射onError通知的Throwable。這個函式返回一個布林值,如果返回true,retry應該再次訂閱和映象原始的Observable,如果返回false,retry會將最新的一個onError通知傳遞給它的觀察者。

API

Javadoc: retry()
Javadoc: retry(long)
Javadoc: retry(Func2)

示例程式碼

/**
 * retry()
 * 無限次嘗試重新訂閱
 */
createObserver()
        .retry()
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

/**
 * retry(count)
 * 最多2次嘗試重新訂閱
 */
createObserver()
        .retry(2)
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

/**
 * retry(Func2)
 */
createObserver()
        .retry(new Func2<Integer, Throwable, Boolean>() {

            @Override
            public Boolean call(Integer t1, Throwable throwable) {
                System.out.println("發生錯誤了:"+throwable.getMessage()+",第"+t1+"次重新訂閱");    
                if(t1>2){
                    return false;//不再重新訂閱
                }
                //此處也可以通過判斷throwable來控制不同的錯誤不同處理
                return true;
            }
        })
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

輸出結果如下:

onSuccess value = 1
onSuccess value = 2
onSuccess value = 1
onSuccess value = 2
…無限次



onSuccess value = 1
onSuccess value = 2
onSuccess value = 1
onSuccess value = 2
onSuccess value = 1
onSuccess value = 2
onError error = java.lang.Throwable: Throw error



onSuccess value = 1
onSuccess value = 2
發生錯誤了:Throw error,第1次重新訂閱
onSuccess value = 1
onSuccess value = 2
發生錯誤了:Throw error,第2次重新訂閱
onSuccess value = 1
onSuccess value = 2
發生錯誤了:Throw error,第3次重新訂閱
onError error = java.lang.Throwable: Throw error

retryWhen

這裡寫圖片描述

retryWhen和retry類似,區別是,retryWhen將onError中的Throwable傳遞給一個函式,這個函式產生另一個Observable,retryWhen觀察它的結果再決定是不是要重新訂閱原始的Observable。如果這個Observable發射了一項資料,它就重新訂閱,如果這個Observable發射的是onError通知,它就將這個通知傳遞給觀察者然後終止。
  
retryWhen()預設在trampoline排程器上執行,可以通過引數指定其它的排程器。

API

Javadoc: retryWhen(Func1)
Javadoc: retryWhen(Func1,Scheduler)

示例程式碼

int retryCount = 0;
final int maxRetries = 3;

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(new RuntimeException("always fails"));
            }
        })
        .subscribeOn(Schedulers.immediate())
        .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {

                    @Override
                    public Observable<?> call(Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                if (++retryCount <= maxRetries) {
                                    // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
                                    System.out.println("get error, it will try after " + 1000 + " millisecond, retry count " + retryCount);
                                    return Observable.timer(1000, TimeUnit.MILLISECONDS);
                                }
                                return Observable.error(throwable);
                            }
                        });
                    }


                })
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onNext(Integer value) {
                        System.out.println("onSuccess value = " + value);
                    }

                    @Override
                    public void onError(Throwable error) {
                        System.out.println("onError error = " + error);
                    }
                });

輸出結果如下:

get error, it will try after 1000 millisecond, retry count 1
get error, it will try after 1000 millisecond, retry count 2
get error, it will try after 1000 millisecond, retry count 3
onError error = java.lang.RuntimeException: always fails