1. 程式人生 > >RxJava之錯誤處理

RxJava之錯誤處理

在Observable發射資料時,有時傳送onError通知,導致觀察者不能正常接收資料。可是,有時我們希望對Observable發射的onError通知做出響應或者從錯誤中恢復。此時我們該如何處理呢?下面我們瞭解下RxJava錯誤處理相關的操作符。

catch

流程圖

這裡寫圖片描述

概述

catch操作符攔截原Observable的onError通知,將它替換為其它的資料項或資料序列,讓產生的Observable能夠正常終止或者根本不終止。

在RxJava中,catch實現為三個不同的操作符:

  • onErrorReturn:讓Observable遇到錯誤時發射一個特殊的項並且正常終止。
  • onErrorResumeNext:讓Observable在遇到錯誤時開始發射第二個Observable的資料序列。
  • onExceptionResumeNext:讓Observable在遇到錯誤時繼續發射後面的資料項。

onErrorReturn

流程圖

這裡寫圖片描述

概述

onErrorReturn方法建立並返回一個擁有類似原Observable的新Observable,後者會忽略前者的onError呼叫,不會將錯誤傳遞給觀察者,作為替代,它會通過引數函式,建立一個特殊項併發發射,最後呼叫觀察者的onCompleted方法。

API

Javadoc: onErrorReturn(Func1))

示例程式碼

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        subscriber.onError(new Throwable("do onError"));
        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .onErrorReturn(new Func1<Throwable, Student>() {
            @Override
            public Student call(Throwable throwable) {
                return new Student(1001, "error - 1 ", 10);
            }
        }).observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                mAdaStudent.addData(student);
            }
        });

Log列印

OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1001'name='error - 1 ', age=10}
OperateActivity: do onCompleted

示例解析

在手動建立Observale時,當Observable傳送了第三個資料後,Observable傳送了onError通知,然後又傳送了2個數據。而在onErrorReturn方法處理中,其引數函式中,建立並返回了一個特殊項( new Student(1001, “error - 1 “, 10)).

從Log列印可以看出,觀察者並沒有執行onError方法,意味著Observale並沒有接收到onError通知,而是接收到了一個特殊項後,呼叫了onCompleted方法,結束了此次訂閱。而這個特殊項,正是在onErrorReturn中引數函式中,建立的特殊項。

onErrorResumeNext

流程圖

這裡寫圖片描述

概述

onErrorResumeNext方法建立並返回一個擁有類似原Observable的新Observable,後者會忽略前者的onError呼叫,不會將onError通知傳遞給觀察者,但作為替代,=新的Observable開始發射資料。

onErrorResumeNext方法與onErrorReturn()方法類似,都是攔截原Observable的onError通知,不同的是攔截後的處理方式,onErrorReturn建立並返回一個特殊項,而onErrorResumeNext建立並返回一個新的Observabl,觀察者會訂閱它,並接收其發射的資料。

API

Javadoc: onErrorResumeNext(Func1))
Javadoc: onErrorResumeNext(Observable))

示例程式碼

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        subscriber.onError(new Throwable("do onError"));
        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .onErrorResumeNext(new Func1<Throwable, Observable<Student>>() {
            @Override
            public Observable<Student> call(Throwable throwable) {
                return Observable.just(new Student(1001, "error - 1 ", 10), new Student(1002, "error - 2 ", 10));
            }
        }).observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
            });

Log列印

OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1001'name='error - 1 ', age=10}
OperateActivity: do onNext
Student{id='1002'name='error - 2 ', age=10}
OperateActivity: do onCompleted

示例解析

在手動建立Observale時,當Observable傳送了第三個資料後,Observable傳送了onError通知,然後又傳送了2個數據。在onErrorResumeNext方法中的引數函式中,建立了一個新的Observable。

從Log列印可以看出,觀察者並沒有執行onError方法,意味著Observale並沒有接收到onError通知,而是接收到了新建的建立了一個新的Observable發射的出具。在新Observable發射完資料後,呼叫了onCompleted方法,結束了此次訂閱。

onExceptionResumeNext

流程圖

這裡寫圖片描述

概述

onExceptionResumeNext方法與onErrorResumeNext方法類似建立並返回一個擁有類似原Observable的新Observable,,也使用這個備用的Observable。不同的是,如果onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable。

API

Javadoc: onExceptionResumeNext(Observable))

