1. 程式人生 > >Android開發之從零開始學RxJava 2.x(一)認識Rxjava

Android開發之從零開始學RxJava 2.x(一)認識Rxjava

歡迎轉載,轉載請註明出處:https://mp.csdn.net/mdeditor/80772129
落地98k,沒有倍鏡怪誰,讓你扶我你卻丟個手雷。
這裡寫圖片描述
哈哈,大家好,喜歡裝逼的我又出現了,今天給大家帶來的是RxJava的相關討論,RxJava已經出來很久了,也是一直在用,但是總感覺沒有完全掌握它,所以花了點時間也閱讀了很多文章以及官方的文件,決定對其好好總結一番分享點有價值的文章,寫一個系列,對於RxJava1.x就不做討論了過去的就讓它過去吧0.0,在寫本部落格時GitHub上最新的RxJava版本是2.1.16,地址:https://github.com/ReactiveX/RxJava,那麼本系列也就是從0-1帶著大家一起學習RxJava2.x的用法。如有疑問歡迎留言,如有謬誤歡迎批評指正。

通過本篇部落格你將學到以下知識點
①什麼是RxJava
②RxJava的優勢是什麼
③RxJava如何使用
④主要用來做什麼?

一、認識RxJava及RxJava的優勢

首先來看第一個問題,RxJava到底是個什麼東東?
官網給出的描述是這樣的:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
翻譯過來就是:RxJava是一個為使用可觀測的序列來組成非同步的、基於事件的程式而產生的庫。說白了RxJava就是用來解決非同步的,解決非同步的方法有很多為什麼RxJava這麼流行呢?肯定它有它強於其它方法的優勢。那咱們來看看它的優勢是什麼?

1.函式式風格:對可觀察資料流使用無副作用的輸入輸出函式,避免了程式裡錯綜複雜的狀態
2.程式碼書寫邏輯清晰:Rx的操作符通通常可以將複雜的難題簡化為邏輯非常簡單的程式碼,可讀性非常強,隨著程式邏輯的複雜,依然保持簡潔,解耦了各個模組操作,單一化,不巢狀。
3.非同步錯誤處理:傳統的try/catch沒辦法處理非同步計算,Rx提供了合適的錯誤處理機制
4.輕鬆使用併發:Rx的Observables和Schedulers讓開發者可以擺脫底層的執行緒同步和各種併發問題
5.輕量級框架、支援Java 8 lambda、支援Java 6+和Android 2.3+

有沒有感覺很厲害,有木有被RxJava的氣場震懾到?
這裡寫圖片描述

二、RxJava的使用

清楚了RxJava的定義以及優點之後,接著我們來看下RxJava如何使用,對於RxJava的使用首先必須明白以下三個概念
1.Observable(被觀察者)
2.Observer(觀察者)
3.Subscribe(訂閱)
為了更好的理解這三者之間的協作可以看下圖
RxJava協作流程圖
其中上游是Observable,下游是Observer,上游與下游建立連線是通過Subscribe方法建立連線的,這樣理解是不是更好理解?
這裡寫圖片描述
對於使用RxJava總共有三個步驟,可以參考上圖
①建立上游的Observable
②建立下游的Observer
③建立連線
說了這麼多,來搞一波程式碼嚐嚐鹹淡
首先我們在Gradle中做如下配置

implementation 'io.reactivex.rxjava2:rxjava:2.1.16'

接著就可以肆無忌憚的裝逼了,按照上述說的三個步驟程式碼如下

//建立上游的Observable
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

emitter.onNext("Hello");
emitter.onNext("Word");
emitter.onNext("!!!");

emitter.onComplete();

}
});

//建立下游的Observer
Observer observer=new Observer<String>(){

@Override
public void onSubscribe(Disposable d) {

Log.d(TAG,"onSubscribe");
}

@Override
public void onNext(String s) {

Log.d(TAG,"onNext:"+s);
}

@Override
public void onError(Throwable e) {

Log.d(TAG,"onError");
}

@Override
public void onComplete() {

Log.d(TAG,"onComplete");
}
};
//連線上游和下游
observable.subscribe(observer);

