1. 程式人生 > >Android函式響應式程式設計——RxJava的4大subject和執行緒控制

Android函式響應式程式設計——RxJava的4大subject和執行緒控制

Subject

Subject既可以是一個Observer也可以是一個Observable,它用來連線兩者。所以Subject被認為是Subject=Observable+Observer

1.PublishSubject

PublishSubject在被建立完成之後立刻開始發射資料,所有有一個風險:在所有的訂閱者訂閱之前,資料可能已經發送了 。

解決方法:1.確保所有訂閱者的訂閱比這個PublishSubject早2.使用ReplaySubject

2.BehaviorSubject

當觀察者訂閱BehaviorSubject的時候,BehaviorSubject就開始發射原始Observable最近發射的資料。如果這個時候還沒有收到任何資料,BehaviorSubject就會發射一個預設值。然後繼續發射其他任何來自原始Observer的資料。如果原始的Observer因為發生了一個錯誤而終止,BehaviorSubject將不會發射人惡化資料,但是會向Observable傳遞一個異常通知的。

3.ReplaySubject

不管觀察者什麼時候訂閱它,它都會把所有資料給觀察者。

限定他的範圍:設定Buffer的具體大小;設定具體的時間範圍。

注意:如果使用了ReplaySubject作為了觀察者,注意不要在多個執行緒中呼叫onNext、onCompleted和onError方法。這可能會導致順序錯亂,並且違反了Observer規則。

4.AsyncSubject

當被觀察者完成時,AsyncSubject只會發射來自原始Observer的最後一個數據。如果原始的Observer因為發生了錯誤而終止,AsyncSubject將不會發射任何資料,但是會向Observable傳遞一個異常通知。

RxJava的執行緒控制

1.內建的Scheduler

如果我們不指定執行緒,預設是在呼叫subscribe方法的執行緒上進行回撥的。如果我們想切換執行緒,就需要使用Scheduler。RxJava已經內建瞭如下5個Scheduler。

1.Schedulers.immediate():直接在當前執行緒執行。它是timeout、、timeInterval和timestamp操作符的預設排程器。

2.Schedulers.newThread():總是啟用新執行緒,並在新執行緒執行操作

3.Schedulers.io():i/o操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的Scheduler。行為模式和newThread差不多,區別在於io()的內部實現是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下io()比newThread()更有效率。

4.Schedulers.computation():計算所使用的Scheduler,例如圖形的計算。這個Scheduler使用固定執行緒池,大小為CPU核數。不要把i/o操作放在computation()中,否則i/o操作的等待時間會浪費cpu。它是buffer、debounce、delay、interval、sample和skip操作符的預設排程器。

5.Schedulers.trampoline():當我們想在當前執行緒執行一個任務時,並不是立即時,可以用.trampoline將它入隊。這個排程器將會處理它的佇列並且按序執行佇列中的每一個任務。它是repeat和retry的操作符預設的排程器。

6.RxAndroid中常用的Scheduler:AndroidSchedulers.mainThread(),主執行緒。

控制執行緒:採用subscribeOn和observeOn。

(rxjava的應用和原始碼文章先擱置一下,先看了網路請求框架的原始碼再來結合rxjava分析)