Android訊息匯流排的演進之路【新的訊息匯流排框架】
正文:
對於 Android
系統來說,訊息傳遞是最基本的元件,每一個 App
內的不同頁面,不同元件都在進行訊息傳遞。訊息傳遞既可以用於 Android
四大元件之間的通訊,也可用於非同步執行緒和主執行緒之間的通訊。對於 Android
開發者來說,經常使用的訊息傳遞方式有很多種,從最早使用的 Handler
、 BroadcastReceiver
、介面回撥,到近幾年流行的通訊匯流排類框架 EventBus
、 RxBus
。 Android
訊息傳遞框架,總在不斷的演進之中。
從 EventBus 說起
EventBus
是一個 Android
事件釋出/訂閱框架,通過解耦釋出者和訂閱者簡化 Android
事件傳遞。 EventBus
可以代替 Android
傳統的 Intent
、 Handler
、 Broadcast
或介面回撥,在 Fragment
、 Activity
、 Service
執行緒之間傳遞資料,執行方法。
EventBus
最大的特點就是簡潔、解耦。在沒有 EventBus
之前我們通常用廣播來實現監聽,或者自定義介面函式回撥,有的場景我們也可以直接用 Intent
攜帶簡單資料,或者線上程之間通過 Handler
處理訊息傳遞。但無論是廣播還是 Handler
機制遠遠不能滿足我們高效的開發。 EventBus
簡化了應用程式內各元件間、元件與後臺執行緒間的通訊。 EventBus
一經推出,便受到廣大開發者的推崇。
現在看來, EventBus
給 Android
開發者世界帶來了一種新的框架和思想,就是訊息的釋出和訂閱。這種思想在其後很多框架中都得到了應用。

Picture
釋出/訂閱模式
訂閱釋出模式定義了一種“一對多”的依賴關係,讓多個訂閱者物件同時監聽某一個主題物件。這個主題物件在自身狀態變化時,會通知所有訂閱者物件,使它們能夠自動更新自己的狀態。

