響應式程式設計--Android Rxjava的使用(一)
RxJava作為一個響應式程式設計庫,在Android開發者中非常的受歡迎,越來越多的人開始接觸並使用,作為一個Android開發的菜鳥,仔細研究了一下RxJava的知識,在此將一些學習的過程和心得記錄一下
首先介紹一下RxJava相關的概念
ReactiveX
ReactiveX 是一個專注於非同步程式設計與控制可觀察資料(或者事件)流的API。它組合了觀察者模式,迭代器模式和函數語言程式設計的優秀思想。
RxJava
RxJava 是 ReactiveX 在 Java 上的開源的實現。在Github上對此的解釋是:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
翻譯過來的大概意思是:一個在 Java VM 上使用可觀察序列組成的非同步的、基於事件的程式庫
這句話比較抽象,簡單來說就是一個用於非同步操作的類庫。對Android執行緒瞭解的人都知道,Android的繪製和事件響應都是在主執行緒中的,為了保證介面能夠快速及時的響應使用者操作,很多耗時操作,比如讀寫檔案、請求網路,都會在子執行緒中去執行。在沒有RxJava之前,一般都會使用AsyncTask、Thread來實現,有了RxJava以後,實現起來就會變得簡單的多
本部落格將會分別介紹RxJava1.x和RxJava2.0兩個版本上的使用及不同點
RxJava1.x
RxJava使用的是通用的觀察者模式,RxJava中兩個主要的類:Observable(被觀察者) 和 Subscriber(訂閱者)。在 RxJava 上,一個 Observable 是一個發出資料流或者事件的類,Subscriber 是一個對這些發出的 items (資料流或者事件)進行處理(採取行動)的類
現在,著手實現一個簡單的Observable和Subscriber的建立和訂閱
使用RxJava需要引入類庫,Github上的連結
如果需要進行Android App的開發,同時需要引入類庫RxAndroid
首先,實現一個Observable,即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件
Observable observable = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
// TODO Auto-generated method stub
subscriber.onNext(0);
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
}
});
create()方法傳入了一個OnSubscribe物件作為引數。OnSubscribe物件會被儲存在返回的Observable物件中,當Observable被訂閱的時候,OnSubscribe的call()方法會被呼叫,事件序列就會依照設定依次觸發。被觀察者可以傳送三種事件:onNext()、onError()、onComplete()。被觀察者傳送的事件會在觀察者中處理,這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式
接下來實現一個Observer,即觀察者,它決定事件觸發的時候有怎樣的行為
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
// TODO Auto-generated method stub
Log.i(TAG, integer.toString());
}
@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub
Log.i(TAG, e.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete()");
}
};
Observer是一個介面,RxJava還內建了一個實現Observer的抽象類:Subscriber。Subscriber對Observer介面進行了擴充套件,但他們的基本使用方式是完全一樣的
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
// TODO Auto-generated method stub
Log.i(TAG, integer.toString());
}
@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub
Log.i(TAG, e.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete()");
}
};
最後,使用subscribe()方法實現訂閱,被觀察者就會向觀察者傳送事件
observable.subscribe(observer);
//observable.subscribe(subscriber);
輸出結果:
0
1
2
onComplete
上游的被觀察者傳送的事件遵循以下規則:
1、上游可以傳送無限個onNext,下游也可以接收無限個onNext
2、當上遊傳送了一個onComplete後,上游onComplete之後的事件會繼續傳送,而下游收到onComplete事件之後不再繼續接收事件
3、當上遊傳送了一個onError後,上游onError之後的事件會繼續傳送,而下游收到onError事件之後將不再繼續接收事件
4、上游可以不傳送onComplete或onError
5、onComplete和onError必須唯一併且互斥,即onComplete和onError只能傳送一個,不能兩個都發送
上述程式碼使用的是create()方法來建立事件佇列,create()是RxJava最基本的建立事件佇列的方法,下面是幾種其他建立事件佇列的方法
- just()方法:佇列中的事件會被依次傳送給觀察者
Observable.just("Hello", "Hi", "RxJava").subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
// TODO Auto-generated method stub
Log.i(TAG, s);
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
Log.i(TAG, throwable.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete");
}
});
- from(T[]) / from(Iterable
Observable.from(array).subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
// TODO Auto-generated method stub
Log.i(TAG, s);
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
Log.i(TAG, throwable.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete");
}
});
- defer():有觀察者訂閱時才建立Observable,這種方式可以保證Observable的狀態是最新的
str = "Test";
Observable observable1 = Observable.just(str);
Observable observable2 = Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
// TODO Auto-generated method stub
return Observable.just(str);
}
});
str = "Change str";
observable1.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
// TODO Auto-generated method stub
Log.i(TAG, s);
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
Log.i(TAG, throwable.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete");
}
});
observable2.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
// TODO Auto-generated method stub
Log.i(TAG, s);
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
Log.i(TAG, throwable.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete");
}
});
輸出結果:
Test
Change str
- interval():建立一個按固定時間間隔發射整數序列的Observable
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
@Override
public void onNext(Long aLong) {
// TODO Auto-generated method stub
Log.i(TAG, aLong.toString());
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
Log.i(TAG, throwable.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete");
}
});
- range():建立一個發射特定序列的Observable,第一個引數為起始值,第二個為傳送的個數,如果為0則不傳送,如果為負數丟擲異常
Observable.range(0, 1000).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
// TODO Auto-generated method stub
Log.i(TAG, aLong.toString());
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
Log.i(TAG, throwable.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete");
}
});
- timer():建立一個Observable,它在一個給定的延遲後發射一個特殊的值,等同於Handler的postDelayed()
Observable.timer(1, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
@Override
public void onNext(Long aLong) {
// TODO Auto-generated method stub
Log.i(TAG, aLong.toString());
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
Log.i(TAG, throwable.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete");
}
});
- repeat():建立一個重複發射特定資料的Observable
String[] array = {"Hello", "Hi", "RxJava"};
Observable.from(array).repeat().subscribe(new Subscriber<String>() {
@Override
public void onNext(String str) {
// TODO Auto-generated method stub
Log.i(TAG, str);
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
Log.i(TAG, throwable.toString());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete");
}
});
subscribe()還支援不完整的回撥,RxJava會自動根據定義創建出Subscriber
Action0 onCompleteAction = new Action0() {
@Override
public void call() {
// TODO Auto-generated method stub
Log.i(TAG, "onComplete");
}
};
Action1<String> onNextAction = new Action1<String>() {
public void call(String str) {
Log.i(TAG, str);
};
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable e) {
// TODO Auto-generated method stub
Log.i(TAG, e.toString());
}
};
Action0是RxJava的一個介面,它只有一個方法call(),這個方法沒有引數,也沒有返回值,所以可以被定義為onComplete()方法。Action1也是一個介面,它同樣只有一個方法call(T param),這個方法也沒有返回值,但有一個引數,與Action0同理,可以被定義為onNext()和onError()方法。
observable.subscribe(onNextAction);
observable.subscribe(onNextAction, onErrorAction);
observable.subscribe(onNextAction, onErrorAction, onCompleteAction);