1. 程式人生 > >【一起學設計模式】觀察者模式實戰:真實專案中屢試不爽的瓜娃EventBus到底如何實現觀察者模式的?

【一起學設計模式】觀察者模式實戰:真實專案中屢試不爽的瓜娃EventBus到底如何實現觀察者模式的?

申明

本文章首發自本人公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫

22.jpg

前言

之前出過一個設計模式的系列文章,這些文章和其他講設計模式的文章 有些不同

文章沒有拘泥於講解設計模式的原理,更多的是梳理工作中實際用到的一些設計模式,並提取出對應業務模型進行總結,回顧下之前的一些文章:

【一起學設計模式】策略模式實戰一:基於訊息傳送的策略模式實戰

【一起學習設計模式】策略模式實戰二:配合註解 幹掉業務程式碼中冗餘的if else…

【一起學設計模式】訪問者模式實戰:許可權管理樹刪除節點操作

【一起學設計模式】命令模式+模板方法+工廠方法實戰: 如何優雅的更新商品庫存…

【一起學設計模式】狀態模式+裝飾器模式+簡單工廠模式實戰:(一)提交個訂單我到底經歷了什麼鬼?

【一起學設計模式】中介者模式+觀察者模式+備忘錄模式實戰:(二)提交個訂單我到底經歷了什麼鬼?

所以:任何脫離實際業務的設計模式都是耍流氓

image.png

業務梳理

最近專案在對接神策埋點相關需求。
有一個場景是:產品自定義了很多埋點事件,有些事件需要後端進行一定的業務處理,然後進行埋點。

業務其實很簡單,就是前端請求到後端,後端進行一定業務處理組裝後將資料傳送到神策後臺。

說到這裡是不是還有小夥伴沒聽懂??那麼就畫張圖吧:

image.png

這裡只是簡單的舉個栗子,說明下業務場景。

針對這個業務場景,最開始的想法是儘量少的侵入原有業務方法,所以這裡選擇使用觀察者模式。

原有業務場景中加入釋出事件的能力,然後訂閱者自己消費進行埋點資料邏輯。做到儘可能的業務解耦。

觀察者模式

這裡還是要多囉嗦幾句,說下觀察者模式原理:

所謂的觀察者模式也稱為釋出訂閱模式,這裡肯定至少存在兩種角色:釋出者/訂閱者

接著看下UML圖:

image.png

所涉及到的角色如下:      

  • 抽象主題(Subject):提供介面,可以增加和剔除觀察者物件。一般用抽象類或者介面實現。
  • 抽象觀察者(Observer):提供介面,在得到主題的通知時更新自己。一般用抽象類或者介面實現。
  • 具體主題(ConcreteSubject):將有關狀態存入具體觀察者,在具體主題的內部狀態發生變化時,給所有註冊過的觀察者發出通知。一般是具體子類實現。
  • 具體觀察者(ConcreteObserver):儲存與主題的狀態自恰的狀態。具體觀察者角色實現抽象觀察者角色所要求的更新介面,以便使本身的狀態與主題的狀態 像協調。如果需要,具體觀察者角色可以保持一個指向具體主題物件的引用    

在上述類圖中,ConcreteSubject中有一個儲存Observer的列表,這意味著ConcreteSubject並不需要知道引用了哪些ConcreteObserver,只要實現(繼承)了Observer的物件都可以存到該列表中。在需要的時候呼叫Observer的update方法。

話不多說,我們自己動手來模擬一個簡單的觀察者模式:

/**
 * 觀察者模式測試程式碼
 *
 * @author wangmeng
 * @date 2020/4/25 19:38
 */
public class ObserverTest {

    public static void main(String[] args) {
        Subject subject = new Subject();
        Task1 task1 = new Task1();
        subject.addObserver(task1);

        Task2 task2 = new Task2();
        subject.addObserver(task2);

        subject.notifyObserver("xxxx");
    }
}

class Subject {
    // observer集合
    private List<Observer> observerList = Lists.newArrayList();

    // add
    public void addObserver(Observer observer) {
        observerList.add(observer);
    }

    // remove
    public void removeObserver(Observer observer) {
        observerList.remove(observer);
    }

    // 通知觀察者
    public void notifyObserver(Object object) {
        for (Observer item : observerList) {
            item.update(object);
        }
    }
}

