溫馨提示:

本文內容基於個人學習Nacos 2.0.1版本程式碼總結而來,因個人理解差異,不保證完全正確。如有理解錯誤之處歡迎各位拍磚指正,相互學習;轉載請註明出處。

Nacos的服務註冊、服務變更等功能都是通過事件釋出來通知的,搞清楚事件釋出訂閱的機制,有利於理解業務的流程走向。本文將淺顯的分析Nacos中的事件釋出訂閱實現。

事件(Event)

常規事件(Event)

package com.alibaba.nacos.common.notify;

public abstract class Event implements Serializable {

    private static final AtomicLong SEQUENCE = new AtomicLong(0);

    private final long sequence = SEQUENCE.getAndIncrement();

    /**
* Event sequence number, which can be used to handle the sequence of events.
*
* @return sequence num, It's best to make sure it's monotone.
*/
public long sequence() {
return sequence;
}
}

在事件抽象類中定義了一個事件的序列號,它是自增的。用於區分事件執行的前後順序。它是由DefaultPublisher來處理。

慢事件(SlowEvent)

之所以稱之為慢事件,可能因為所有的事件都共享同一個佇列吧。

package com.alibaba.nacos.common.notify;

/**
* This event share one event-queue.
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class SlowEvent extends Event { @Override
public long sequence() {
return 0;
}
}

提示:

SlowEvent可以共享一個事件佇列,也就是一個釋出者可以同時管理多個事件的釋出(區別於DefaultPublisher只能管理一個事件)。

訂閱者(Subscriber)

單事件訂閱者

這裡的單事件訂閱者指的是當前的訂閱者只能訂閱一種型別的事件。

package com.alibaba.nacos.common.notify.listener;

/**
* An abstract subscriber class for subscriber interface.
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Subscriber<T extends Event> { /**
* Event callback.
* 事件處理入口,由對應的事件釋出器呼叫
* @param event {@link Event}
*/
public abstract void onEvent(T event); /**
* Type of this subscriber's subscription.
* 訂閱的事件型別
* @return Class which extends {@link Event}
*/
public abstract Class<? extends Event> subscribeType(); /**
* It is up to the listener to determine whether the callback is asynchronous or synchronous.
* 執行緒執行器,由具體的實現類來決定是非同步還是同步呼叫
* @return {@link Executor}
*/
public Executor executor() {
return null;
} /**
* Whether to ignore expired events.
* 是否忽略過期事件
* @return default value is {@link Boolean#FALSE}
*/
public boolean ignoreExpireEvent() {
return false;
}
}

這是預設的訂閱者物件,預設情況下一個訂閱者只能訂閱一個型別的事件。

多事件訂閱者

package com.alibaba.nacos.common.notify.listener;

/**
* Subscribers to multiple events can be listened to.
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class SmartSubscriber extends Subscriber { /**
* Returns which event type are smartsubscriber interested in.
* 區別於父類,這裡支援多個事件型別
* @return The interestd event types.
*/
public abstract List<Class<? extends Event>> subscribeTypes(); @Override
public final Class<? extends Event> subscribeType() {
// 採用final修飾,禁止使用單一事件屬性
return null;
} @Override
public final boolean ignoreExpireEvent() {
return false;
}
}

提示

SmartSubscriber和Subscriber的區別是一個可以訂閱多個事件,一個只能訂閱一個事件,處理它們的釋出者也不同。

釋出者(Publisher)

釋出者指的是Nacos中的事件釋出者,頂級介面為EventPublisher。

package com.alibaba.nacos.common.notify;

