1. 程式人生 > >Android:RxJava的使用

Android:RxJava的使用

說明:

RxJava用於非同步執行任務,跟建立子執行緒執行任務無本質區別,優點在於讓程式碼看起來整潔優雅些,並不能減少程式碼量

一、加入jar包依賴(app下的build.gradle):

dependencies {
    ...
    compile 'io.reactivex.rxjava2:rxjava:2.+'
    compile 'io.reactivex.rxjava2:rxandroid:2.+'
}

二、建立Observable(傳送資料):

1.通過create方法建立(需在subscribe方法中手動呼叫各個監聽方法,<>可以為任意型別)

Observable<T> observable = Observable.create(new ObservableOnSubscribe<T>() {
    @Override
    public void subscribe(ObservableEmitter<T> e) throws Exception {
        /*
        需要手動呼叫各個監聽方法
        */
        e.onNext(T);
        e.onComplete();
    }
});

2.通過just方法建立(自動按順序呼叫各個監聽方法,<>

可以為任意型別)

Observable<T> observable = Observable.just(T);

3.通過fromIterable方法建立(根據傳入的列表多次呼叫onNext方法,<>可以為任意型別)

List<T> list = new ArrayList<>();
list.add(T); //onNext方法會呼叫size次
Observable<T> observable = Observable.fromIterable(list);

4.通過defer方法建立(延時呼叫,<>可以為任意型別)

Observable<T> observable = Observable.defer(new Callable<ObservableSource<? extends T>>() {
    @Override
    public ObservableSource<? extends T> call() throws Exception {
        return Observable.just(T);
    }
});

5.通過interval方法建立(定時執行onNext方法,只能<Long>型別,onNext實參從0開始,每次+1)

Observable<Long> observable = Observable.interval(5, TimeUnit.SECONDS);  //定時5秒執行onNext,第1個引數為數值,第2個引數為單位,這裡為秒

6.通過range方法建立(執行指定次數的onNext方法,只能<Integer>型別,onNext實參從開始值開始,每次+1)

Observable<Integer> observable = Observable.range(開始值, 次數);

7.通過rangeLong方法建立(執行指定次數的onNext方法,只能<Long>型別,onNext實參從開始值開始,每次+1)

Observable<Long> observable = Observable.rangeLong(開始值, 次數);

8.通過timer方法建立(延時指定時間執行一次onNext方法,只能<Long>型別,onNext實參為0)

Observable<Long> observable = Observable.timer(5, TimeUnit.SECONDS);  //延時5秒執行一次onNext,第1個引數為延時時間,第2個引數為單位,這裡為秒

9.通過repeat方法建立(重複執行onNext方法,<>可以為任意型別)

Observable<T> observable = Observable.just(T).repeat();

三、建立監聽(監聽,接收資料):

1.Observer方式(執行訂閱事件後,無錯誤時,依次執行onSubscribeonNextonComplete):

Observer<T> observer = new Observer<T>() {
    /**
    *  順序:1
    **/
    @Override
    public void onSubscribe(Disposable d) {
    }

    /**
    *  順序:2
    **/
    @Override
    public void onNext(T arg) {
    }

    /**
    *  錯誤回撥
    **/
    @Override
    public void onError(Throwable e) {
    }

   /**
    *  順序:3
    **/
    @Override
    public void onComplete() {
    }
};

2.Consumer方式(執行訂閱事件後,accept方法會被執行):

Consumer consumer = new Consumer<T>() {
    @Override
    public void accept(T arg) throws Exception {
    }
};

四、執行訂閱事件(執行傳送資料動作):

1.執行事件:

observable.subscribe(observer);

2.取消事件:

Observable<T> observable = ...;

(1)獲得Disposable物件有2種方式:

方式1

Disposable disposable = observable.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    }
});

方式2

Disposable disposable = null;
Observer<T> observer = new Observer<T>() {
    @Override
    public void onSubscribe(Disposable d) {
        disposable = d;
    }
    ...
}

(2)取消事件

disposable.dispose();

五、操作符的使用:

1.map方法(型別轉換,將原資料型別轉換為目標型別):

(1)建立Observable時呼叫map轉換資料:

//A和B是2個不同的類
Observable<B> observable = Observable.just(new A()).map(new Function<A, B>() {
    @Override
    public B apply(A a) throws Exception {
        //資料轉換邏輯
        B b = ...;
        return b;
    }
});

(2)在監聽中接收的是轉換後的資料型別:

Observer<B> observer = new Observer<B>() {
    ...
    @Override
    public void onNext(B b) {
    }
};
observable.subscribe(observer);

2.flatMap方法(與fromIterable配合使用,將列表轉換為單個例項,呼叫sizeonNext方法):

List<T> list = ...;
Observable<T> observable = Observable.just(list).flatMap(new Function<List<T>, ObservableSource<T>>() {
    @Override
    public ObservableSource<T> apply(List<T> list) throws Exception {
        return Observable.fromIterable(list);  //onNext方法會呼叫size次,每次收到的實參都是T的單個物件
    }
});
Observer<B> observer = ...;
observable.subscribe(observer);

3.filter方法(過濾器,test中判斷條件,返回true才執行onNext):

Observable<T> observable = ...;
observable.filter(new Predicate<T>() {
    @Override
    public boolean test(T t) throws Exception {
        if (條件) {
            return true;  //返回true執行下一步,呼叫onNext
        }
        return false; //返回false不會執行onNext
    }
});
Observer<B> observer = ...;
observable.subscribe(observer);

4.take方法(指定onNext方法執行次數):

Observable<T> observable = ...;
observable.take(1).subscribe(...);

5.doOnNext方法(會在onNext方法之前執行)

Observable<T> observable = ...;
observable.doOnNext(new Consumer<T>() {
    @Override
    public void accept(T t) throws Exception {  //此方法會在onNext之前執行
    }
}).subscribe(...);

六、執行緒排程:

1.說明:

subscribeOn:設定Observable中任務的執行緒是哪種方式

observeOn:設定Observer中任務的執行在哪個執行緒

Schedulers.newThread():啟動一個新的執行緒

Schedulers.io():內部使用了無上限的執行緒池

Schedulers.computation():預設排程器,內部使用了固定的執行緒池

Schedulers.single():單執行緒

Schedulers.trampoline():按順序執行佇列中的任務

2.使用:

Observable<Integer> observable = ...;
observable.subscribeOn(Schedulers.io())              //控制Observable中subscribe方法在子執行緒中執行
        .observeOn(AndroidSchedulers.mainThread())   //控制Observer中的onNext、onError、onComplete方法在主執行緒(UI執行緒)執行
        .subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Integer arg) {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

七、Backpressure策略:

1.Backpressure有以下幾種模式(預設只能存128個事件的快取池):

ERROR:快取池溢位時,丟擲MissingBackpressureException異常

BUFFER:設定更大的快取池

DROP:超上限時丟棄掉

LATEST:同DROP,區別是最後一個事件能收到

2.使用:

(1)使用Flowable替換Observable

Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
    @Override
    public void subscribe(FlowableEmitter<T> fe) throws Exception {
        fe.onNext(t);
        fe.onComplete();
    }
}, BackpressureStrategy.ERROR); //此引數設定Backpressure策略

(2)使用Subscriber替換Observer,並在onSubscribe中呼叫Subscription.request()申請事件數量

Subscriber<T> subscriber = new Subscriber<T>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE); //表示向生產者申請可以消費的事件數量
    }
    ...
};

(3)執行訂閱事件

flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);