interface Observer {
    void update(Object object);
}

class Task1 implements Observer {
    @Override
    public void update(Object object) {
        System.out.println("task1 received: " + object);
    }
}

class Task2 implements Observer {
    @Override
    public void update(Object object) {
        System.out.println("task2 received: " + object);
    }
}

針對於觀察者模式,JDK和Spring也有一些內建實現,具體可以參見:JDK中Observable,Spring中ApplicationListener

這裡就不再贅述了,想深入瞭解的小夥伴可執行谷歌,畢竟我們這次文章的重點還是Guava中觀察者模式的使用實現原理。

業務程式碼示例

這裡使用的是Guava中自帶的EventBus元件,我們繼續用取消訂單業務場景做示例,這裡抽離了部分程式碼,只展示核心的一些程式碼:

1. 事件匯流排服務

/**
 * 事件匯流排服務
 *
 * @author wangmeng
 * @date 2020/4/14
 */
@Service
public class EventBusService {
    /**
    * 訂閱者非同步執行器,如果同步可以使用EventBus
    **/
    @Autowired
    private AsyncEventBus asyncEventBus;
    /**
    * 訂閱者集合,裡面方法通過@Subscribe進行事件訂閱
    **/
    @Autowired
    private EventListener eventListener;

    /**
    * 註冊方法,啟動的時候將所有的訂閱者進行註冊
    **/
    @PostConstruct
    public void register() {
        asyncEventBus.register(eventListener);
    }

    /**
     * 訊息投遞,根據入參自動投遞到對應的方法中去消費。
     */
    public void post(Object object) {
        asyncEventBus.post(object);
    }
}

這裡使用了非同步的實現方式,如果使用同步的方式可以將AsyncEventBus改為EventBus

2. 非同步AsyncEventBus配置:

/**
 * AsyncEventBus 執行緒池配置
 *
 * @author wangmeng
 * @date 2020/04/14
 */
@Configuration
public class EventBusConfiguration {

    /** Set the ThreadPoolExecutor's core pool size. */
    private int corePoolSize = 10;
    /** Set the ThreadPoolExecutor's maximum pool size. */
    private int maxPoolSize = 30;
    /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
    private int queueCapacity = 500;

    @Bean
    public AsyncEventBus asyncEventBus() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("jv-mall-user-sensorsData:");
        executor.initialize();
        return new AsyncEventBus(executor);
    }
}

執行緒池資料大家可以隨意配置,這裡只是參考。

3. 觀察者實現

/**
 * 觀察者程式碼
 *
 * @author wangmeng
 * @date 2020/4/14
 */
@Service
@Slf4j
public class EventListener {

    @Autowired
    private SensorsDataManager sensorsDataManager;

    /**
     * 觀察者處理資料埋點方法
     */
    @Subscribe
    @AllowConcurrentEvents
    public void handleCancelOrderEvent(TrackCancelOrderDTO cancelOrderDTO) {
        Map<String, Object> propertyMap = this.buildBasicProperties(cancelOrderDTO);
        propertyMap.put(SensorsDataConstants.ORDER_ID, registerDTO.getOrderId());
        // 各種屬性賦值,這裡只擷取一點
        propertyMap.put(SensorsDataConstants.PROPERTY_IS_SUCCESS, registerDTO.getIsSuccess());
        propertyMap.put(SensorsDataConstants.PROPERTY_FAIL_REASON, registerDTO.getFailReason());
        sensorsDataManager.send(registerDTO.getUserId(), SensorsEventConstants.EVENT_CANCEL_ORDER, propertyMap);
    }
}

這個EventLister 是我們在上面EventBusService中註冊的類,觀察者方法上面新增@Subscribe即可對釋出者的資料進行訂閱。

@AllowConcurrentEvents註解字面意思是允許事件併發執行,這個原理後面會講。

PS:這裡sensorsDataManager是封裝生成埋點相關的類。

釋出者實現

在業務邏輯中加入埋點資料釋出的方法:

@Autowired
private EventBusService eventBusService;

