1. 程式人生 > >RxJava響應式程式設計之初級瞭解

RxJava響應式程式設計之初級瞭解

據說現在流行的開發模式是 Retrofit+RxJava+MVP+ButterKnife

今天我就簡單來學習下RxJava的相關知識
以前我也只是聽說過RxJava,RxJava這個到底是什麼東西呢?
呵呵,它其實是一個庫,所以我們使用裡面的方法,得需要下載庫,所以我們需要在AS中進行配置

1.RxJava 地址以及新增

依賴庫新增:
compile ‘io.reactivex:rxjava:1.1.6’
或者
compile ‘io.reactivex:rxandroid:1.2.1’

2.RxJava是什麼型別的庫?它的原理是什麼?

RxJava 在 GitHub 主頁上的自我介紹是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫)。這就是 RxJava ,概括得非常精準。

Rx:函式響應式程式設計,也許這個詞對你我都很只可意會,不可言傳,先拋開Rx不說,我們接觸到的類似的這樣的思路,大概有介面回撥、Handler通訊、廣播通訊、還有一個開源的EventBus、以及ContentPorivider裡面的觀察者模式、AsyncTask 我們朦朧中也許就大概瞭解RxJava是怎麼個東西了。

RxJava 的非同步實現,是通過一種擴充套件的觀察者模式來實現的。
至於觀察者我就拿ContentProvider來說吧,比如我們在一個ContentProvider中有一個insert方法,插入完畢後,去通知某個監聽該URI變化的介面

getContext().getContentResolver
().notifyChange(URI, null);

比如在MainActivity中去registerContentObserver註冊一個內容觀察者

private static final Uri URI = Uri  
            .parse("content://com.example.contentprovider.MyContentProvider/student");  
 contentResolver.registerContentObserver(uri, true, observer);
 private ContentObserver observer = new
ContentObserver(null) { public void onChange(boolean selfChange) { // 說明資料有改變,重新查詢一直所有記錄 Uri uri = Uri.parse("content://com.example.contentprovider.MyContentProvider/student"); Cursor cursor = contentResolver.query(uri, null, null, null, null); Log.e("TAG", "onChange() count=" + cursor.getCount()); }; };

那麼對於RxJava 的觀察者模式呢?
RxJava 有四個基本概念:Observable (被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。
對比ContentProvider,RxJava裡面的被觀察者就是某個Uri(也就是某個資料庫),觀察者就是某個介面(比如MainActivity),訂閱就是registerContentObserver,事件就是insert方法

Rxjava本質主要就是非同步任務 外層構建了一個觀察者的設計模式 它更簡潔 我們呼叫api方法 幾乎看不到 方法裡面的 分執行緒資料操作 它只是利用了 一種觀察者的設計模式 來進行 主分執行緒的通訊 來進行響應操作

我們來看看RxJava的察者模式流程互動圖:
這裡寫圖片描述

與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext() (相當於 insert)之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。

我們來看下Observable是如何定義的?

     Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
                subscriber.onStart();
                subscriber.onNext("Hello World!");
                subscriber.onCompleted();
            }
        })

我們看看create原始碼裡面做了什麼?

   public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

可以看到,這裡傳入了一個 OnSubscribe 物件作為引數。OnSubscribe 會被儲存在返回的 Observable 物件中,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber 將會被呼叫一次 onNext() 和一次 onCompleted())。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

然後在看Observer中做了什麼操作?

subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });

onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。
onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
在一個正確執行的事件序列中, onCompleted() 和 onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。

接下來我們將全部的程式碼貼出來,看看Log日誌列印:

同步方式

package com.example.administrator.myapplication;
import android.app.Activity;
import android.os.Bundle;
import android.util.Log;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;

public class MainActivity extends Activity {
    private String TAG = "MainActivity";
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
                subscriber.onStart();
                subscriber.onNext("Hello World!");
                subscriber.onCompleted();
            }
        })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });
    }
}
call: threadId:1
onNext: threadId:1
onNext: s = Hello World!
onCompleted: threadId:1

從上可以看出,事件的處理和結果的接收都是在同一個執行緒裡面處理的。但是,Rxjava的意義何在,非同步呢?別急,看以下程式碼的處理,你就會發現了,非同步原來是這麼的簡單。
非同步方式
我們將上面的程式碼稍微改下,增加2行程式碼

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
                subscriber.onStart();
                subscriber.onNext("Hello World!");
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });

我們添上如下2行程式碼

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

我們看下Log輸出

03-08 07:20:18.101 22734-22734/? I/MainActivity: testFunction: threadId:1
03-08 07:20:18.123 22734-22755/? D/MainActivity: call: threadId:180
03-08 07:20:18.142 22734-22734/? D/MainActivity: onNext: threadId:1
03-08 07:20:18.142 22734-22734/? I/MainActivity: onNext: s = Hello World!
03-08 07:20:18.143 22734-22734/? D/MainActivity: onCompleted: threadId:1

看見了沒,第二行log日誌threadId與其它的threadId很明顯的不一樣啊,說明我們在處理事件的時候,發生在了一個新的執行緒裡面,而結果的接收,還是在主執行緒裡面操作的。怎麼樣,只要新增兩句話,非同步立馬就實現了,非同步處理耗時操作,就是這麼easy。

我們簡單看下原始碼

 public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

