【一起學設計模式】觀察者模式實戰:真實專案中屢試不爽的瓜娃EventBus到底如何實現觀察者模式的?
申明
本文章首發自本人公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫
22.jpg
前言
之前出過一個設計模式的系列文章,這些文章和其他講設計模式的文章 有些不同
文章沒有拘泥於講解設計模式的原理,更多的是梳理工作中實際用到的一些設計模式,並提取出對應業務模型進行總結,回顧下之前的一些文章:
【一起學設計模式】策略模式實戰一:基於訊息傳送的策略模式實戰
【一起學習設計模式】策略模式實戰二:配合註解 幹掉業務程式碼中冗餘的if else…
【一起學設計模式】訪問者模式實戰:許可權管理樹刪除節點操作
【一起學設計模式】命令模式+模板方法+工廠方法實戰: 如何優雅的更新商品庫存…
【一起學設計模式】狀態模式+裝飾器模式+簡單工廠模式實戰:(一)提交個訂單我到底經歷了什麼鬼?
【一起學設計模式】中介者模式+觀察者模式+備忘錄模式實戰:(二)提交個訂單我到底經歷了什麼鬼?
所以:任何脫離實際業務的設計模式都是耍流氓
image.png
業務梳理
最近專案在對接神策埋點相關需求。
有一個場景是:產品自定義了很多埋點事件,有些事件需要後端進行一定的業務處理,然後進行埋點。
業務其實很簡單,就是前端請求到後端,後端進行一定業務處理組裝後將資料傳送到神策後臺。
說到這裡是不是還有小夥伴沒聽懂??那麼就畫張圖吧:
image.png
這裡只是簡單的舉個栗子,說明下業務場景。
針對這個業務場景,最開始的想法是儘量少的侵入原有業務方法,所以這裡選擇使用觀察者模式。
原有業務場景中加入釋出事件的能力,然後訂閱者自己消費進行埋點資料邏輯。做到儘可能的業務解耦。
觀察者模式
這裡還是要多囉嗦幾句,說下觀察者模式原理:
所謂的觀察者模式也稱為釋出訂閱模式,這裡肯定至少存在兩種角色:釋出者/訂閱者
接著看下UML圖:
image.png
所涉及到的角色如下:
- 抽象主題(Subject):提供介面,可以增加和剔除觀察者物件。一般用抽象類或者介面實現。
- 抽象觀察者(Observer):提供介面,在得到主題的通知時更新自己。一般用抽象類或者介面實現。
- 具體主題(ConcreteSubject):將有關狀態存入具體觀察者,在具體主題的內部狀態發生變化時,給所有註冊過的觀察者發出通知。一般是具體子類實現。
- 具體觀察者(ConcreteObserver):儲存與主題的狀態自恰的狀態。具體觀察者角色實現抽象觀察者角色所要求的更新介面,以便使本身的狀態與主題的狀態 像協調。如果需要,具體觀察者角色可以保持一個指向具體主題物件的引用
在上述類圖中,ConcreteSubject中有一個儲存Observer的列表,這意味著ConcreteSubject並不需要知道引用了哪些ConcreteObserver,只要實現(繼承)了Observer的物件都可以存到該列表中。在需要的時候呼叫Observer的update方法。
話不多說,我們自己動手來模擬一個簡單的觀察者模式:
/**
* 觀察者模式測試程式碼
*
* @author wangmeng
* @date 2020/4/25 19:38
*/
public class ObserverTest {
public static void main(String[] args) {
Subject subject = new Subject();
Task1 task1 = new Task1();
subject.addObserver(task1);
Task2 task2 = new Task2();
subject.addObserver(task2);
subject.notifyObserver("xxxx");
}
}
class Subject {
// observer集合
private List<Observer> observerList = Lists.newArrayList();
// add
public void addObserver(Observer observer) {
observerList.add(observer);
}
// remove
public void removeObserver(Observer observer) {
observerList.remove(observer);
}
// 通知觀察者
public void notifyObserver(Object object) {
for (Observer item : observerList) {
item.update(object);
}
}
}
interface Observer {
void update(Object object);
}
class Task1 implements Observer {
@Override
public void update(Object object) {
System.out.println("task1 received: " + object);
}
}
class Task2 implements Observer {
@Override
public void update(Object object) {
System.out.println("task2 received: " + object);
}
}
針對於觀察者模式,JDK和Spring也有一些內建實現,具體可以參見:JDK中Observable
,Spring中ApplicationListener
這裡就不再贅述了,想深入瞭解的小夥伴可執行谷歌,畢竟我們這次文章的重點還是Guava
中觀察者模式的使用實現原理。
業務程式碼示例
這裡使用的是Guava中自帶的EventBus元件,我們繼續用取消訂單業務場景做示例,這裡抽離了部分程式碼,只展示核心的一些程式碼:
1. 事件匯流排服務
/**
* 事件匯流排服務
*
* @author wangmeng
* @date 2020/4/14
*/
@Service
public class EventBusService {
/**
* 訂閱者非同步執行器,如果同步可以使用EventBus
**/
@Autowired
private AsyncEventBus asyncEventBus;
/**
* 訂閱者集合,裡面方法通過@Subscribe進行事件訂閱
**/
@Autowired
private EventListener eventListener;
/**
* 註冊方法,啟動的時候將所有的訂閱者進行註冊
**/
@PostConstruct
public void register() {
asyncEventBus.register(eventListener);
}
/**
* 訊息投遞,根據入參自動投遞到對應的方法中去消費。
*/
public void post(Object object) {
asyncEventBus.post(object);
}
}
這裡使用了非同步的實現方式,如果使用同步的方式可以將AsyncEventBus
改為EventBus
2. 非同步AsyncEventBus配置:
/**
* AsyncEventBus 執行緒池配置
*
* @author wangmeng
* @date 2020/04/14
*/
@Configuration
public class EventBusConfiguration {
/** Set the ThreadPoolExecutor's core pool size. */
private int corePoolSize = 10;
/** Set the ThreadPoolExecutor's maximum pool size. */
private int maxPoolSize = 30;
/** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
private int queueCapacity = 500;
@Bean
public AsyncEventBus asyncEventBus() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("jv-mall-user-sensorsData:");
executor.initialize();
return new AsyncEventBus(executor);
}
}
執行緒池資料大家可以隨意配置,這裡只是參考。
3. 觀察者實現
/**
* 觀察者程式碼
*
* @author wangmeng
* @date 2020/4/14
*/
@Service
@Slf4j
public class EventListener {
@Autowired
private SensorsDataManager sensorsDataManager;
/**
* 觀察者處理資料埋點方法
*/
@Subscribe
@AllowConcurrentEvents
public void handleCancelOrderEvent(TrackCancelOrderDTO cancelOrderDTO) {
Map<String, Object> propertyMap = this.buildBasicProperties(cancelOrderDTO);
propertyMap.put(SensorsDataConstants.ORDER_ID, registerDTO.getOrderId());
// 各種屬性賦值,這裡只擷取一點
propertyMap.put(SensorsDataConstants.PROPERTY_IS_SUCCESS, registerDTO.getIsSuccess());
propertyMap.put(SensorsDataConstants.PROPERTY_FAIL_REASON, registerDTO.getFailReason());
sensorsDataManager.send(registerDTO.getUserId(), SensorsEventConstants.EVENT_CANCEL_ORDER, propertyMap);
}
}
這個EventLister
是我們在上面EventBusService
中註冊的類,觀察者方法上面新增@Subscribe
即可對釋出者的資料進行訂閱。
@AllowConcurrentEvents
註解字面意思是允許事件併發執行,這個原理後面會講。
PS:這裡sensorsDataManager
是封裝生成埋點相關的類。
釋出者實現
在業務邏輯中加入埋點資料釋出的方法:
@Autowired
private EventBusService eventBusService;
public void cancelOrder(Long orderId) {
// 業務邏輯執行
// 埋點資料
TrackCancelOrderDTO trackCancelOrderDTO = trackBaseOrderInfoManager.buildTrackBaseOrderDTO(orderInfoDO, context.getOrderParentInfoDO(), TrackCancelOrderDTO.class);
trackCancelOrderDTO.setCancelReason(orderInfoDO.getCancelReason());
trackCancelOrderDTO.setCancelTime(orderInfoDO.getCancelTime());
trackCancelOrderDTO.setPlatformName(SensorsDataConstants.PLATFORM_APP);
trackCancelOrderDTO.setUserId(orderInfoDO.getUserId().toString());
eventBusService.post(trackCancelOrderDTO);
}
到了這裡所有的如何使用EventBus
的程式碼都已經貼出來了,下面就看看具體的原始碼實現吧
原始碼剖析
事件匯流排訂閱原始碼實現
com.google.common.eventbus.SubscriberRegistry#register:
void register(Object listener) {
//查詢所有訂閱者,維護了一個key是事件型別,value是定訂閱這個事件型別的訂閱者集合的一個map
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
//獲取事件型別
Class<?> eventType = entry.getKey();
//獲取這個事件型別的訂閱者集合
Collection<Subscriber> eventMethodsInListener = entry.getValue();
//從快取中按事件型別查詢訂閱者集合
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
//從快取中取不到,更新快取
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
事件和訂閱事件的訂閱者集合是在com.google.common.eventbus.SubscriberRegistry這裡維護的:
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
到這裡,訂閱者已經準備好了,準備接受事件了。通過debug 看下subscribers
中資料:
image.png
釋出事件原始碼實現
com.google.common.eventbus.EventBus#post
public void post(Object event) {
//獲取事件的訂閱者集合
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
//轉發事件
dispatcher.dispatch(event, eventSubscribers);
//如果不是死亡事件,重新包裝成死亡事件重新發布
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
Iterator<Subscriber> getSubscribers(Object event) {
//獲取事件型別類的超類集合
ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size());
for (Class<?> eventType : eventTypes) {
//獲取事件型別的訂閱者集合
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers != null) {
// eager no-copy snapshot
subscriberIterators.add(eventSubscribers.iterator());
}
}
return Iterators.concat(subscriberIterators.iterator());
}
事件轉發器有三種實現:
image.png
第一種是立即轉發,實時性比較高,其他兩種都是佇列實現。
我們使用的是AsyncEventBus
,其中指定的事件轉發器是:LegacyAsyncDispatcher
,接著看看其中的dispatch()
方法的實現:
com.google.common.eventbus.Dispatcher.LegacyAsyncDispatcher
private static final class LegacyAsyncDispatcher extends Dispatcher {
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
// 先將所有釋出的事件放入佇列中
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
// 消費佇列中的訊息
e.subscriber.dispatchEvent(e.event);
}
}
}
接著看subscriber.dispatchEvent()
方法實現:
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
執行訂閱方法都是非同步實現,我們在上面初始化AsyncEventBus
的時候有為其構造執行緒池,就是在這裡使用的。
在看invokeSubscriberMethod()
具體程式碼之前,我們先來看看@AllowConcurrentEvents
,我們在訂閱方法上有加這個註解,來看看這個註解的作用吧:
image.png
在我們執行register()
方法的時候,會為每一個訂閱者構造一個Subscriber
物件,如果配置了@AllowConcurrentEvents
註解,就會為它配置一個允許併發的Subscriber
物件。
class Subscriber {
/**
* Creates a {@code Subscriber} for {@code method} on {@code listener}.
*/
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
private static boolean isDeclaredThreadSafe(Method method) {
// 如果有AllowConcurrentEvents註解,則返回true
return method.getAnnotation(AllowConcurrentEvents.class) != null;
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
// 通過反射直接執行訂閱者中方法
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
@VisibleForTesting
static final class SynchronizedSubscriber extends Subscriber {
private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
super(bus, target, method);
}
@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
// SynchronizedSubscriber不支援併發,這裡用synchronized修飾,所有執行都序列化執行
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}
}
}
這裡麵包含了invokeSubscriberMethod()
方法的實現原理,其實就是通過反射去執行訂閱者中的方法。
還有就是如果沒有添加註解,就會走SynchronizedSubscriber
中invokeSubscriberMethod()
邏輯,添加了synchronized
關鍵字,不支援併發執行。
總結
這裡主要是整理了guava 中實現觀察者模式的使用及原理。
大家如果有類似的業務場景也可以使用到自己專案