/**
* Event publisher.
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
public interface EventPublisher extends Closeable { /**
* Initializes the event publisher.
* 初始化事件釋出者
* @param type {@link Event >}
* @param bufferSize Message staging queue size
*/
void init(Class<? extends Event> type, int bufferSize); /**
* The number of currently staged events.
* 當前暫存的事件數量
* @return event size
*/
long currentEventSize(); /**
* Add listener.
* 新增訂閱者
* @param subscriber {@link Subscriber}
*/
void addSubscriber(Subscriber subscriber); /**
* Remove listener.
* 移除訂閱者
* @param subscriber {@link Subscriber}
*/
void removeSubscriber(Subscriber subscriber); /**
* publish event.
* 釋出事件
* @param event {@link Event}
* @return publish event is success
*/
boolean publish(Event event); /**
* Notify listener.
* 通知訂閱者
* @param subscriber {@link Subscriber}
* @param event {@link Event}
*/
void notifySubscriber(Subscriber subscriber, Event event); }

釋出者的主要功能就是新增訂閱者、通知訂閱者,目前有兩種型別的釋出者分別是DefaultPublisher和DefaultSharePublisher。

單事件釋出者(DefaultPublisher)

一個釋出者例項只能處理一種型別的事件。

public class DefaultPublisher extends Thread implements EventPublisher {

	// 釋出者是否初始化完畢
private volatile boolean initialized = false;
// 是否關閉了釋出者
private volatile boolean shutdown = false;
// 事件的型別
private Class<? extends Event> eventType;
// 訂閱者列表
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();
// 佇列最大容量
private int queueMaxSize = -1;
// 佇列型別
private BlockingQueue<Event> queue;
// 最後一個事件的序列號
protected volatile Long lastEventSequence = -1L;
// 事件序列號更新物件,用於更新原子屬性lastEventSequence
private static final AtomicReferenceFieldUpdater<DefaultPublisher, Long> UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
}

釋出者的初始化

public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<Event>(bufferSize);
start();
}

在初始化方法中,將其設定為了守護執行緒,意味著它將持續執行(它需要持續監控內部的事件佇列),傳入的type屬性為當前釋出者需要處理的事件型別,設定當前執行緒的名稱以事件型別為區分,它將會以多個執行緒的形式存在,每個執行緒代表一種事件型別的釋出者,後面初始化了佇列的長度。最後呼叫啟動方法完成當前執行緒的啟動。

釋出者執行緒啟動

public synchronized void start() {
if (!initialized) {
// start just called once
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}

直接呼叫了Thread的start方法開啟守護執行緒,並設定初始化狀態為true。根據java執行緒的啟動方式,呼叫start方法之後start方法是會呼叫run方法的。

public void run() {
openEventHandler();
} void openEventHandler() {
try { // This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
for (; ; ) {
// 執行緒終止條件判斷
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
// 執行緒休眠1秒
ThreadUtils.sleep(1000L);
// 等待次數減1
waitTimes--;
} for (; ; ) {
// 執行緒終止條件判斷
if (shutdown) {
break;
}
// 從佇列取出事件
final Event event = queue.take();
// 接收事件
receiveEvent(event);
// 更新事件序列號
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : {}", ex);
}
}

在run方法中呼叫了openEventHandler()方法。那釋出者的實際工作原理就存在於這個方法內部。在首次啟動的時候會等待1分鐘,然後再進行訊息消費。

接收併發布事件

這裡的接收事件指的是接收通知中心發過來的事件,釋出給訂閱者。

void receiveEvent(Event event) {
// 獲取當前事件的序列號,它是自增的
final long currentEventSequence = event.sequence(); // 通知所有訂閱了該事件的訂閱者
// Notification single event listener
for (Subscriber subscriber : subscribers) {
// 判斷訂閱者是否忽略事件過期,判斷當前事件是否被處理過(lastEventSequence初始化的值為-1,而Event的sequence初始化的值為0)
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());
continue;
} // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
notifySubscriber(subscriber, event);
}
} public void notifySubscriber(final Subscriber subscriber, final Event event) { LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber); // 為每個訂閱者建立一個Runnable物件
final Runnable job = () -> subscriber.onEvent(event);
// 使用訂閱者的執行緒執行器
final Executor executor = subscriber.executor();
// 若訂閱者沒有自己的執行器,則直接執行run方法啟動訂閱者消費執行緒
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}

外部呼叫釋出事件

前面的釋出事件是指從佇列內部獲取事件並通知訂閱者,這裡的釋出事件區別在於它是開放給外部呼叫者,接收統一通知中心的事件並放入佇列中的。