什麼意思呢?
Scheduler scheduler引數就是執行訂閱操作,返回一個源觀察到的修改,使其訂閱發生在指定的執行緒
observeOn(AndroidSchedulers.mainThread())
句話說,observeOn() 指定的是它之後的操作所在的執行緒。

列印字串陣列 from和just方式

以上是RxJava的很基礎很簡單的一個用法,那麼我們接著往下看,比如我們有一組需求把一個String陣列的字串,單個打印出來,我們用Rxjava怎麼實現呢?看程式碼:

Log.i(TAG, "testFunction: threadId:" + Thread.currentThread().getId());
        Observable.from(new String[]{"one","two","three","four"})
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });

看Log輸出日誌如下:

testFunction: threadId:1
onNext: s = one
onNext: s = two
onNext: s = three
onNext: s = four
onCompleted: threadId:1

From操作符用來將某個物件轉化為Observable物件,並且依次將其內容發射出去。這個類似於just,但是just會將這個物件整個發射出去。比如說一個含有3個字串的陣列,使用from就會發射4次,每次發射一個數字,而使用just會發射一次來將整個的陣列發射出去。

 Log.i(TAG, "testFunction: threadId:"+Thread.currentThread().getId());
        Observable.just("one", "two", "three", "four")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.i(TAG, "onNext: s = " + s);
                    }
                });
03-08 08:09:25.743 32155-32155/? I/MainActivity: testFunction: threadId:1
03-08 08:09:25.784 32155-32155/? I/MainActivity: onNext: s = one
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = two
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = three
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = four
03-08 08:09:25.785 32155-32155/? D/MainActivity: onCompleted: threadId:1

一對一轉換

 Log.i(TAG, "testFunction: threadId:"+Thread.currentThread().getId());
        Observable.just("1", "2", "3", "4")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<String, Integer>() {

                    @Override
                    public Integer call(String s) {
                        Log.i(TAG, "call: s = "+s);
                        return Integer.parseInt(s);
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.i(TAG, "call: integer = "+integer);
                    }
                });
testFunction: threadId:1
call: s = 1
call: integer = 1
call: s = 2
call: integer = 2
call: s = 3
call: integer = 3
call: s = 4
call: integer = 4

簡單說一下Func1,其中的T表示傳入的引數型別,R表示方法返回的引數型別。原始碼如下:

public interface Func1<T, R> extends Function {
    R call(T t);
}

上例中還有一個叫做 Action1的類。也是 RxJava 的一個介面,用於包裝含有無引數的方法。 Func1 和 Action 的區別在於, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用於不同引數個數的方法。FuncX 和 ActionX 的區別在 FuncX 包裝的是有返回值的方法。

可以看到,map() 方法將引數中的 String 物件轉換成一個 Integer物件後返回,而在經過 map() 方法後,事件的引數型別也由 String 轉為了 Integer。這種直接變換物件並返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件物件,還可以針對整個事件佇列,這使得 RxJava 變得非常靈活。

封裝Observable一對多轉換
map轉換,是一對一的轉換,像示例當中,我們把string轉成int,但是當我們需要一對多的轉換,該怎麼做呢?比如說,定義一個學生類:

package com.example.administrator.myapplication;

import java.util.List;

public class Student {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    //////////////////////////
    private List<String> courses;


    public List<String> getCourses() {
        return courses;
    }

    public void setCourses(List<String> courses) {
        this.courses = courses;
    }
}

        Student student1 = new Student();
        student1.setName("safly");
        List<String> courses = new ArrayList<>();
        courses.add("語文");
        courses.add("數學");
        courses.add("英語");
        student1.setCourses(courses);

        Student student2 = new Student();
        student2.setName("wyf");
        List<String> courses2 = new ArrayList<>();
        courses2.add("化學");
        courses2.add("地理");
        courses2.add("政治");
        student2.setCourses(courses2);



        Observable.just(student1,student2)
                .subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        Log.i(TAG, "call: name = "+student.getName());
                        List<String> course = student.getCourses();
                        for(String str:course){
                            Log.i(TAG, "call: str = "+str);
                        }
                    }
                });

這裡我們沒有進行轉換,直接just傳送過來,沒有用到轉換,然後在call中進行直接輸出了

call: name = safly
call: str = 語文
call: str = 數學
call: str = 英語
call: name = wyf
call: str = 化學
call: str = 地理
call: str = 政治

我們用轉換來試試看?
這裡我們用到了flatmap這一函式,按通俗的一點理解:我們首先把Student轉成了Observable,然後呢,又把student.getCourses()轉成string挨個打印出來,結果如下:

 Observable.just(student1,student2)
                  .flatMap(new Func1<Student, Observable<String>>() {
                      @Override
                      public Observable<String> call(Student student) {
                       Log.i(TAG, "Observable " );
                          return Observable.from(student.getCourses());
                      }
                  })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i(TAG, "call: s = "+s);     
                    }
                });

輸出

 Observable
call: s = 語文
call: s = 數學
call: s = 英語
 Observable
call: s = 化學
call: s = 地理
call: s = 政治

我們還記得Observable.from嘛?
From操作符用來將某個物件轉化為Observable物件

  public static <T> Observable<T> from(Iterable<? extends T> iterable) {
        return create(new OnSubscribeFromIterable<T>(iterable));
    }