RxJava響應式程式設計之初級瞭解
據說現在流行的開發模式是 Retrofit+RxJava+MVP+ButterKnife
今天我就簡單來學習下RxJava的相關知識
以前我也只是聽說過RxJava,RxJava這個到底是什麼東西呢?
呵呵,它其實是一個庫,所以我們使用裡面的方法,得需要下載庫,所以我們需要在AS中進行配置
1.RxJava 地址以及新增
依賴庫新增:
compile ‘io.reactivex:rxjava:1.1.6’
或者
compile ‘io.reactivex:rxandroid:1.2.1’
2.RxJava是什麼型別的庫?它的原理是什麼?
RxJava 在 GitHub 主頁上的自我介紹是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫)。這就是 RxJava ,概括得非常精準。
Rx:函式響應式程式設計,也許這個詞對你我都很只可意會,不可言傳,先拋開Rx不說,我們接觸到的類似的這樣的思路,大概有介面回撥、Handler通訊、廣播通訊、還有一個開源的EventBus、以及ContentPorivider裡面的觀察者模式、AsyncTask 我們朦朧中也許就大概瞭解RxJava是怎麼個東西了。
RxJava 的非同步實現,是通過一種擴充套件的觀察者模式來實現的。
至於觀察者我就拿ContentProvider來說吧,比如我們在一個ContentProvider中有一個insert方法,插入完畢後,去通知某個監聽該URI變化的介面
getContext().getContentResolver ().notifyChange(URI, null);
比如在MainActivity中去registerContentObserver註冊一個內容觀察者
private static final Uri URI = Uri
.parse("content://com.example.contentprovider.MyContentProvider/student");
contentResolver.registerContentObserver(uri, true, observer);
private ContentObserver observer = new ContentObserver(null) {
public void onChange(boolean selfChange) {
// 說明資料有改變,重新查詢一直所有記錄
Uri uri = Uri.parse("content://com.example.contentprovider.MyContentProvider/student");
Cursor cursor = contentResolver.query(uri, null, null, null, null);
Log.e("TAG", "onChange() count=" + cursor.getCount());
};
};
那麼對於RxJava 的觀察者模式呢?
RxJava 有四個基本概念:Observable (被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。
對比ContentProvider,RxJava裡面的被觀察者就是某個Uri(也就是某個資料庫),觀察者就是某個介面(比如MainActivity),訂閱就是registerContentObserver,事件就是insert方法
Rxjava本質主要就是非同步任務 外層構建了一個觀察者的設計模式 它更簡潔 我們呼叫api方法 幾乎看不到 方法裡面的 分執行緒資料操作 它只是利用了 一種觀察者的設計模式 來進行 主分執行緒的通訊 來進行響應操作
我們來看看RxJava的察者模式流程互動圖:
與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext() (相當於 insert)之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。
我們來看下Observable是如何定義的?
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
subscriber.onStart();
subscriber.onNext("Hello World!");
subscriber.onCompleted();
}
})
我們看看create原始碼裡面做了什麼?
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
可以看到,這裡傳入了一個 OnSubscribe 物件作為引數。OnSubscribe 會被儲存在返回的 Observable 物件中,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber 將會被呼叫一次 onNext() 和一次 onCompleted())。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
然後在看Observer中做了什麼操作?
subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
Log.i(TAG, "onNext: s = " + s);
}
});
onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。
onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
在一個正確執行的事件序列中, onCompleted() 和 onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。
接下來我們將全部的程式碼貼出來,看看Log日誌列印:
同步方式
package com.example.administrator.myapplication;
import android.app.Activity;
import android.os.Bundle;
import android.util.Log;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
public class MainActivity extends Activity {
private String TAG = "MainActivity";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
subscriber.onStart();
subscriber.onNext("Hello World!");
subscriber.onCompleted();
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
Log.i(TAG, "onNext: s = " + s);
}
});
}
}
call: threadId:1
onNext: threadId:1
onNext: s = Hello World!
onCompleted: threadId:1
從上可以看出,事件的處理和結果的接收都是在同一個執行緒裡面處理的。但是,Rxjava的意義何在,非同步呢?別急,看以下程式碼的處理,你就會發現了,非同步原來是這麼的簡單。
非同步方式
我們將上面的程式碼稍微改下,增加2行程式碼
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
subscriber.onStart();
subscriber.onNext("Hello World!");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
Log.i(TAG, "onNext: s = " + s);
}
});
我們添上如下2行程式碼
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
我們看下Log輸出
03-08 07:20:18.101 22734-22734/? I/MainActivity: testFunction: threadId:1
03-08 07:20:18.123 22734-22755/? D/MainActivity: call: threadId:180
03-08 07:20:18.142 22734-22734/? D/MainActivity: onNext: threadId:1
03-08 07:20:18.142 22734-22734/? I/MainActivity: onNext: s = Hello World!
03-08 07:20:18.143 22734-22734/? D/MainActivity: onCompleted: threadId:1
看見了沒,第二行log日誌threadId與其它的threadId很明顯的不一樣啊,說明我們在處理事件的時候,發生在了一個新的執行緒裡面,而結果的接收,還是在主執行緒裡面操作的。怎麼樣,只要新增兩句話,非同步立馬就實現了,非同步處理耗時操作,就是這麼easy。
我們簡單看下原始碼
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
什麼意思呢?
Scheduler scheduler引數就是執行訂閱操作,返回一個源觀察到的修改,使其訂閱發生在指定的執行緒
observeOn(AndroidSchedulers.mainThread())
句話說,observeOn() 指定的是它之後的操作所在的執行緒。
列印字串陣列 from和just方式
以上是RxJava的很基礎很簡單的一個用法,那麼我們接著往下看,比如我們有一組需求把一個String陣列的字串,單個打印出來,我們用Rxjava怎麼實現呢?看程式碼:
Log.i(TAG, "testFunction: threadId:" + Thread.currentThread().getId());
Observable.from(new String[]{"one","two","three","four"})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
});
看Log輸出日誌如下:
testFunction: threadId:1
onNext: s = one
onNext: s = two
onNext: s = three
onNext: s = four
onCompleted: threadId:1
From操作符用來將某個物件轉化為Observable物件,並且依次將其內容發射出去。這個類似於just,但是just會將這個物件整個發射出去。比如說一個含有3個字串的陣列,使用from就會發射4次,每次發射一個數字,而使用just會發射一次來將整個的陣列發射出去。
Log.i(TAG, "testFunction: threadId:"+Thread.currentThread().getId());
Observable.just("one", "two", "three", "four")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
});
03-08 08:09:25.743 32155-32155/? I/MainActivity: testFunction: threadId:1
03-08 08:09:25.784 32155-32155/? I/MainActivity: onNext: s = one
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = two
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = three
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = four
03-08 08:09:25.785 32155-32155/? D/MainActivity: onCompleted: threadId:1
一對一轉換
Log.i(TAG, "testFunction: threadId:"+Thread.currentThread().getId());
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.i(TAG, "call: s = "+s);
return Integer.parseInt(s);
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: integer = "+integer);
}
});
testFunction: threadId:1
call: s = 1
call: integer = 1
call: s = 2
call: integer = 2
call: s = 3
call: integer = 3
call: s = 4
call: integer = 4
簡單說一下Func1,其中的T表示傳入的引數型別,R表示方法返回的引數型別。原始碼如下:
public interface Func1<T, R> extends Function {
R call(T t);
}
上例中還有一個叫做 Action1的類。也是 RxJava 的一個介面,用於包裝含有無引數的方法。 Func1 和 Action 的區別在於, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用於不同引數個數的方法。FuncX 和 ActionX 的區別在 FuncX 包裝的是有返回值的方法。
可以看到,map() 方法將引數中的 String 物件轉換成一個 Integer物件後返回,而在經過 map() 方法後,事件的引數型別也由 String 轉為了 Integer。這種直接變換物件並返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件物件,還可以針對整個事件佇列,這使得 RxJava 變得非常靈活。
封裝Observable一對多轉換
map轉換,是一對一的轉換,像示例當中,我們把string轉成int,但是當我們需要一對多的轉換,該怎麼做呢?比如說,定義一個學生類:
package com.example.administrator.myapplication;
import java.util.List;
public class Student {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
//////////////////////////
private List<String> courses;
public List<String> getCourses() {
return courses;
}
public void setCourses(List<String> courses) {
this.courses = courses;
}
}
Student student1 = new Student();
student1.setName("safly");
List<String> courses = new ArrayList<>();
courses.add("語文");
courses.add("數學");
courses.add("英語");
student1.setCourses(courses);
Student student2 = new Student();
student2.setName("wyf");
List<String> courses2 = new ArrayList<>();
courses2.add("化學");
courses2.add("地理");
courses2.add("政治");
student2.setCourses(courses2);
Observable.just(student1,student2)
.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
Log.i(TAG, "call: name = "+student.getName());
List<String> course = student.getCourses();
for(String str:course){
Log.i(TAG, "call: str = "+str);
}
}
});
這裡我們沒有進行轉換,直接just傳送過來,沒有用到轉換,然後在call中進行直接輸出了
call: name = safly
call: str = 語文
call: str = 數學
call: str = 英語
call: name = wyf
call: str = 化學
call: str = 地理
call: str = 政治
我們用轉換來試試看?
這裡我們用到了flatmap這一函式,按通俗的一點理解:我們首先把Student轉成了Observable,然後呢,又把student.getCourses()轉成string挨個打印出來,結果如下:
Observable.just(student1,student2)
.flatMap(new Func1<Student, Observable<String>>() {
@Override
public Observable<String> call(Student student) {
Log.i(TAG, "Observable " );
return Observable.from(student.getCourses());
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: s = "+s);
}
});
輸出
Observable
call: s = 語文
call: s = 數學
call: s = 英語
Observable
call: s = 化學
call: s = 地理
call: s = 政治
我們還記得Observable.from嘛?
From操作符用來將某個物件轉化為Observable物件
public static <T> Observable<T> from(Iterable<? extends T> iterable) {
return create(new OnSubscribeFromIterable<T>(iterable));
}