示例程式碼

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        subscriber.onError(new Throwable("do onError"));
        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .onExceptionResumeNext(Observable.just(new Student(1001, "error - 1 ", 10),
                new Student(1002, "error - 2 ", 10)))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });

 Observable.create(new Observable.OnSubscribe<Student>() {
        @Override
        public void call(Subscriber<? super Student> subscriber) {
            subscriber.onNext(getListOfStudent().get(0));
            subscriber.onNext(getListOfStudent().get(1));
            subscriber.onNext(getListOfStudent().get(2));
            subscriber.onError(new Exception("do onError"));
            subscriber.onNext(getListOfStudent().get(3));
            subscriber.onNext(getListOfStudent().get(4));
            subscriber.onNext(getListOfStudent().get(5));
        }
    }) ***

Log列印

1.
do onError  
2.
OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1001'name='error - 1 ', age=10}
OperateActivity: do onNext
Student{id='1002'name='error - 2 ', age=10}
OperateActivity: do onCompleted

示例解析

在建立Observale傳送OnError通知時,error採用了兩種方式,一個是Throwable,另外一個是Exception。從列印的Log中可以看出,在採用第一種方式時,原Observable直接傳送了onError通知,並結束髮射。但是採用發射Exception作為onError通知時,原Observale的onError通知被攔截,並使用了onExceptionResumeNext()建立的備用Observale。正如概述中敘述的,onExceptionResumeNext方法至攔截原Observale中Exception作為onError的通知,並將在引數函式中建立的備用Observable中的資料發射出去。

retry

流程圖

這裡寫圖片描述

概述

retry()操作符將攔截原Observable傳遞onError給觀察者,而是重新訂閱此Observable。由於是重新訂閱會造成資料重複。

在RxJava中,retry()操作符有幾個變體

retry()變體在出現onError通知時,將無限的重新訂閱原Observable.

retry(long)變體通過引數指定最多重新訂閱的次數,如果次數超了,它不會嘗試再次訂閱,它會把最新的一個onError通知傳遞給它的觀察者。

retry(Func2)變體通過引數接受兩個引數的函式,引數為重試次數和導致發射onError通知的Throwable,而函式返回一個布林值,如果返回true,retry應該再次訂閱原Observable,如果返回false,retry會將最新的一個onError通知傳遞給它的觀察者。

API

Javadoc: retry()
Javadoc: retry(long)
Javadoc: retry(Func2)

示例程式碼

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        if (isError) {
            subscriber.onError(new Throwable("do onError"));
            isError = false;
        }

        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).retry(3)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });

Log列印

OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='4'name='D', age=24}
OperateActivity: do onNext
Student{id='5'name='E', age=33}
OperateActivity: do onNext
Student{id='6'name='F', age=23}

示例解析

從示例程式碼中可以看出,第一次訂閱時,發射完第三個通知後,傳送onError通知。但,通過Log列印可以清晰的看出,onError通知並沒有發射出去,而是重新訂閱,將之前發射的資料,重新發了一遍。正如之前說的,retry()操作符會攔截onError通知並重新訂閱,但是會造成資料的重複。

retryWhen

流程圖

這裡寫圖片描述

概述

retryWhen()預設在trampoline排程器上執行,可以通過引數指定其它的排程器。

API

Javadoc: retryWhen(Func1)
Javadoc: retryWhen(Func1,Scheduler)

示例程式碼

1.
Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        if (isError) {
            subscriber.onError(new Throwable("do onError"));
            isError = false;
        }

        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });
2.
***
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<Throwable> call(Observable<? extends Throwable> observable) {
        return Observable.error(new Throwable(" do retryWhen"));
    }
})
****

Log列印

1.
OperateActivity: do onNext
Student{id=’1’name=’A’, age=23}
OperateActivity: do onNext
Student{id=’2’name=’B’, age=33}
OperateActivity: do onNext
Student{id=’3’name=’C’, age=24}
OperateActivity: do onNext
Student{id=’1’name=’A’, age=23}
OperateActivity: do onNext
Student{id=’2’name=’B’, age=33}
OperateActivity: do onNext
Student{id=’3’name=’C’, age=24}
OperateActivity: do onNext
Student{id=’4’name=’D’, age=24}
OperateActivity: do onNext
Student{id=’5’name=’E’, age=33}
OperateActivity: do onNext
Student{id=’6’name=’F’, age=23}
2.
do onError

示例解析

示例1中,在retryWhend(Func1)的引數函式中,建立並返回了一個可發射資料的Observable物件,而在示例2中,其引數函式,建立並返回了一個發射onError通知的Observable。通過Log列印可以出,示例1在攔截了原Observable中的onError通知,並重新訂閱了原Observable,但是示例2中,觀察者接收了onError通知,意味著原Observable中的onError通知未被攔截,直接發射出去。示例2中,正體現了retryWhen()和retry()的不同之處。