1. 程式人生 > >Android非同步框架RxJava 1.x系列(二)

Android非同步框架RxJava 1.x系列(二)

前言

在介紹 RxJava 1.x 執行緒排程器之前,首先引入一個重要的概念 - 事件序列轉換。RxJava 提供了對事件序列進行轉換的支援,這是它的核心功能之一。

正文

1. 事件序列轉換定義

所謂轉換,就是將事件序列中的物件或整個序列進行加工處理,轉換成不同的事件或事件序列,有點類似 Java 1.8 中的流處理。

2. 事件序列轉換API

首先看一個 map() 的例子:

Observable.just("images/logo.png") // 輸入型別 String
    .map(new Func1<String, Bitmap>() {
        @Override
public Bitmap call(String filePath) { // 引數型別 String return getBitmapFromPath(filePath); // 返回型別 Bitmap } }) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { // 引數型別 Bitmap showBitmap(bitmap); } }
);

這裡出現了一個叫 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個介面,用於包裝含有一個引數的方法。 Func1Action 的區別在於: Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣,FuncX 也有多個,用於不同引數個數的方法。同理,FuncXActionX 的區別在 FuncX 包裝的是有返回值的方法。

可以看到,map() 方法將引數中的 String 物件轉換成一個 Bitmap 物件後返回,而在經過 map() 方法後,事件的引數型別也由 String 轉為了 Bitmap。

這種直接轉換物件並返回的,是最常見的也最容易理解的變換。不過 RxJava

的轉換遠不止這樣,它不僅可以針對事件物件,還可以針對整個事件佇列,這使得 RxJava 變得非常靈活。

下面給出幾個示例:

map()

事件物件的直接變換,具體功能上面已經介紹過。它是 RxJava 最常用的變換。 map() 的示意圖如下:

flatMap()

這是一個很有用但非常難理解的變換。首先假設這麼一種需求:假設有一個數據結構『學生』,現在需要打印出一組學生的名字。實現方式很簡單:

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
};

Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);

如果要打印出每個學生所需要修的所有課程的名稱呢?需求的區別在於,每個學生只有一個名字,但卻有多個課程,首先可以這樣實現:

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
};

Observable.from(students)
    .subscribe(subscriber);

如果我不想在 Subscriber 中使用 for 迴圈,而是希望 Subscriber 中直接傳入單個的 Course 物件呢(這對於程式碼複用很重要)?用 map() 顯然是不行的,因為 map()一對一的轉化,而現在需要一對多的轉化。問題出現了:怎樣把一個 Student 轉化成多個 Course

這個時候,flatMap() 就派上了用場:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
};

Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

從上面的程式碼可以看出,flatMap()map() 有一個相同點:它也是把傳入的引數轉化之後返回另一個物件。

flatMap()map() 不同的是,flatMap() 返回的是個 Observable 物件,並且這個 Observable 物件並不是被直接傳送到 Subscriber 的回撥方法中。

flatMap() 示意圖如下:

flatMap() 的原理是這樣的:

  1. 使用傳入的事件物件建立一個 Observable 物件;
  2. 並不立即傳送這個 Observable, 而是將它啟用,然後開始傳送事件;
  3. 將每一個創建出來的 Observable 傳送的事件,都被匯入同一個 Observable

而這個 Observable 負責將這些事件統一交給 Subscriber 的回撥方法。這三個步驟,把事件拆成了兩級,通過一組新建立的 Observable 將初始的物件『鋪平』之後通過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat

3. 事件序列轉換原理

這些轉換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基於同一個基礎的轉換方法:lift(Operator)

lift()

首先看一下 lift() 的內部實現(核心程式碼):

public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

這段程式碼實現的功能,簡單來說就是建立了一個新的 Observable 並返回。如果看過上篇部落格會發現有些蹊蹺。重溫一下 Observable.subscribe(Subscriber) 的實現(核心程式碼):

public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

對比一下以上兩段程式碼的方法體(忽略返回值),會發現一行突兀的程式碼:

Subscriber newSubscriber = operator.call(subscriber);

解釋一下 lift() 方法完成的操作:

  1. 利用 Observable.create() 方法建立一個新的 Observable 物件,加上之前的原始 Observable,已經有兩個 Observable

  2. 建立 Observable 的同時建立一個新的 OnSubscribe 用於發出事件。

  3. 通過 lift() 傳入的 Operator 函式的 call() 方法構造一個新的 Subscriber 物件,並將新 Subscriber 和原始 Subscriber 進行關聯。

  4. 利用這個新 Subscriber 向原始 Observable 進行訂閱,實現事件序列的轉換。

這種實現基於代理模式,通過事件攔截和處理實現事件序列的變換。

Observable 執行了 lift(Operator) 方法之後,會返回一個新的 Observable,這個新的 Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,並在處理後傳送給 Subscriber

整個過程的思維導圖如下:

或者可以看動圖:

兩次和多次的 lift() 同理,如下圖:

舉一個具體的 Operator 的實現。下面是一個將事件的 Integer 物件轉換成 String 的例子,僅供參考:

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 將事件序列中的 Integer 物件轉換為 String 物件
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

學習 lift() 的原理只是為了更好地理解 RxJava ,從而可以更好地使用它。然而RxJava 不建議開發者自定義 Operator 來直接使用 lift(),而是儘量使用已有的 lift() 包裝方法(如 map() flatMap() 等)進行組合。

compose()

除了 lift() 之外,Observable 還有一個轉方法叫做 compose()。它和 lift() 的區別在於,lift() 是針對事件項事件序列的,而 compose() 是針對 Observable 自身進行轉換。

舉個例子,假設在程式中有多個 Observable 都需要應用一組相同的 lift() 進行轉換,通常會這樣寫:

observable1.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);

observable2.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);

observable3.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);

observable4.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);

可以發現有太多重複程式碼,程式碼重構如下:

private Observable liftAll(Observable observable) {
    return observable.lift1()
        .lift2()
        .lift3()
        .lift4();
}

liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);

可讀性、可維護性都提高了。可是 Observable 被一個方法包起來,這種方式對於 Observale 的靈活性進行了限制。怎麼辦?這個時候,就應該用 compose() 來解決了:

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable.lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}

Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

如上,使用 compose() 方法,Observable 可以利用傳入的 Transformer 物件的 call 方法直接對自身進行處理,而不是被包在方法的裡面。

小結

本文主要介紹了 RxJava 事件及事件序列轉換原理,其中 lift() 方法的使用方法和實現原理是重點、難點。後續將會介紹的 RxJava 執行緒排程器底層也是基於它實現的。

歡迎關注技術公眾號: 零壹技術棧

零壹技術棧

本帳號將持續分享後端技術乾貨,包括虛擬機器基礎,多執行緒程式設計,高效能框架,非同步、快取和訊息中介軟體,分散式和微服務,架構學習和進階等學習資料和文章。