1. 程式人生 > >Android異步框架RxJava 1.x系列(二) - 事件及事件序列轉換原理

Android異步框架RxJava 1.x系列(二) - 事件及事件序列轉換原理

方式 名稱 pri 新的 pos strong 之一 調度 star

前言

在介紹 RxJava 1.x 線程調度器之前,首先引入一個重要的概念 - 事件序列轉換。RxJava 提供了對事件序列進行轉換的支持,這是它的核心功能之一。

技術分享圖片

正文

1. 事件序列轉換定義

所謂轉換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列,有點類似 Java 1.8 中的流處理。

2. 事件序列轉換API

首先看一個 map() 的例子:

Observable.just("images/logo.png") // 輸入類型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 參數類型 String
            return getBitmapFromPath(filePath); // 返回類型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 參數類型 Bitmap
            showBitmap(bitmap);
        }
    });
復制代碼

這裏出現了一個叫 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個接口,用於包裝含有一個參數的方法。 Func1Action 的區別在於: Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣,FuncX 也有多個,用於不同參數個數的方法。同理,FuncXActionX 的區別在 FuncX 包裝的是有返回值的方法。

可以看到,map() 方法將參數中的 String 對象轉換成一個 Bitmap 對象後返回,而在經過 map() 方法後,事件的參數類型也由 String 轉為了 Bitmap。

這種直接轉換對象並返回的,是最常見的也最容易理解的變換。不過 RxJava

的轉換遠不止這樣,它不僅可以針對事件對象,還可以針對整個事件隊列,這使得 RxJava 變得非常靈活。

下面給出幾個示例:

map()

事件對象的直接變換,具體功能上面已經介紹過。它是 RxJava 最常用的變換。 map() 的示意圖如下:

技術分享圖片

flatMap()

這是一個很有用但非常難理解的變換。首先假設這麽一種需求:假設有一個數據結構『學生』,現在需要打印出一組學生的名字。實現方式很簡單:

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
};

Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);
復制代碼

如果要打印出每個學生所需要修的所有課程的名稱呢?需求的區別在於,每個學生只有一個名字,但卻有多個課程,首先可以這樣實現:

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
};

Observable.from(students)
    .subscribe(subscriber);
復制代碼

如果我不想在 Subscriber 中使用 for 循環,而是希望 Subscriber 中直接傳入單個的 Course 對象呢(這對於代碼復用很重要)?用 map() 顯然是不行的,因為 map()一對一的轉化,而現在需要一對多的轉化。問題出現了:怎樣把一個 Student 轉化成多個 Course

這個時候,flatMap() 就派上了用場:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
};

Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);
復制代碼

從上面的代碼可以看出,flatMap()map() 有一個相同點:它也是把傳入的參數轉化之後返回另一個對象。

flatMap()map() 不同的是,flatMap() 返回的是個 Observable 對象,並且這個 Observable 對象並不是被直接發送到 Subscriber 的回調方法中。

flatMap() 示意圖如下:

技術分享圖片

flatMap() 的原理是這樣的:

  1. 使用傳入的事件對象創建一個 Observable 對象;
  2. 並不立即發送這個 Observable, 而是將它激活,然後開始發送事件;
  3. 將每一個創建出來的 Observable 發送的事件,都被匯入同一個 Observable

而這個 Observable 負責將這些事件統一交給 Subscriber 的回調方法。這三個步驟,把事件拆成了兩級,通過一組新創建的 Observable 將初始的對象『鋪平』之後通過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat

3. 事件序列轉換原理

這些轉換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基於同一個基礎的轉換方法:lift(Operator)

lift()

首先看一下 lift() 的內部實現(核心代碼):

public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}
復制代碼

這段代碼實現的功能,簡單來說就是創建了一個新的 Observable 並返回。如果看過上篇博客會發現有些蹊蹺。重溫一下 Observable.subscribe(Subscriber) 的實現(核心代碼):

public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
復制代碼

對比一下以上兩段代碼的方法體(忽略返回值),會發現一行突兀的代碼:

Subscriber newSubscriber = operator.call(subscriber);
復制代碼

解釋一下 lift() 方法完成的操作:

  1. 利用 Observable.create() 方法創建一個新的 Observable 對象,加上之前的原始 Observable,已經有兩個 Observable

  2. 創建 Observable 的同時創建一個新的 OnSubscribe 用於發出事件。

  3. 通過 lift() 傳入的 Operator 函數的 call() 方法構造一個新的 Subscriber 對象,並將新 Subscriber 和原始 Subscriber 進行關聯。

  4. 利用這個新 Subscriber 向原始 Observable 進行訂閱,實現事件序列的轉換。

這種實現基於代理模式,通過事件攔截和處理實現事件序列的變換。

Observable 執行了 lift(Operator) 方法之後,會返回一個新的 Observable,這個新的 Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,並在處理後發送給 Subscriber

整個過程的思維導圖如下:

技術分享圖片

或者可以看動圖:

技術分享圖片

兩次和多次的 lift() 同理,如下圖:

技術分享圖片

舉一個具體的 Operator 的實現。下面是一個將事件的 Integer 對象轉換成 String 的例子,僅供參考:

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 將事件序列中的 Integer 對象轉換為 String 對象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});
復制代碼

學習 lift() 的原理只是為了更好地理解 RxJava ,從而可以更好地使用它。然而RxJava 不建議開發者自定義 Operator 來直接使用 lift(),而是盡量使用已有的 lift() 包裝方法(如 map() flatMap() 等)進行組合。

compose()

除了 lift() 之外,Observable 還有一個轉方法叫做 compose()。它和 lift() 的區別在於,lift() 是針對事件項事件序列的,而 compose() 是針對 Observable 自身進行轉換。

舉個例子,假設在程序中有多個 Observable 都需要應用一組相同的 lift() 進行轉換,通常會這樣寫:

observable1.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);

observable2.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);

observable3.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);

observable4.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
復制代碼

可以發現有太多重復代碼,代碼重構如下:

private Observable liftAll(Observable observable) {
    return observable.lift1()
        .lift2()
        .lift3()
        .lift4();
}

liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);
復制代碼

可讀性、可維護性都提高了。可是 Observable 被一個方法包起來,這種方式對於 Observale 的靈活性進行了限制。怎麽辦?這個時候,就應該用 compose() 來解決了:

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable.lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}

Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
復制代碼

如上,使用 compose() 方法,Observable 可以利用傳入的 Transformer 對象的 call 方法直接對自身進行處理,而不是被包在方法的裏面。

小結

本文主要介紹了 RxJava 事件及事件序列轉換原理,其中 lift() 方法的使用方法和實現原理是重點、難點。後續將會介紹的 RxJava 線程調度器底層也是基於它實現的。


歡迎關註技術公眾號: 零壹技術棧

技術分享圖片

本帳號將持續分享後端技術幹貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分布式和微服務,架構學習和進階等學習資料和文章。



Android異步框架RxJava 1.x系列(二) - 事件及事件序列轉換原理