列印日誌如下:
流程日誌列印
我們來分析下執行的過程首先我們分別用程式碼建立了上游的Observable物件和下游的Observer物件,建立好這兩個物件之後通過Observable物件的subscribe()方法將上游與下游建立了連線。建立連線成功之後會呼叫下游observer的onSubscribe回撥走onSubscribe方法預示著上游與下游建立了連線,此時列印了“onSubscribe”,之後上游會依次傳送三個事件,程式碼如下

emitter.onNext("Hello");
emitter.onNext("Word");
emitter.onNext("!!!");
emitter.onComplete();

因為上游與下游已經建立了連線所以下游會收到這三個事件,對應的會走下游observer的onNext方法的回撥,所以會依次列印”Hello”、”Word”、”!!!”,之後上游傳送傳送emitter.onComplete();預示著上游傳送事件結束,此時會呼叫下游的onComplete方法的回撥,此時會列印onComplete。以上就是一個簡單的RxJava的小案例。
當然對於剛接觸到RxJava的同學可能還是有點陌生,那接下來咱們就來把這個案例理解透徹,首先來看看兩個物件ObservableEmitter和Disposable這兩個物件,其中Emitter是發射的意思,ObservableEmitter的原始碼實現中共有三個方法:

onNext:用來發送資料,可多次呼叫,每呼叫一次傳送一條資料
onError:用來發送異常通知,只發送一次,若多次呼叫則第二次呼叫時報錯
onComplete:用來發送完成通知,只發送一次,若多次呼叫只發送第一條

在一個正確執行的事件序列中,onCompleted() 和 onError() 必須唯一併且互斥,資料在傳送時,出現異常可以呼叫onError傳送異常通知也可以不呼叫,因為其所在的方法subscribe會丟擲異常,若資料在全部發送完之後均正常可以呼叫onComplete傳送完成通知;其中,onError與onComplete不做強制性呼叫,並且兩者是事件序列中的最後一個。Observable可以傳送無限個onNext, 觀察者也可以接收無限個onNext。Observable傳送了一個onComplete(或者onError)後,可以繼續傳送onComplete(或者onError)後續事件,但觀察者收到onComplete(或者onError)後不再接收事件。Observable可以不傳送onComplete或onError。

物件Observer中的三個方法(onNext,onError,onComplete)正好與Emitter中的三個方法相對應,分別對Emitter中對應方法的行為作出響應。

Emitter呼叫onNext傳送資料時,Observer會通過onNext接收資料。
Emitter呼叫onError傳送異常通知時,Observer會通過onError接收異常通知,此時不再接收上游傳送的資料(此時上游是可以傳送資料的)
Emitter呼叫onComplete傳送完成通知時,Observer會通過onComplete接收完成通知,並且不再接收上游傳送的資料(此時上游是可以傳送資料的)
這裡寫圖片描述
1.中間傳送一個oncomplete事件,原理圖如下:
這裡寫圖片描述
程式碼:

//建立上游的Observable
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

Log.d(TAG,"上游 發射---->a");
emitter.onNext("發射---->a");
Log.d(TAG,"上游 發射---->b");
emitter.onNext("發射---->b");

emitter.onComplete();

Log.d(TAG,"上游 發射---->c");
emitter.onNext("發射---->c");
Log.d(TAG,"上游 發射---->d");
emitter.onNext("發射---->d");

}
});

//建立下游的Observer
Observer observer=new Observer<String>(){

@Override
public void onSubscribe(Disposable d) {

Log.d(TAG,"onSubscribe");
}

@Override
public void onNext(String s) {

Log.d(TAG,"下游onNext接收:"+s);
}

@Override
public void onError(Throwable e) {

Log.d(TAG,"onError");
}

@Override
public void onComplete() {

Log.d(TAG,"onComplete");
}
};
//連線上游和下游
observable.subscribe(observer);