public void cancelOrder(Long orderId) {

    // 業務邏輯執行

    // 埋點資料
    TrackCancelOrderDTO trackCancelOrderDTO = trackBaseOrderInfoManager.buildTrackBaseOrderDTO(orderInfoDO, context.getOrderParentInfoDO(), TrackCancelOrderDTO.class);
    trackCancelOrderDTO.setCancelReason(orderInfoDO.getCancelReason());
    trackCancelOrderDTO.setCancelTime(orderInfoDO.getCancelTime());
    trackCancelOrderDTO.setPlatformName(SensorsDataConstants.PLATFORM_APP);
    trackCancelOrderDTO.setUserId(orderInfoDO.getUserId().toString());
    eventBusService.post(trackCancelOrderDTO);
}

到了這裡所有的如何使用EventBus的程式碼都已經貼出來了,下面就看看具體的原始碼實現吧

原始碼剖析

事件匯流排訂閱原始碼實現

com.google.common.eventbus.SubscriberRegistry#register:

void register(Object listener) {
    //查詢所有訂閱者,維護了一個key是事件型別,value是定訂閱這個事件型別的訂閱者集合的一個map
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      //獲取事件型別
      Class<?> eventType = entry.getKey();
      //獲取這個事件型別的訂閱者集合
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      //從快取中按事件型別查詢訂閱者集合
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        //從快取中取不到,更新快取
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

事件和訂閱事件的訂閱者集合是在com.google.common.eventbus.SubscriberRegistry這裡維護的:

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
    Maps.newConcurrentMap();

到這裡,訂閱者已經準備好了,準備接受事件了。通過debug 看下subscribers中資料:

image.png

釋出事件原始碼實現

com.google.common.eventbus.EventBus#post

public void post(Object event) {
    //獲取事件的訂閱者集合
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      //轉發事件
      dispatcher.dispatch(event, eventSubscribers);
      //如果不是死亡事件,重新包裝成死亡事件重新發布
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
}

Iterator<Subscriber> getSubscribers(Object event) {
    //獲取事件型別類的超類集合
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
      //獲取事件型別的訂閱者集合
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }

事件轉發器有三種實現:

image.png

第一種是立即轉發,實時性比較高,其他兩種都是佇列實現。

我們使用的是AsyncEventBus,其中指定的事件轉發器是:LegacyAsyncDispatcher,接著看看其中的dispatch()方法的實現:

com.google.common.eventbus.Dispatcher.LegacyAsyncDispatcher

private static final class LegacyAsyncDispatcher extends Dispatcher {

    private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
            Queues.newConcurrentLinkedQueue();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        // 先將所有釋出的事件放入佇列中
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        // 消費佇列中的訊息
        e.subscriber.dispatchEvent(e.event);
      }
    }
}

接著看subscriber.dispatchEvent()方法實現:

final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
}

執行訂閱方法都是非同步實現,我們在上面初始化AsyncEventBus的時候有為其構造執行緒池,就是在這裡使用的。

在看invokeSubscriberMethod()具體程式碼之前,我們先來看看@AllowConcurrentEvents,我們在訂閱方法上有加這個註解,來看看這個註解的作用吧:

image.png

在我們執行register()方法的時候,會為每一個訂閱者構造一個Subscriber物件,如果配置了@AllowConcurrentEvents註解,就會為它配置一個允許併發的Subscriber物件。

class Subscriber {

  /**
   * Creates a {@code Subscriber} for {@code method} on {@code listener}.
   */
  static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
  }

  private static boolean isDeclaredThreadSafe(Method method) {
    // 如果有AllowConcurrentEvents註解,則返回true
    return method.getAnnotation(AllowConcurrentEvents.class) != null;
  }

  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      // 通過反射直接執行訂閱者中方法
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }

  @VisibleForTesting
  static final class SynchronizedSubscriber extends Subscriber {

    private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
      super(bus, target, method);
    }

    @Override
    void invokeSubscriberMethod(Object event) throws InvocationTargetException {
      // SynchronizedSubscriber不支援併發,這裡用synchronized修飾,所有執行都序列化執行
      synchronized (this) {
        super.invokeSubscriberMethod(event);
      }
    }
  }
}

這裡麵包含了invokeSubscriberMethod()方法的實現原理,其實就是通過反射去執行訂閱者中的方法。

還有就是如果沒有添加註解,就會走SynchronizedSubscriberinvokeSubscriberMethod()邏輯,添加了synchronized關鍵字,不支援併發執行。

總結

這裡主要是整理了guava 中實現觀察者模式的使用及原理。

大家如果有類似的業務場景也可以使用到自己專案