public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}

在放入佇列成功的時候直接返回,若放入佇列失敗,則是直接同步傳送事件給訂閱者,不經過佇列。這裡的同步我認為的是從呼叫者到釋出者呼叫訂閱者之間是同步的,若佇列可用,則是呼叫者到入佇列就完成了本次呼叫,不需要等待迴圈通知訂閱者。使用佇列解耦無疑會提升通知中心的工作效率。

總體來說就是一個釋出者內部維護一個BlockingQueue,在實現上使用了ArrayBlockingQueue,它是一個有界阻塞佇列,元素先進先出。並且使用非公平模式提升效能,意味著等待消費的訂閱者執行順序將得不到保障(業務需求沒有這種順序性要求)。同時也維護了一個訂閱者集合(他們都訂閱了同一個事件型別),在死迴圈中不斷從ArrayBlockingQueue中獲取資料來迴圈通知每一個訂閱者,也就是呼叫訂閱者的onEvent()方法。

多事件釋出者(DefaultSharePublisher)

用於釋出SlowEvent事件並通知所有訂閱了該事件的訂閱者。

public class DefaultSharePublisher extends DefaultPublisher {
// 用於儲存事件型別為SlowEvent的訂閱者,一個事件型別對應多個訂閱者
private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<Class<? extends SlowEvent>, Set<Subscriber>>();
// 可重入鎖
private final Lock lock = new ReentrantLock();
}

它繼承了DefaultPublisher,意味著它將擁有其所有的特性。從subMappings屬性來看,這個釋出器是支援多個SlowEvent事件的。DefaultSharePublisher過載了DefaultPublisher的addSubscriber()和removeSubscriber()方法,用於處理多事件型別的情形。

新增訂閱者:

public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {

	// 將事件型別轉換為當前釋出者支援的型別
// Actually, do a classification based on the slowEvent type.
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// 新增到父類的訂閱者列表中,為何要新增呢?因為它需要使用父類的佇列消費邏輯
// For adding to parent class attributes synchronization.
subscribers.add(subscriber);
// 為多個操作加鎖
lock.lock();
try {
// 首先從事件訂閱列表裡面獲取當前事件對應的訂閱者集合
Set<Subscriber> sets = subMappings.get(subSlowEventType);
// 若沒有訂閱者,則新增當前訂閱者
if (sets == null) {
Set<Subscriber> newSet = new ConcurrentHashSet<Subscriber>();
newSet.add(subscriber);
subMappings.put(subSlowEventType, newSet);
return;
}
// 若當前事件訂閱者列表不為空,則插入,因為使用的是Set集合因此可以避免重複資料
sets.add(subscriber);
} finally {
// 別忘了解鎖
lock.unlock();
}
}

提示:

Set newSet = new ConcurrentHashSet(); 它這裡實際上使用的是自己實現的ConcurrentHashSet,它內部使用了ConcurrentHashMap來實現儲存。

在ConcurrentHashSet.add()方法的實現上,它以當前插入的Subscriber物件為key,以一個Boolean值佔位:map.putIfAbsent(o, Boolean.TRUE)。

事件型別和訂閱者的儲存狀態為:

EventType1 -> {Subscriber1, Subscriber2, Subscriber3...}

EventType2 -> {Subscriber1, Subscriber2, Subscriber3...}

EventType3 -> {Subscriber1, Subscriber2, Subscriber3...}

感興趣的可以自己查閱一下原始碼。

移除訂閱者

public void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
// 轉換型別
// Actually, do a classification based on the slowEvent type.
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// 先移除父類中的訂閱者
// For removing to parent class attributes synchronization.
subscribers.remove(subscriber);
// 加鎖
lock.lock();
try {
// 移除指定事件的指定訂閱者
Set<Subscriber> sets = subMappings.get(subSlowEventType); if (sets != null) {
sets.remove(subscriber);
}
} finally {
// 解鎖
lock.unlock();
}
}

接收事件

