Android 常用開源框架源碼解析 系列 (十)Rxjava 異步框架
阿新 • • 發佈:2018-08-13
oid super 嚴重 ids 代碼 執行者 輸出 ... tin 一、Rxjava的產生背景
一、進行耗時任務
傳統解決辦法:
傳統手動開啟子線程,聽過接口回調的方式獲取結果
傳統解決辦法的缺陷:
隨著項目的深入、擴展。代碼量的增大會產生回調之中套回調的,耦合度高度增加的不利場景。對代碼維護和擴展是很嚴重的問題。
RxJava本質上是一個異步操作庫
優點:
使用簡單的邏輯,處理復雜 ,困難的異步操作事件庫;在一定程度上替代handler、AsyncTask等等
二、傳統的觀察者模式
使用場景
1、一個方面的操作依賴於另一個方面的狀態變化
2、如果在更改一個對象的時候,需要同時連帶改變其他的對象(不確定有多少對象需要改變)
3、當一個對象必須通知其他的對象,但是又希望這個對象和其他被通知的對象是松散耦合度的關系
在App開發過程中,有一個對象的狀態數據需要時常關註,很多個頁面的Ui都跟這個對象又有綁定關系。當這個對象發生改變的時候就需要通知所有跟他有關系的Ui都進行相應的改變。這種情況下就是一種觀察者模式的使用場景。
簡單來說:
A對象 對B對象的數據高度敏感,當B對象變化的一瞬間,A對象要做出反應。這時候A對象就是觀察者,B對象就是被觀察者
觀察者模式說白了就是眾多的觀察者對被觀察者的數據高度敏感變化的自身的一種反應。其反應的是一種 多對一的 關系。
組成:
(一)<interface>Observerable 被觀察者接口
a、registerObserver() :將觀察者註冊到被觀察者當中,是一個訂閱方法
b、removeObserver():將觀察者從被觀察者中移除,取消訂閱
c、notifyObservers():當被觀察者狀態改變的時候,該方法就會被調用 *****
ps:內部會調用觀察者的 update() 函數,來通知觀察者做出相應的數據改變,依次循環遍歷整個觀察者數量並獲取到觀察者並調用update()方法 進行相應的更新操作
(二)<class> ConcreteObserverable 被觀察者具體的實現
實現了被觀察者接口中的abc方法,並且定義了一個List<Observer>observers 用來保存註冊好的觀察者對象的
ps: 由於集合的範型參數 是接口 類型 所以不能是具體的Observer 實現類,只能是Observer的接口
接口的定義和設計 要為了以後的拓展而考慮
解析:這樣做的原因 讓一個被觀察者可能會有多個實現類的觀察者都有可能實現了Observerable 這個接口 。
這樣就能把觀察者和被觀察者 通過List 這個集合 進行解耦
(三)<interface> Observer 觀察者接口
Update() :接口
與被觀察者的notifyObservers()關聯進行相應的數據變化
(四)<class > ConcreteObserver 具體的觀察者
實現 update() 或是其他方法
三、Rxjava觀察者模式和基本用法
Rxjava 四要素
1、被觀察者
2、觀察者
3、訂閱
4、事件
事件
響應式編程 是基於異步數據流概念的編程模式。響應式編程的一個核心概念 事件
步驟
1、創建被觀察者:create
rxjava中:決定什麽時候觸發事件,以及決定觸發怎樣的事件
//第一步、創建被觀察者:Create
//Observable 也會轉換成subscriber進行響應的處理
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
//按代碼當中的順序進行響應式調用
//這裏的Subscriber就是觀察者,在被觀察者的回調中調用了觀察者的方法實際上就是一種事件的傳遞
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onCompleted();
}
});
//第二種創建 被觀察者對象的方法:
//通過just 方法 來創建被觀察者對象
Observable observableJust = Observable.just("1","2”); //最多有10個String參數
//第三種創建,通過from方法,把參數作為字符數組,然後添加到參數裏
String[] parameters = {“1”,”2"};
Observable observableFrom = Observable.from(parameters);
//第二步、創建觀察者:observer
//第二步、創建觀察者Observer,決定事件觸發會有怎樣的行為
Observer<Object> observer = new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
//實際就是傳統觀察者模式中的update()方法
@Override
public void onNext(Object o) {
}
};
//第三步,訂閱,通過被觀察者.subscribe(觀察者)
public void doRxjava() {
//第三部,訂閱
observable.subscribe(observer);
}
ps:註意 是被觀察者 訂閱 觀察者 !為了通過流式api 進行不同的操作符操作、線程控制都能通過鏈式調用來完善。
四、Rxjava創建Observable & observer
1、Observable(被觀察者)
2、OnSubscribe 對象 :被觀察者用來 通知觀察者的notifyObservers() 函數
3、Subscriber 觀察者
4、subscribe() ;通過該方法完成觀察者與被觀察之間的訂閱
A、創建被觀察者、Observable:
同時創建Observable 內部的一個onSubscribe對象作為參數傳遞到create()方法當中
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
//內部傳入一個OnSubscribe參數,最終會賦值給Observable的成員變量 onSubscribe
public static <T> Observable<T> create(OnSubscribe<T> f) {
ps:hook 可以被看作是一抽象的代理類,該代理類默認情況下不會對onSubscribe做任何處理。
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
//create()構造了一個新的被觀察者Observable對象,同時將參數賦值給Observable的成員變量 onSubscribe,生成被觀察者對象
return new Observable<T>(hook.onCreate(f));
}
B、創建觀察者、Subscriber or Observer :實現了Observer接口,和Subscription接口;
Subscription接口的實現:
void unsubscribe();//進行接觸綁定——不再有訂閱事件了,訂閱事件列表為空了調用發方法
boolean isUnsubscribed();//判斷是否接觸綁定,判斷是否已經取消了訂閱事件
public abstract class Subscriber<T> implements Observer<T>, Subscription
private final SubscriptionList subscriptions;//訂閱事件的集合 ,在這個集合List當中保存了所有這個觀察者的訂閱事件,當取消訂閱的時候,該List會有事件被刪除
C、訂閱關系:調用observable 內部的subscribe() 完成訂閱
public final Subscription subscribe(final Observer<? super T> observer) {
//傳入一個observer ,調用subscribe,最終轉型成subscribe
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber,
Observable<T> observable) {
…
subscriber.onStart();//空方法實現,需要的時候自己調用並實現
if (!(subscriber instanceof SafeSubscriber)) { //將subscriber 包裝成SafeSubscriber
ps:在SafeSubscriber()中執行了onCompled()、和onError()方法 ,就不會再執行onNext()方法
subscriber = new SafeSubscriber<T>(subscriber);
}
…
//調用完call方法,意味著完成了一次訂閱
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
ps:完成訂閱後默認會觸發 Observable.Onsubscribe<String>中的call 函數
五、Rxjava 的操作符
變換: 就是將事件序列中的對象 或整個序列進行加工再處理,轉換成不同的事件或是事件序列
map操作符:就是用來把一個事件轉換為另一個事件
/**
* 通過被觀察者Observabled 調用just方法創建被觀察者並傳入圖片的路徑地址,調用map操作符,對原來的觀察者進行數據流的變化操作。將String類型的圖片路徑轉換成bitmap,來完成map操作符的調用,map操作符會創建一個新的Observable 對象然後再鏈式調用subscribe完成訂閱關系
*/
private void map() {
Observable.just("map/image/map.png")
//通過map的鏈式調用,將String轉換成bitmap對象
.map(new Func1<String, Bitmap>() {
//Func1 是Rxjava中的接口,用於包裝含有參數的方法,
// func1中第一個參數的類型就代表Observable發射當前的類型;第二個參數是String類型將要轉換的類型
@Override
public Bitmap call(String filepath) {
return getBitmap(filepath);
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
//...
}
});
}
map()函數接受一個Func1類型的參數,然後把這個Func1應用到每一個由Observable發射的值上,將發射的值轉換為我們期望的值。
將參數重String類型參數 轉換成Bitmap 並返回
六、Rxjava 的map操作符的原理
lift()方法是Rxjava 所有操作符的核心方法
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
ps:OperatorMap 是實現了Operator操作符的一個接口
public final class OperatorMap<T, R> implements Operator<R, T>
在該類中核心方法call方法內,接受外部傳遞給它的subscriber觀察者
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
...
@Override
public void onNext(T t) {
try {
//通過調用onNext()完成觀察數據流的轉化
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
…
}
而transformer 是定義在func1 這個接口下的,通過Func1類中 call()方法的調用完成T —> 轉換成 R的操作
final Func1<? super T, ? extends R> transformer;
//transformer的作用就是將範型<T , > 轉換成 範型 < ,R>
public interface Func1<T, R> extends Function {
R call(T t);
}
//基本所有操作符內部都會用到lift ()函數的內部相關原理 ;
lift : 本質上是針對事件序列的處理和再發送
lift():
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//1、首先內部生成了一個新的Observable 被觀察者對象並返回
ps:new Observable——代理主要負責接收原始的Observable 發出的事件,當創建好了 new Observable 會將其發送給下面
的 Subscriber<? super T> st 讓其進行處理
returnnew Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
//在新對象的OnSubscribe對象當中,通過call方法調用 拿到之前的生成的Observable,生成一個新的Subscriber對象
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so ‘onStart‘ it
st.onStart();
//將新建的Subscriber作為參數傳遞到call 方法中,在這給call方法中 完成了訂閱工作
onSubscribe.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
});
}
七、Rxjava 的flatmap操作符
flatmap 和map有一個共同點 : 均是用來進行事件轉化的
map 是將 String 類型 ——轉化——>bitmap ,一一對應,映射成一個新的事件
flatmap 是將 String 類型——轉化——>Observable————將所有的事件轉化成一個Observable然後由這個Observable進行統一的事件分發
/**
* flatMap -輸入URi地址返回Uri 列表
*/
private Subscription processNetAddress() {
return Observable.just(
"http://www.xxx",
"http://www.yyy",
"http://www.zzz"
)
//1、將傳入的String類型 事件對象,轉換成Observable()類型對象
.flatMap(new Func1<String, Observable<String>>() {
//2、不會直接發送這個Observable ,而是將這個Observable激活讓他自己開始發送事件
@Override
public Observable<String> call(String s) {
return null;
}
})
//3、每一個創建出來的Observable發送的事件,都被匯入同一個Observable
//接收到上面一連串的字符串完成輸出
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
}
八、Rxjava 的線程控制
在默認不指定線程的情況下,Rxjava 遵循的是線程不變的原則。
也就算說在哪個線程調用的subscribe ()訂閱方法,就會在哪個線程生產事件;所以在哪個線程生產了事件,就在哪個線程消費事件
Schedulers——線程控制符
Rxjava 通過該類進行線程的調度
Schedulers.immediate()
默認情況下,在當前線程運行,不切換任何線程默認操作
Schedulers.newThread()
總是啟用新線程,在新線程中執行相應的操作
Schedulers -io()
執行IO 流操作,讀寫文件之類的
ps:區別於newThread(),io()內部實現是一個無數量的線程池,可以更好的利用線程效率,優於newThread()
Schedulers-computation()
cpu密集計算使用的Schedulers
AndroidSchedulers.mainThread()
將指定的操作放在Android 的主線程中執行
線程控制
1、subscribeOn()
指定subscribe 訂閱觀察者時候所發生的線程,也就是Observerable內部的OnSubscribe()被激活時候所處的線程,通知各個觀察者開始執行相應的操作
2、observeOn()
指定subscribeOn()所運行在的線程,事件消費所在的線程
public void doThreadWithScheduler() {
Observable.just("x", "y", "z”)
//前面通過just()方法,所創建的3個事件內容的發出,會在io線程發出,後臺線程進行數據讀取
.subscribeOn(Schedulers.io())
//指定了subscribeOn訂閱的時候call方法裏函數將發生在主線程中,主線程顯示數據
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("thread" + s);
}
});
}
public void doWeatherCompute() {
//create 方法創建Observable對象,內部傳入OnSubscribe對象,該方法可以被理解成為notifyObservers()方法,
由它通知觀察者去進行相應的操作,也就是調用觀察者的 onNext()操作
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("");
// ...
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
通過線程Schedulers控制符+subscribeOn()和observeOn()完成主線程和子線程間的工作切換,替代復雜 ,易發生錯誤的newThreard() 和handler()
public void doWeatherCompute() {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("");
// ...
}
})
//指定subscribe(),發生在io線程
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() { //訂閱上面的call()方法發生在io線程,可以進行耗時操作
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
//ui 顯示工作的 邏輯 在onNext()
}
});
}
}
思考:subscribeOn 和 observeOn 各能調用幾次呢? 1/無限
答 :
observeOn()指定的是之後的工作所在的線程,因此有多次切換線程的需求,只要在每一個切換線程的位置調用一次observeOn()就能滿足需求。所以說observeOn()支持多次調用!
subscribeOn()位置可以放在observeOn()前後都可以 。但是!! subscribeOn()只能調用一次。
————subscribeOn()源碼
public final Observable<T> subscribeOn(Scheduler scheduler) {//返回值是一個Observable對象,返回創建一個新的被觀察者,通過新的被觀察者進行下面的事件
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
ps:OperatorSubscribeOn implements OnSubscribe<T> 實現了該接口
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
public void call(final Subscriber<? super T> subscriber) {
//用於線程控制 實現了Subscription接口 用於看是否取消訂閱的操作,取消後就不會接受觀察者發送的各類事件了
final Workerinner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
//根據傳遞的subscriber 創建一個新的Subscriber
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
//根據新的subscriber 通知到目標subscriber並調用其onNext()方法
subscriber.onNext(t);
}
…
//內部對線程進行一些判斷
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
//在該方法裏進行線程的控制操作
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
source.unsafeSubscribe(s);//source實際上是一個Obsrvable對象
…
}
同理,觀察者 Subscriber 也是現實了Subscription、和Observer接口的原因是當我們的觀察者Subscriber取消訂閱的時候,將持有事件列表中的所有Subscription 訂閱全部取消。也就不會再接受訂閱事件。
createWorker():
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);//通過newThreadWorker()完成創建
}
NewThreadWorker():
public NewThreadWorker(ThreadFactory threadFactory) {
//通過線程池 創建並操作線程
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
executor = exec;
}
inner.schedule(): 是通過一個抽象類實現的abstract Subscription schedule,在其具體的實現類之一NewThreadWoker中通過schedule()方法實現具體邏輯
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {//通過executor 的方法可以判斷其還是根據線程池完成操作
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
source.unsafeSubscribe():
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
//在這裏調用call()函數表明整個subscribeOn()操作已經完成
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
hook.onSubscribeError(r);
throw r;
}
return Subscriptions.unsubscribed();
}
}
SubscribeOn方法小結
1、會新生成一個Observable
2、在其內部,onSubscribe對象會在目標Subscriber訂閱時候使用傳入的Scheduler的worker作為線程調度執行者
3、在對應的線程中通知原始Observable發送消息給這個過程中臨時生成的Subscriber
4、這個Subscriber又會通知到目標Subscriber,從而完成我們的subscribeOn的過程
————observeOn()源碼
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
//最終通過lift() 函數操作
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
class OperatorObserveOn<T> implements Operator<T, T>實現了操作符的接口
…
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
//完成線程切換實際的對象ObserveOnSubscriber
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0{
…
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
//將結果緩存到隊列當中
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule(); //開啟真正的線程切換
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
}
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}
//線程調度方法,同上面的方法一樣 也是使用executor並發庫來進行線程調度
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
...
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
...
}
SubscribeOn ()和 ObserveOn()
subscribeOn ()是通過新建Observable的方式,使用OnSubscribe()類的方式去做到線程切換的。不斷的調用subscribeOn()實際上最終只會使用第一個subscribeOn()的方法。
observeOn()是通過opeartor 操作符的形式去完成線程切換的,所以它的作用域和其他操作符一樣,是調用observeOn(0之後的鏈路
1、observeOn()指定的是它之後的操作所在的線程,通過observeOn()的多次調用,程序實現了線程的多次切換;
2、subscribeOn()的位置放在哪裏都可以,但它是只能調用一次的,原因就是subscribeOn是通過新建Observable的方法創建的
Android 常用開源框架源碼解析 系列 (十)Rxjava 異步框架