1. 程式人生 > >RxJava 詳解——簡潔的異步操作(二)

RxJava 詳解——簡潔的異步操作(二)

i++ 只有一個 tco 多個 etc 隊列 技術分享 () 而在

上次說的兩個例子,事件的發出和消費都是在同一個線程的。如果只用上面的方法,實現出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是異步機制,因此異步對於 RxJava 是至關重要的。而要實現異步,則需要用到 RxJava 的另一個概念: Scheduler 。本文就來介紹一下Scheduler的用法:

3. 線程控制 —— Scheduler (一)

默認情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生產事件;在哪個線程生產事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (調度器)。

1) Scheduler 的 API (一)

在RxJava 中,Scheduler ——調度器,相當於線程控制器,RxJava 通過它來指定每一段代碼應該運行在什麽樣的線程。RxJava 已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:

  • Schedulers.immediate(): 直接在當前線程運行,相當於不指定線程。這是默認的 Scheduler

  • Schedulers.newThread(): 總是啟用新線程,並在新線程執行操作。

  • Schedulers.io(): I/O 操作(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io()

    的內部實現是是用一個無數量上限的線程池,可以重用空閑的線程,因此多數情況下 io()newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創建不必要的線程。

  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

  • 另外, Android 還有一個專用的 AndroidSchedulers.mainThread()

    ,它指定的操作將在 Android 主線程運行。

有了這幾個 Scheduler ,就可以使用 subscribeOn()observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件產生的線程。 * observeOn(): 指定 Subscriber 所運行在的線程。或者叫做事件消費的線程。

有了這幾個 Scheduler ,就可以使用 subscribeOn()observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件產生的線程。 * observeOn(): 指定 Subscriber 所運行在的線程。或者叫做事件消費的線程。

技術分享

上面這段代碼中,由於 subscribeOn(Schedulers.io()) 的指定,被創建的事件的內容 1到11 將會在 IO 線程發出;而由於 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數字的打印將發生在主線程 。事實上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io())observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用於多數的 『後臺線程取數據,主線程顯示』的程序策略。

然後會在最後顯示在textview主線程上。

技術分享

圖片是是一樣道理

技術分享

4. 變換

RxJava 提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。概念說著總是模糊難懂的,來看 API。

技術分享

這裏出現了一個叫做 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個接口,用於包裝含有一個參數的方法。 Func1Action的區別在於, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用於不同參數個數的方法。FuncXActionX 的區別在 FuncX 包裝的是有返回值的方法。

可以看到,map() 方法將參數中的 String 對象轉換成一個 Bitmap 對象後返回,而在經過 map() 方法後,事件的參數類型也由 String轉為了 Bitmap。這種直接變換對象並返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件對象,還可以針對整個事件隊列,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:

map(): 事件對象的直接變換,具體功能上面已經介紹過。它是 RxJava 最常用的變換\

● flatMap(): 這是一個很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它。

首先假設這麽一種需求:假設有一個數據結構『學生』,現在需要打印出一組學生的名字。實現方式很簡單:

public String changstudent(){

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
}

很簡單。那麽再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區別在於,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現:

   Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);

依然很簡單。那麽如果我不想在 Subscriber 中使用 for 循環,而是希望 Subscriber 中直接傳入單個的 Course 對象呢(這對於代碼復用很重要)?用 map() 顯然是不行的,因為 map() 是一對一的轉化,而我現在的要求是一對多的轉化。那怎麽才能把一個 Student 轉化成多個 Course 呢?

這個時候,就需要用 flatMap() 了:

 Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);

關註微信公眾號獲得更多內容:

技術分享

RxJava 詳解——簡潔的異步操作(二)