@Override
public void receiveEvent(Event event) {
// 獲取當前事件的序列號
final long currentEventSequence = event.sequence();
// 獲取事件的型別,轉換為當前釋出器支援的事件
// get subscriber set based on the slow EventType.
final Class<? extends SlowEvent> slowEventType = (Class<? extends SlowEvent>) event.getClass(); // 獲取當前事件的訂閱者列表
// Get for Map, the algorithm is O(1).
Set<Subscriber> subscribers = subMappings.get(slowEventType);
if (null == subscribers) {
LOGGER.debug("[NotifyCenter] No subscribers for slow event {}", slowEventType.getName());
return;
} // 迴圈通知所有訂閱者
// Notification single event subscriber
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());
continue;
}
// 通知邏輯和父類是共用的
// Notify single subscriber for slow event.
notifySubscriber(subscriber, event);
}
}

提示:

DefaultPublisher是一個釋出器只負責釋出一個事件,並通知訂閱了這個事件的所有訂閱者;DefaultSharePublisher則是一個釋出器可以釋出多個事件,並通知訂閱了這個事件的所有訂閱者。

通知中心(NotifyCenter)

NotifyCenter 在Nacos中主要用於註冊釋出者、呼叫釋出者釋出事件、為釋出者註冊訂閱者、為指定的事件增加指定的訂閱者等操作。可以說它完全接管了訂閱者、釋出者和事件他們的組合過程。直接呼叫通知中心的相關方法即可實現事件釋出訂閱者註冊等功能。

初始化資訊

package com.alibaba.nacos.common.notify;

public class NotifyCenter {

    /**
* 單事件釋出者內部的事件佇列初始容量
*/
public static int ringBufferSize = 16384; /**
* 多事件釋出者內部的事件佇列初始容量
*/
public static int shareBufferSize = 1024; /**
* 釋出者的狀態
*/
private static final AtomicBoolean CLOSED = new AtomicBoolean(false); /**
* 構造釋出者的工廠
*/
private static BiFunction<Class<? extends Event>, Integer, EventPublisher> publisherFactory = null; /**
* 通知中心的例項
*/
private static final NotifyCenter INSTANCE = new NotifyCenter(); /**
* 預設的多事件釋出者
*/
private DefaultSharePublisher sharePublisher; /**
* 預設的單事件釋出者型別
* 此處並未直接指定單事件釋出者是誰,只是限定了它的類別
* 因為單事件釋出者一個釋出者只負責一個事件,因此會存在
* 多個釋出者例項,後面按需建立,並快取在publisherMap
*/
private static Class<? extends EventPublisher> clazz = null; /**
* Publisher management container.
* 單事件釋出者儲存容器
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<String, EventPublisher>(16); // 省略部分程式碼
}

可以看到它初始化了一個通知中心的例項,這裡是單例模式。定義了釋出者。訂閱者是儲存在釋出者的內部,而釋出者又儲存在通知者的內部。這樣就組成了一套完整的事件釋出機制。

靜態程式碼塊

static {

	// 初始化DefaultPublisher的queue容量值
// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
// this value needs to be increased appropriately. default value is 16384
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384); // 初始化DefaultSharePublisher的queue容量值
// The size of the public publisher's message staging queue buffer
String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024); // 使用Nacos SPI機制獲取事件釋出者
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class); // 獲取迭代器
Iterator<EventPublisher> iterator = publishers.iterator(); if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
// 若為空,則使用預設的釋出器(單事件釋出者)
clazz = DefaultPublisher.class;
} // 聲明發布者工廠為一個函式,用於建立釋出者例項
publisherFactory = new BiFunction<Class<? extends Event>, Integer, EventPublisher>() { /**
* 為指定型別的事件建立一個單事件釋出者物件
* @param cls 事件型別
* @param buffer 釋出者內部佇列初始容量
* @return
*/
@Override
public EventPublisher apply(Class<? extends Event> cls, Integer buffer) {
try {
// 例項化釋出者
EventPublisher publisher = clazz.newInstance();
// 初始化
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : {}", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
}
}; try {
// 初始化多事件釋出者
// Create and init DefaultSharePublisher instance.
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize); } catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : {}", ex);
} // 增加關閉鉤子,用於關閉Publisher
ThreadUtils.addShutdownHook(new Runnable() {
@Override
public void run() {
shutdown();
}
}); }

