1. 程式人生 > >響應式程式設計--Android Rxjava的使用(一)

響應式程式設計--Android Rxjava的使用(一)

RxJava作為一個響應式程式設計庫,在Android開發者中非常的受歡迎,越來越多的人開始接觸並使用,作為一個Android開發的菜鳥,仔細研究了一下RxJava的知識,在此將一些學習的過程和心得記錄一下

首先介紹一下RxJava相關的概念

ReactiveX

ReactiveX 是一個專注於非同步程式設計與控制可觀察資料(或者事件)流的API。它組合了觀察者模式,迭代器模式和函數語言程式設計的優秀思想。

RxJava

RxJava 是 ReactiveX 在 Java 上的開源的實現。在Github上對此的解釋是:

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

翻譯過來的大概意思是:一個在 Java VM 上使用可觀察序列組成的非同步的、基於事件的程式庫

這句話比較抽象,簡單來說就是一個用於非同步操作的類庫。對Android執行緒瞭解的人都知道,Android的繪製和事件響應都是在主執行緒中的,為了保證介面能夠快速及時的響應使用者操作,很多耗時操作,比如讀寫檔案、請求網路,都會在子執行緒中去執行。在沒有RxJava之前,一般都會使用AsyncTask、Thread來實現,有了RxJava以後,實現起來就會變得簡單的多

本部落格將會分別介紹RxJava1.x和RxJava2.0兩個版本上的使用及不同點

RxJava1.x

RxJava使用的是通用的觀察者模式,RxJava中兩個主要的類:Observable(被觀察者) 和 Subscriber(訂閱者)。在 RxJava 上,一個 Observable 是一個發出資料流或者事件的類,Subscriber 是一個對這些發出的 items (資料流或者事件)進行處理(採取行動)的類

現在,著手實現一個簡單的Observable和Subscriber的建立和訂閱

使用RxJava需要引入類庫,Github上的連結

如果需要進行Android App的開發,同時需要引入類庫RxAndroid

首先,實現一個Observable,即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件

Observable observable = Observable.create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super
Integer> subscriber) { // TODO Auto-generated method stub subscriber.onNext(0); subscriber.onNext(1); subscriber.onNext(2); subscriber.onCompleted(); } });

create()方法傳入了一個OnSubscribe物件作為引數。OnSubscribe物件會被儲存在返回的Observable物件中,當Observable被訂閱的時候,OnSubscribe的call()方法會被呼叫,事件序列就會依照設定依次觸發。被觀察者可以傳送三種事件:onNext()、onError()、onComplete()。被觀察者傳送的事件會在觀察者中處理,這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式

接下來實現一個Observer,即觀察者,它決定事件觸發的時候有怎樣的行為

Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onNext(Integer integer) {
                // TODO Auto-generated method stub
                Log.i(TAG, integer.toString());
            }

            @Override
            public void onError(Throwable e) {
                // TODO Auto-generated method stub
                Log.i(TAG, e.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete()");
            }
        };

Observer是一個介面,RxJava還內建了一個實現Observer的抽象類:Subscriber。Subscriber對Observer介面進行了擴充套件,但他們的基本使用方式是完全一樣的

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                // TODO Auto-generated method stub
                Log.i(TAG, integer.toString());
            }

            @Override
            public void onError(Throwable e) {
                // TODO Auto-generated method stub
                Log.i(TAG, e.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete()");
            }
        };

最後,使用subscribe()方法實現訂閱,被觀察者就會向觀察者傳送事件

observable.subscribe(observer);
//observable.subscribe(subscriber);

輸出結果:

0
1
2
onComplete

上游的被觀察者傳送的事件遵循以下規則:

1、上游可以傳送無限個onNext,下游也可以接收無限個onNext

2、當上遊傳送了一個onComplete後,上游onComplete之後的事件會繼續傳送,而下游收到onComplete事件之後不再繼續接收事件

3、當上遊傳送了一個onError後,上游onError之後的事件會繼續傳送,而下游收到onError事件之後將不再繼續接收事件

4、上游可以不傳送onComplete或onError

5、onComplete和onError必須唯一併且互斥,即onComplete和onError只能傳送一個,不能兩個都發送

上述程式碼使用的是create()方法來建立事件佇列,create()是RxJava最基本的建立事件佇列的方法,下面是幾種其他建立事件佇列的方法

  • just()方法:佇列中的事件會被依次傳送給觀察者