列印日誌如下:
這裡寫圖片描述
從日誌中可以看出在上游傳送onComplete方法之後,上游還是正常的傳送事件,但是下游卻沒有接收到上游傳送的事件。

2.中間傳送一個onerror,原理圖如下:
傳送onError事件
程式碼只需修改上游的傳送事件的程式碼:

//建立上游的Observable
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

Log.d(TAG,"上游 發射---->a");
emitter.onNext("發射---->a");
Log.d(TAG,"上游 發射---->b");
emitter.onNext("發射---->b");

emitter.onError(new NullPointerException());

Log.d(TAG,"上游 發射---->c");
emitter.onNext("發射---->c");
Log.d(TAG,"上游 發射---->d");
emitter.onNext("發射---->d");

}
});

日誌:
這裡寫圖片描述
從日誌中可以看出在傳送了onError事件後,上游是可以傳送資料的,但是下游將不再接收上游傳送的資料。

3.傳送多個onError或onComple
前面我們說到onComplete和onError必須唯一併且互斥若傳送多個onComplete是可以正常執行的, 依然是收到第一個onComplete就不再接收了, 但若是傳送多個onError, 則收到第二個onError事件會導致程式會崩潰,這一點留給大家去驗證。

瞭解完ObservableEmitter之後我們再來了解一下Disposable 這個物件,Disposable這個物件是用來切斷上游與下游的連線的一個物件,切斷之後上游可以繼續傳送事件,但是下游將不會再收到上游傳送的事件,廢話不多說咱們來看一段程式碼:

//建立上游的Observable
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

Log.d(TAG,"上游 發射---->a");
emitter.onNext("發射---->a");
Log.d(TAG,"上游 發射---->b");
emitter.onNext("發射---->b");
Log.d(TAG,"上游 發射---->c");
emitter.onNext("發射---->c");
Log.d(TAG,"上游 發射---->d");
emitter.onNext("發射---->d");

}
});

//建立下游的Observer
Observer observer=new Observer<String>(){

@Override
public void onSubscribe(Disposable d) {

disposable=d;
Log.d(TAG,"onSubscribe");
}

@Override
public void onNext(String s) {

if(s.equals("發射---->b")){

disposable.dispose();
}

Log.d(TAG,"下游onNext接收:"+s);
}

@Override
public void onError(Throwable e) {

Log.d(TAG,"onError");
}

@Override
public void onComplete() {

Log.d(TAG,"onComplete");
}
};
//連線上游和下游
observable.subscribe(observer);

日誌:
DIsposable
從日誌中我們可以看出,在呼叫了dispose之後,上游的傳送是沒有受影響的,但是下游收不到上游傳送的資料。

對於之前所述的程式碼寫熟練之後就是經常說的鏈式呼叫

//RxJava鏈式呼叫
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

有木有感覺高大上?

四、真實專案中使用RxJava

到這裡可能部分同學對於RxJava可以用在什麼地方可能還不是特別清楚,其實RxJava的用處是非常非常大的,比如網路請求、讀寫檔案等等,多處可以用到,因為本系列是從零開始學,所以這裡我就先舉個簡單的例子比如網路請求RxJava可以與RxAndroid結合在一起使用對執行緒進行控制。程式碼如下

//RxJava真實開發中的作用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//通過設定此方法的回撥執行在子執行緒中,可以進行網路請求等一些耗時的操作
//比如請求網路拿到資料通過呼叫emitter.onNext(response);將請求的資料傳送到下游
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
//通過設定Observer執行在主執行緒,拿到網路請求的資料進行解析使用
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {
//在此接收上游非同步獲取的資料,比如網路請求過來的資料進行處理
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

對於這個使用不懂沒關係先有個大致的印象RxJava到底可以用在什麼地方,後續會有一系列關於RxJava的文章來講解具體的用法,記得關注和點贊,好了,相信通過本篇部落格大家會對RxJava有了一定的認識。鎖定本臺敬請關注接下來的系列文章。