Picture
RxBus 的出現
RxBus
不是一個庫,而是一個檔案,實現只有短短30行程式碼。 RxBus
本身不需要過多分析,它的強大完全來自於它基於的 RxJava
技術。響應式程式設計( Reactive Programming
)技術這幾年特別火, RxJava
是它在 Java上
的實作。 RxJava
天生就是釋出/訂閱模式,而且很容易處理執行緒切換。所以, RxBus
憑藉區區30行程式碼,就敢挑戰 EventBus
“江湖老大”的地位。
RxBus 原理
在 RxJava
中有個 Subject
類,它繼承 Observable
類,同時實現了 Observer
介面,因此 Subject
可以同時擔當訂閱者和被訂閱者的角色,我們使用 Subject
的子類 PublishSubject
來建立一個 Subject
物件( PublishSubject
只有被訂閱後才會把接收到的事件立刻傳送給訂閱者),在需要接收事件的地方,訂閱該 Subject
物件,之後如果 Subject
物件接收到事件,則會發射給該訂閱者,此時 Subject
物件充當被訂閱者的角色。
完成了訂閱,在需要傳送事件的地方將事件傳送給之前被訂閱的 Subject
物件,則此時 Subject
物件作為訂閱者接收事件,然後會立刻將事件轉發給訂閱該 Subject
物件的訂閱者,以便訂閱者處理相應事件,到這裡就完成了事件的傳送與處理。
最後就是取消訂閱的操作了, RxJava
中,訂閱操作會返回一個 Subscription
物件,以便在合適的時機取消訂閱,防止記憶體洩漏,如果一個類產生多個 Subscription
物件,我們可以用一個 CompositeSubscription
儲存起來,以進行批量的取消訂閱。
RxBus 有很多實現,如:
其實正如前面所說的, RxBus
的原理是如此簡單,我們自己都可以寫出一個 RxBus
的實現:
基於 RxJava1 的 RxBus 實現:
public final class RxBus { private final Subject<Object, Object> bus; private RxBus() { bus = new SerializedSubject<>(PublishSubject.create()); } private static class SingletonHolder { private static final RxBus defaultRxBus = new RxBus(); } public static RxBus getInstance() { return SingletonHolder.defaultRxBus; } /* * 傳送 */ public void post(Object o) { bus.onNext(o); } /* * 是否有Observable訂閱 */ public boolean hasObservable() { return bus.hasObservers(); } /* * 轉換為特定型別的Obserbale */ public <T> Observable<T> toObservable(Class<T> type) { return bus.ofType(type); } }
基於 RxJava2 的 RxBus 實現:
public final class RxBus2 { private final Subject<Object> bus; private RxBus2() { // toSerialized method made bus thread safe bus = PublishSubject.create().toSerialized(); } public static RxBus2 getInstance() { return Holder.BUS; } private static class Holder { private static final RxBus2 BUS = new RxBus2(); } public void post(Object obj) { bus.onNext(obj); } public <T> Observable<T> toObservable(Class<T> tClass) { return bus.ofType(tClass); } public Observable<Object> toObservable() { return bus; } public boolean hasObservers() { return bus.hasObservers(); } }
引入 LiveDataBus 的想法
從 LiveData 談起
LiveData
是 Android Architecture Components
提出的框架。 LiveData
是一個可以被觀察的資料持有類,它可以感知並遵循 Activity
、 Fragment
或 Service
等元件的生命週期。正是由於 LiveData
對元件生命週期可感知特點,因此可以做到僅在元件處於生命週期的啟用狀態時才更新 UI
資料。
LiveData
需要一個觀察者物件,一般是 Observer
類的具體實現。當觀察者的生命週期處於 STARTED
或 RESUMED
狀態時, LiveData
會通知觀察者資料變化;在觀察者處於其他狀態時,即使 LiveData
的資料變化了,也不會通知。
LiveData 的優點
-
UI 和實時資料保持一致,因為
LiveData
採用的是觀察者模式,這樣一來就可以在資料發生改變時獲得通知,更新UI
。 -
避免記憶體洩漏,觀察者被繫結到元件的生命週期上,當被繫結的元件銷燬(
destroy
)時,觀察者會立刻自動清理自身的資料。 -
不會再產生由於 Activity 處於 stop 狀態而引起的崩潰,例如:當
Activity
處於後臺狀態時,是不會收到LiveData
任何事件的。 -
不需要再解決生命週期帶來的問題,
LiveData
可以感知被繫結的元件的生命週期,只有在活躍狀態才會通知資料變化。 -
實時資料重新整理,當元件處於活躍狀態或者從不活躍狀態到活躍狀態時總是能收到最新的資料。
-
解決
Configuration Change
問題 ,在螢幕發生旋轉或者被回收再次啟動,立刻就能收到最新的資料。
談一談 Android Architecture Components
Android Architecture Components
的核心是 Lifecycle
、 LiveData
、 ViewModel
以及 Room
,通過它可以非常優雅的讓資料與介面進行互動,並做一些持久化的操作,高度解耦,自動管理生命週期,而且不用擔心記憶體洩漏的問題。
- Room
一個強大的 SQLite
物件對映庫。
- ViewModel
一類物件,它用於為 UI
元件提供資料,在裝置配置發生變更時依舊可以存活。
- LiveData
一個可感知生命週期、可被觀察的資料容器,它可以儲存資料,還會在資料發生改變時進行提醒。
- Lifecycle
包含 LifeCycleOwer
和 LifecycleObserver
,分別是生命週期所有者和生命週期感知者。
Android Architecture Components 的特點
- 資料驅動型程式設計
變化的永遠是資料,介面無需更改。
-
感知生命週期,防止記憶體洩漏
-
高度解耦
資料,介面高度分離。
- 資料持久化
資料、 ViewModel
不與 UI
的生命週期掛鉤,不會因為介面的重建而銷燬。
重點:為什麼使用 LiveData 構建資料通訊匯流排 LiveDataBus
使用 LiveData 的理由
-
LiveData 具有的這種可觀察性和生命週期感知的能力,使其非常適合作為 Android 通訊匯流排的基礎構件。
-
使用者不用顯示呼叫反註冊方法。
由於 LiveData
具有生命週期感知能力,所以 LiveDataBus
只需要呼叫註冊回撥方法,而不需要顯示的呼叫反註冊方法。這樣帶來的好處不僅可以編寫更少的程式碼,而且可以完全杜絕其他通訊匯流排類框架(如 EventBus
、 RxBus
)忘記呼叫反註冊所帶來的記憶體洩漏的風險。
為什麼要用LiveDataBus替代EventBus和RxBus
-
LiveDataBus 的實現極其簡單,相對
EventBus
複雜的實現,LiveDataBus
只需要一個類就可以實現。 -
LiveDataBus 可以減小 APK 包的大小,由於
LiveDataBus
只依賴Android
官方Android Architecture Components
元件的LiveData
,沒有其他依賴,本身實現只有一個類。作為比較,EventBus JAR
包大小為57kb
,RxBus
依賴RxJava
和RxAndroid
,其中RxJava2
包大小2.2MB
,RxJava1
包大小1.1MB
,RxAndroid
包大小9kb
。使用LiveDataBus
可以大大減小APK
包的大小。 -
LiveDataBus 依賴方支援更好,
LiveDataBus
只依賴Android
官方Android Architecture Components
元件的LiveData
,相比RxBus
依賴的RxJava
和RxAndroid
,依賴方支援更好。 -
LiveDataBus 具有生命週期感知,
LiveDataBus
具有生命週期感知,在Android
系統中使用呼叫者不需要呼叫反註冊,相比EventBus
和RxBus
使用更為方便,並且沒有記憶體洩漏風險。
LiveDataBus 的設計和架構
LiveDataBus 的組成
- 訊息
訊息可以是任何的 Object
,可以定義不同型別的訊息,如 Boolean
、 String
。也可以定義自定義型別的訊息。
- 訊息通道
LiveData
扮演了訊息通道的角色,不同的訊息通道用不同的名字區分,名字是 String
型別的,可以通過名字獲取到一個 LiveData
訊息通道。
- 訊息匯流排
訊息匯流排通過單例實現,不同的訊息通道存放在一個 HashMap
中。
- 訂閱
訂閱者通過 getChannel
獲取訊息通道,然後呼叫 observe
訂閱這個通道的訊息。
釋出
釋出者通過 getChannel
獲取訊息通道,然後呼叫 setValue
或者 postValue
釋出訊息。
LiveDataBus原理圖

Picture
LiveDataBus的實現
第一個實現:
public final class LiveDataBus { private final Map<String, MutableLiveData<Object>> bus; private LiveDataBus() { bus = new HashMap<>(); } private static class SingletonHolder { private static final LiveDataBus DATA_BUS = new LiveDataBus(); } public static LiveDataBus get() { return SingletonHolder.DATA_BUS; } public <T> MutableLiveData<T> getChannel(String target, Class<T> type) { if (!bus.containsKey(target)) { bus.put(target, new MutableLiveData<>()); } return (MutableLiveData<T>) bus.get(target); } public MutableLiveData<Object> getChannel(String target) { return getChannel(target, Object.class); } } 短短二十行程式碼,就實現了一個通訊匯流排的全部功能,並且還具有生命週期感知功能,並且使用起來也及其簡單: 註冊訂閱: LiveDataBus.get().getChannel("key_test", Boolean.class) .observe(this, new Observer<Boolean>() { @Override public void onChanged(@Nullable Boolean aBoolean) { } });
傳送訊息:
LiveDataBus.get().getChannel("key_test").setValue(true);
我們傳送了一個名為"key_test",值為true的事件。
這個時候訂閱者就會收到訊息,並作相應的處理,非常簡單。
問題出現
對於LiveDataBus的第一版實現,我們發現,在使用這個LiveDataBus的過程中,訂閱者會收到訂閱之前釋出的訊息。對於一個訊息匯流排來說,這是不可接受的。無論EventBus或者RxBus,訂閱方都不會收到訂閱之前發出的訊息。對於一個訊息匯流排,LiveDataBus必須要解決這個問題。
問題分析
怎麼解決這個問題呢?先分析下原因:
當LifeCircleOwner的狀態發生變化的時候,會呼叫LiveData.ObserverWrapper的activeStateChanged函式,如果這個時候ObserverWrapper的狀態是active,就會呼叫LiveData的dispatchingValue。

Picture
在 LiveData
的 dispatchingValue
中,又會呼叫 LiveData
的 considerNotify
方法。

Picture
在 LiveData
的 considerNotify
方法中,紅框中的邏輯是關鍵,如果 ObserverWrapper
的 mLastVersion
小於 LiveData
的 mVersion
,就會去回撥mObserver的onChanged方法。而每個新的訂閱者,其 version
都是 -1
, LiveData
一旦設定過其 version
是大於 -1
的(每次 LiveData
設定值都會使其 version
加 1
),這樣就會導致 LiveDataBus
每註冊一個新的訂閱者,這個訂閱者立刻會收到一個回撥,即使這個設定的動作發生在訂閱之前。

Picture
問題原因總結
對於這個問題,總結一下發生的核心原因。對於 LiveData
,其初始的 version
是 -1
,當我們呼叫了其 setValue
或者 postValue
,其 vesion
會 +1
;對於每一個觀察者的封裝 ObserverWrapper
,其初始 version
也為 -1
,也就是說,每一個新註冊的觀察者,其 version
為 -1
;當 LiveData
設定這個 ObserverWrapper
的時候,如果 LiveData
的 version
大於 ObserverWrapper
的 version
, LiveData
就會強制把當前 value
推送給 Observer
。
如何解決這個問題
明白了問題產生的原因之後,我們來看看怎麼才能解決這個問題。很顯然,根據之前的分析,只需要在註冊一個新的訂閱者的時候把 Wrapper
的 version
設定成跟 LiveData
的 version
一致即可。
那麼怎麼實現呢,看看 LiveData
的 observe
方法,他會在 步驟1
建立一個 LifecycleBoundObserver
, LifecycleBoundObserver
是 ObserverWrapper
的派生類。然後會在 步驟2
把這個 LifecycleBoundObserver
放入一個私有 Map
容器 mObservers
中。無論 ObserverWrapper
還是 LifecycleBoundObserver
都是私有的或者包可見的,所以無法通過繼承的方式更改 LifecycleBoundObserver
的 version
。
那麼能不能從 Map
容器 mObservers
中取到 LifecycleBoundObserver
,然後再更改 version
呢?答案是肯定的,通過檢視 SafeIterableMap
的原始碼我們發現有一個 protected
的 get
方法。因此,在呼叫 observe
的時候,我們可以通過反射拿到 LifecycleBoundObserver
,再把 LifecycleBoundObserver
的 version
設定成和 LiveData
一致即可。

Picture
對於非生命週期感知的 observeForever
方法來說,實現的思路是一致的,但是具體的實現略有不同。 observeForever
的時候,生成的 wrapper
不是 LifecycleBoundObserver
,而是 AlwaysActiveObserver
( 步驟1
),而且我們也沒有機會在 observeForever
呼叫完成之後再去更改 AlwaysActiveObserver
的 version
,因為在 observeForever
方法體內, 步驟3
的語句,回撥就發生了。

Picture
那麼對於 observeForever
,如何解決這個問題呢?既然是在呼叫內回撥的,那麼我們可以寫一個 ObserverWrapper
,把真正的回撥給包裝起來。把 ObserverWrapper
傳給 observeForever
,那麼在回撥的時候我們去檢查呼叫棧,如果回撥是 observeForever
方法引起的,那麼就不回撥真正的訂閱者。
LiveDataBus最終實現
public final class LiveDataBus { private final Map<String, BusMutableLiveData<Object>> bus; private LiveDataBus() { bus = new HashMap<>(); } private static class SingletonHolder { private static final LiveDataBus DEFAULT_BUS = new LiveDataBus(); } public static LiveDataBus get() { return SingletonHolder.DEFAULT_BUS; } public <T> MutableLiveData<T> with(String key, Class<T> type) { if (!bus.containsKey(key)) { bus.put(key, new BusMutableLiveData<>()); } return (MutableLiveData<T>) bus.get(key); } public MutableLiveData<Object> with(String key) { return with(key, Object.class); } private static class ObserverWrapper<T> implements Observer<T> { private Observer<T> observer; public ObserverWrapper(Observer<T> observer) { this.observer = observer; } @Override public void onChanged(@Nullable T t) { if (observer != null) { if (isCallOnObserve()) { return; } observer.onChanged(t); } } private boolean isCallOnObserve() { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (stackTrace != null && stackTrace.length > 0) { for (StackTraceElement element : stackTrace) { if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) && "observeForever".equals(element.getMethodName())) { return true; } } } return false; } } private static class BusMutableLiveData<T> extends MutableLiveData<T> { private Map<Observer, Observer> observerMap = new HashMap<>(); @Override public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { super.observe(owner, observer); try { hook(observer); } catch (Exception e) { e.printStackTrace(); } } @Override public void observeForever(@NonNull Observer<T> observer) { if (!observerMap.containsKey(observer)) { observerMap.put(observer, new ObserverWrapper(observer)); } super.observeForever(observerMap.get(observer)); } @Override public void removeObserver(@NonNull Observer<T> observer) { Observer realObserver = null; if (observerMap.containsKey(observer)) { realObserver = observerMap.remove(observer); } else { realObserver = observer; } super.removeObserver(realObserver); } private void hook(@NonNull Observer<T> observer) throws Exception { //get wrapper's version Class<LiveData> classLiveData = LiveData.class; Field fieldObservers = classLiveData.getDeclaredField("mObservers"); fieldObservers.setAccessible(true); Object objectObservers = fieldObservers.get(this); Class<?> classObservers = objectObservers.getClass(); Method methodGet = classObservers.getDeclaredMethod("get", Object.class); methodGet.setAccessible(true); Object objectWrapperEntry = methodGet.invoke(objectObservers, observer); Object objectWrapper = null; if (objectWrapperEntry instanceof Map.Entry) { objectWrapper = ((Map.Entry) objectWrapperEntry).getValue(); } if (objectWrapper == null) { throw new NullPointerException("Wrapper can not be bull!"); } Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass(); Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion"); fieldLastVersion.setAccessible(true); //get livedata's version Field fieldVersion = classLiveData.getDeclaredField("mVersion"); fieldVersion.setAccessible(true); Object objectVersion = fieldVersion.get(this); //set wrapper's version fieldLastVersion.set(objectWrapper, objectVersion); } } }
註冊訂閱:
LiveDataBus.get() .with("key_test", String.class) .observe(this, new Observer<String>() { @Override public void onChanged(@Nullable String s) { } });
傳送訊息:
LiveDataBus.get().with("key_test").setValue(s);
原始碼說明
LiveDataBus的原始碼可以直接拷貝使用,也可以前往作者的GitHub倉庫檢視下載:加群獲取【 Android技術交流】: https://jq.qq.com/?_wv=1027&k=55YgqsK 。
總結
本文提供了一個新的訊息匯流排框架 —— LiveDataBus。訂閱者可以訂閱某個訊息通道的訊息,釋出者可以把訊息釋出到訊息通道上。利用LiveDataBus,不僅可以實現訊息匯流排功能,而且對於訂閱者,他們不需要關心何時取消訂閱,極大減少了因為忘記取消訂閱造成的記憶體洩漏風險。