Observable.just("Hello", "Hi", "RxJava").subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                // TODO Auto-generated method stub
                Log.i(TAG, s);
            }

            @Override
            public void onError(Throwable throwable) {
                // TODO Auto-generated method stub
                Log.i(TAG, throwable.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete");
            }
        });
  • from(T[]) / from(Iterable
Observable.from(array).subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                // TODO Auto-generated method stub
                Log.i(TAG, s);
            }

            @Override
            public void onError(Throwable throwable) {
                // TODO Auto-generated method stub
                Log.i(TAG, throwable.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete");
            }
        });
  • defer():有觀察者訂閱時才建立Observable,這種方式可以保證Observable的狀態是最新的
        str = "Test";
        Observable observable1 = Observable.just(str);

        Observable observable2 = Observable.defer(new Func0<Observable<String>>() {

            @Override
            public Observable<String> call() {
                // TODO Auto-generated method stub
                return Observable.just(str);
            }

        });

        str = "Change str";

        observable1.subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                // TODO Auto-generated method stub
                Log.i(TAG, s);
            }

            @Override
            public void onError(Throwable throwable) {
                // TODO Auto-generated method stub
                Log.i(TAG, throwable.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete");
            }
        });

        observable2.subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                // TODO Auto-generated method stub
                Log.i(TAG, s);
            }

            @Override
            public void onError(Throwable throwable) {
                // TODO Auto-generated method stub
                Log.i(TAG, throwable.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete");
            }
        });

輸出結果:

Test
Change str
  • interval():建立一個按固定時間間隔發射整數序列的Observable
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
            @Override
            public void onNext(Long aLong) {
                // TODO Auto-generated method stub
                Log.i(TAG, aLong.toString());
            }

            @Override
            public void onError(Throwable throwable) {
                // TODO Auto-generated method stub
                Log.i(TAG, throwable.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete");
            }
        });
  • range():建立一個發射特定序列的Observable,第一個引數為起始值,第二個為傳送的個數,如果為0則不傳送,如果為負數丟擲異常
Observable.range(0, 1000).subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                // TODO Auto-generated method stub
                Log.i(TAG, aLong.toString());
            }

            @Override
            public void onError(Throwable throwable) {
                // TODO Auto-generated method stub
                Log.i(TAG, throwable.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete");
            }
        });
  • timer():建立一個Observable,它在一個給定的延遲後發射一個特殊的值,等同於Handler的postDelayed()
Observable.timer(1, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
            @Override
            public void onNext(Long aLong) {
                // TODO Auto-generated method stub
                Log.i(TAG, aLong.toString());
            }

            @Override
            public void onError(Throwable throwable) {
                // TODO Auto-generated method stub
                Log.i(TAG, throwable.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete");
            }
        });
  • repeat():建立一個重複發射特定資料的Observable
String[] array = {"Hello", "Hi", "RxJava"};
Observable.from(array).repeat().subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String str) {
                // TODO Auto-generated method stub
                Log.i(TAG, str);
            }

            @Override
            public void onError(Throwable throwable) {
                // TODO Auto-generated method stub
                Log.i(TAG, throwable.toString());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete");
            }
        });

subscribe()還支援不完整的回撥,RxJava會自動根據定義創建出Subscriber

Action0 onCompleteAction = new Action0() {

            @Override
            public void call() {
                // TODO Auto-generated method stub
                Log.i(TAG, "onComplete");
            }
        };

        Action1<String> onNextAction = new Action1<String>() {
            public void call(String str) {
                Log.i(TAG, str);
            };
        };

        Action1<Throwable> onErrorAction = new Action1<Throwable>() {
            @Override
            public void call(Throwable e) {
                // TODO Auto-generated method stub
                Log.i(TAG, e.toString());
            }
        };

Action0是RxJava的一個介面,它只有一個方法call(),這個方法沒有引數,也沒有返回值,所以可以被定義為onComplete()方法。Action1也是一個介面,它同樣只有一個方法call(T param),這個方法也沒有返回值,但有一個引數,與Action0同理,可以被定義為onNext()和onError()方法。

observable.subscribe(onNextAction);
observable.subscribe(onNextAction, onErrorAction);
observable.subscribe(onNextAction, onErrorAction, onCompleteAction);