在靜態程式碼塊中主要就做了兩件事:

初始化單事件釋出者:可以由使用者擴充套件指定(通過Nacos SPI機制),也可以是Nacos預設的(DefaultPublisher)。

初始化多事件釋出者:DefaultSharePublisher。

註冊訂閱者

註冊訂閱者實際上就是將Subscriber新增到Publisher中。因為事件的釋出是靠釋出者來通知它內部的所有訂閱者。

/**
* Register a Subscriber. If the Publisher concerned by the Subscriber does not exist, then PublihserMap will
* preempt a placeholder Publisher first.
*
* @param consumer subscriber
* @param <T> event type
*/
public static <T> void registerSubscriber(final Subscriber consumer) { // 若想監聽多個事件,實現SmartSubscriber.subscribeTypes()方法,在裡面返回多個事件的列表即可
// If you want to listen to multiple events, you do it separately,
// based on subclass's subscribeTypes method return list, it can register to publisher. // 多事件訂閱者註冊
if (consumer instanceof SmartSubscriber) {
// 獲取事件列表
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
// 判斷它的事件型別來決定採用哪種Publisher,多事件訂閱者由多事件釋出者排程
// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
//註冊到多事件釋出者中
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
// 註冊到單事件釋出者中
// For case, producer: defaultPublisher -> consumer: subscriber.
addSubscriber(consumer, subscribeType);
}
}
return;
} // 單事件的訂閱者註冊
final Class<? extends Event> subscribeType = consumer.subscribeType();
// 防止誤使用,萬一有人在使用單事件訂閱者Subscriber的時候傳入了SlowEvent則可以在此避免
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
// 新增完畢返回
return;
} // 註冊到單事件釋出者中
addSubscriber(consumer, subscribeType);
} /**
* 單事件釋出者新增訂閱者
* Add a subscriber to publisher.
* @param consumer subscriber instance.
* @param subscribeType subscribeType.
*/
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
// 獲取類的規範名稱,實際上就是包名加類名,作為topic
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method. /**
* 生成指定型別的釋出者,並將其放入publisherMap中
* 使用topic為key從publisherMap獲取資料,若為空則使用publisherFactory函式並傳遞subscribeType和ringBufferSize來例項
* 化一個clazz型別的釋出者物件,使用topic為key放入publisherMap中,實際上就是為每一個型別的事件建立一個釋出者。具體
* 可檢視publisherFactory的邏輯。
*/
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
}
// 獲取生成的釋出者物件,將訂閱者新增進去
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
publisher.addSubscriber(consumer);
}

提示:

單事件釋出者容器內的儲存狀態為: 事件型別的完整限定名 -> DefaultPublisher.

例如:

com.alibaba.nacos.core.cluster.MembersChangeEvent -> {DefaultPublisher@6839} "Thread[nacos.publisher-com.alibaba.nacos.core.cluster.MembersChangeEvent,5,main]"

註冊釋出者

實際上並沒有直接的註冊釋出者這個概念,通過前面的章節你肯定知道釋出者就兩種型別:單事件釋出者、多事件釋出者。單事件釋出者直接就一個例項,多事件釋出者會根據事件型別建立不同的例項,儲存於publisherMap中。它已經在通知中心了,因此並不需要有刻意的註冊動作。需要使用的時候

直接取即可。

註冊事件

註冊事件實際上就是將具體的事件和具體的釋出者進行關聯,釋出者有2種類型,那麼事件也一定是兩種型別了(事件的型別這裡說的是分類,服務於單事件釋出者的事件和服務於多事件釋出者的事件)。

/**
* Register publisher.
*
* @param eventType class Instances type of the event type.
* @param queueMaxSize the publisher's queue max size.
*/
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) { // 慢事件由多事件釋出者處理
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher;
}
// 若不是慢事件,因為它可以存在多個不同的型別,因此需要判斷對應的釋出者是否存在
final String topic = ClassUtils.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
// 當前傳入的事件型別對應的釋出者,有則忽略無則新建
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize);
}
return INSTANCE.publisherMap.get(topic);
}

這裡並未有註冊動作,若是SlowEvent則直接返回了,為何呢?這裡再理一下關係,事件的實際用途是由訂閱者來決定的,由訂閱者來執行對應事件觸發後的操作,事件和釋出者並沒有直接關係。而多事件釋出者呢,它是一個釋出者來處理所有的事件和訂閱者(事件:訂閱者,一對多的關係),這個事件都沒人訂閱何談釋出呢?因此單純的註冊事件並沒有實際意義。反觀一次只能處理一個事件的單事件處理器(DefaultPublisher)則需要一個事件對應一個釋出者,即便這個事件沒有人訂閱,也可以快取起來。

登出訂閱者

登出的操作基本上就是註冊的反向操作。

public static <T> void deregisterSubscriber(final Subscriber consumer) {
// 若是多事件訂閱者
if (consumer instanceof SmartSubscriber) {
// 獲取事件列表
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
// 若是慢事件
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
// 從多事件釋出者中移除
INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
} else {
// 從單事件釋出者中移除
removeSubscriber(consumer, subscribeType);
}
}
return;
} // 若是單事件訂閱者
final Class<? extends Event> subscribeType = consumer.subscribeType();
// 判斷是否是慢事件
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
return;
} // 呼叫移除方法
if (removeSubscriber(consumer, subscribeType)) {
return;
}
throw new NoSuchElementException("The subscriber has no event publisher");
} private static boolean removeSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
// 獲取topic
final String topic = ClassUtils.getCanonicalName(subscribeType);
// 根據topic獲取對應的釋出者
EventPublisher eventPublisher = INSTANCE.publisherMap.get(topic);
if (eventPublisher != null) {
// 從釋出者中移除訂閱者
eventPublisher.removeSubscriber(consumer);
return true;
}
return false;
}

登出釋出者

登出釋出者主要針對於單事件釋出者來說的,因為多事件釋出者只有一個例項,它需要處理多個事件型別,因此釋出者不能移除。而單事件釋出者一個釋出者對應一個事件型別,因此某個型別的事件不需要處理的時候則需要將對應的釋出者移除。

public static void deregisterPublisher(final Class<? extends Event> eventType) {
// 獲取topic
final String topic = ClassUtils.getCanonicalName(eventType);
// 根據topic移除對應的釋出者
EventPublisher publisher = INSTANCE.publisherMap.remove(topic);
try {
// 呼叫關閉方法
publisher.shutdown();
} catch (Throwable ex) {
LOGGER.error("There was an exception when publisher shutdown : {}", ex);
}
} public void shutdown() {
// 標記關閉
this.shutdown = true;
// 清空快取
this.queue.clear();
}

釋出事件

釋出事件的本質就是不同型別的釋出者來呼叫內部維護的訂閱者的onEvent()方法。

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {

	// 慢事件處理
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
} // 常規事件處理
final String topic = ClassUtils.getCanonicalName(eventType); EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}

總結

在Nacos中的事件釋出分為兩條線:單一事件處理、多事件處理。圍繞這兩條線又有負責單一型別事件的訂閱者、釋出者,也有負責多事件的訂閱者、釋出者。區分開來兩種型別便很容易理解。

上圖展示了在通知中心中不同型別的事件、訂閱者、釋出者的儲存狀態。

多事件釋出者:

  • 釋出者和事件的關係是一對多
  • 事件和訂閱者的關係是一對多
  • 釋出者和訂閱者的關係是一對多
  • 事件型別為SlowEvent, 訂閱者型別是SmartSubscriber

單事件釋出者

  • 釋出者和事件的關係是一對一
  • 事件和訂閱者的關係是一對多
  • 釋出者和訂閱者的關係是一對多
  • 事件型別為Event,訂閱者型別是Subscriber