Spring WebSocket + SockJS + STOMP for Real-Time Messaging (Part 5): UserRegistryMessageHandler and NoOpMessageHandler


UserRegistryMessageHandler

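  • Handles user-registry broadcasts from other application servers, and periodically broadcasts the contents of the local user registry;
  • The aggregated user-registry information is kept in a MultiServerUserRegistry member field;
  • It does not need to subscribe to a MessageChannel, so it does not implement the SmartLifecycle interface;
Handling registry broadcasts from other application servers
UserRegistryMessageHandler: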
public class UserRegistryMessageHandler implements MessageHandler, ApplicationListener<BrokerAvailabilityEvent> {
	@Override
	public void handleMessage(Message<?> message) throws MessagingException {
		MessageConverter converter = this.brokerTemplate.getMessageConverter();
		this.userRegistry.addRemoteRegistryDto(message, converter, getRegistryExpirationPeriod());
	}
}
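
To make the idea of storing a remote registry together with an expiration period more concrete, here is a minimal sketch in plain Java. It is not Spring's MultiServerUserRegistry: the class ExpiringRegistrySketch, its methods, and the server ids are hypothetical names chosen for illustration, and the real implementation converts the message payload and tracks more detail than a set of user names.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified model of a multi-server user registry (not Spring's class). */
public class ExpiringRegistrySketch {

    /** Snapshot received from one remote server, plus the time after which it is stale. */
    private static final class Snapshot {
        final Set<String> users;
        final long expiresAt;

        Snapshot(Set<String> users, long expiresAt) {
            this.users = users;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Snapshot> remote = new ConcurrentHashMap<>();

    /** Store (or refresh) the snapshot broadcast by a remote server. */
    public void addRemoteSnapshot(String serverId, Set<String> users, long now, long expirationPeriod) {
        remote.put(serverId, new Snapshot(Set.copyOf(users), now + expirationPeriod));
    }

    /** Drop snapshots that were not refreshed within their expiration period. */
    public void purgeExpired(long now) {
        remote.values().removeIf(s -> s.expiresAt < now);
    }

    /** Users known from all remote snapshots that are still stored. */
    public Set<String> remoteUsers() {
        Set<String> all = new TreeSet<>();
        remote.values().forEach(s -> all.addAll(s.users));
        return all;
    }

    public static void main(String[] args) {
        ExpiringRegistrySketch registry = new ExpiringRegistrySketch();
        long period = 20_000; // 20 s, the default expiration period in the Spring source
        registry.addRemoteSnapshot("server-a", Set.of("alice"), 0, period);
        registry.addRemoteSnapshot("server-b", Set.of("bob"), 15_000, period);
        registry.purgeExpired(25_000); // server-a's snapshot expired at 20 000
        System.out.println(registry.remoteUsers()); // prints [bob]
    }
}
```

A server that stops broadcasting (for example because it went down) simply ages out of the aggregate view once its last snapshot expires, which is why each broadcast carries an expiration period rather than relying on an explicit "server left" message.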
Periodically broadcasting the local registry
UserRegistryMessageHandler:

public class UserRegistryMessageHandler implements MessageHandler, ApplicationListener<BrokerAvailabilityEvent> {

	private final UserRegistryTask schedulerTask = new UserRegistryTask();

	@Nullable
	private volatile ScheduledFuture<?> scheduledFuture;

	private long registryExpirationPeriod = TimeUnit.SECONDS.toMillis(20);

	@Override
	public void onApplicationEvent(BrokerAvailabilityEvent event) {
		if (event.isBrokerAvailable()) {
			long delay = getRegistryExpirationPeriod() / 2;
			this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this.schedulerTask, delay);
		}
		else {
			ScheduledFuture<?> future = this.scheduledFuture;
			if (future != null) {
				future.cancel(true);
				this.scheduledFuture = null;
			}
		}
	}

	private class UserRegistryTask implements Runnable {
		@Override
		public void run() {
			try {
				SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
				accessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
				accessor.setLeaveMutable(true);
				Object payload = userRegistry.getLocalRegistryDto();
				brokerTemplate.convertAndSend(getBroadcastDestination(), payload, accessor.getMessageHeaders());
			}
			finally {
				userRegistry.purgeExpiredRegistries();
			}
		}
	}
}
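
The start/stop behaviour in onApplicationEvent can be tried outside Spring with java.util.concurrent alone. The sketch below is an assumption-laden stand-in, not the Spring code: PeriodicBroadcastSketch and its methods are hypothetical names, the "broadcast" is just a counter increment, and it adds a guard against scheduling twice that the excerpt above does not have.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the start/stop logic, built on java.util.concurrent only. */
public class PeriodicBroadcastSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger broadcasts = new AtomicInteger();
    private final long expirationPeriodMillis;
    private volatile ScheduledFuture<?> future;

    public PeriodicBroadcastSketch(long expirationPeriodMillis) {
        this.expirationPeriodMillis = expirationPeriodMillis;
    }

    /** Start broadcasting when the broker becomes available, stop when it goes away. */
    public synchronized void onBrokerAvailability(boolean available) {
        if (available) {
            if (future == null) { // extra guard, not present in the Spring excerpt
                // Broadcast twice per expiration period so peers refresh before the snapshot goes stale.
                long delay = expirationPeriodMillis / 2;
                future = scheduler.scheduleWithFixedDelay(
                        broadcasts::incrementAndGet, delay, delay, TimeUnit.MILLISECONDS);
            }
        }
        else if (future != null) {
            future.cancel(true);
            future = null;
        }
    }

    public boolean isBroadcasting() {
        return future != null;
    }

    public int broadcastCount() {
        return broadcasts.get();
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        PeriodicBroadcastSketch sketch = new PeriodicBroadcastSketch(TimeUnit.SECONDS.toMillis(20));
        sketch.onBrokerAvailability(true);
        System.out.println(sketch.isBroadcasting()); // prints true
        sketch.onBrokerAvailability(false);
        System.out.println(sketch.isBroadcasting()); // prints false
        sketch.shutdown();
    }
}
```

Using half of the expiration period as the delay means a healthy server normally refreshes its snapshot on its peers about twice before the previous one would expire, so a single delayed broadcast does not make its users disappear from the other servers' view.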

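Usage scenario

Enable the broker relay and connect to an external message broker. This is usually a clustered deployment with more than one application server process, and therefore more than one broker relay, as shown in the figure below:
[Figure]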

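Prerequisites

Configure the UserRegistryBroadcast destination
In the userRegistryMessageHandler() @Bean method of AbstractMessageBrokerConfiguration, we can see that when the UserRegistryBroadcast destination has not been set, a new NoOpMessageHandler() is returned instead. Two conditions must therefore both be met:

  1. Enable the broker relay;
  2. Configure the UserRegistryBroadcast destination;
AbstractMessageBrokerConfiguration: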
public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {
    @Bean
	public MessageHandler userRegistryMessageHandler() {
		if (getBrokerRegistry().getUserRegistryBroadcast() == null) {
			return new NoOpMessageHandler();
		}
		SimpUserRegistry userRegistry = userRegistry();
		Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required");
		return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry,
				brokerMessagingTemplate(), getBrokerRegistry().getUserRegistryBroadcast(),
				messageBrokerTaskScheduler());
	}
}

Enabling the configuration

WebSocketMessageBrokerConfigurer implementation:

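import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfigurer implements WebSocketMessageBrokerConfigurer {
	@Override
	public void configureMessageBroker(MessageBrokerRegistry config) {
		// The relay prefix needs the leading slash so that "/topic/registry" is routed to the relay.
		config.enableStompBrokerRelay("/topic").setUserRegistryBroadcast("/topic/registry");
	}
}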

NoOpMessageHandler

The "no-operation message handler". As seen above in the userRegistryMessageHandler() @Bean method of AbstractMessageBrokerConfiguration, when the UserRegistryBroadcast destination has not been set, a new NoOpMessageHandler() is returned.

AbstractMessageBrokerConfiguration:

public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {
	private static class NoOpMessageHandler implements MessageHandler {
		@Override
		public void handleMessage(Message<?> message) {
		}
	}
}

Put simply, NoOpMessageHandler is the stand-in for UserRegistryMessageHandler when no UserRegistryBroadcast destination is configured:

  1. It does not subscribe to a MessageChannel;
  2. It does nothing with any Message;
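
The choice between the two handlers is an instance of the null-object pattern: callers always get a usable handler, and the "disabled" case is an implementation that does nothing. The following plain-Java sketch mirrors that decision. All names in it (HandlerSelectionSketch, Handler, RegistryHandler, select) are hypothetical and only illustrate the pattern; they are not Spring types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the null-object choice; none of these types are Spring's. */
public class HandlerSelectionSketch {

    public interface Handler {
        void handle(String message);
    }

    /** Stand-in used when no broadcast destination is configured: accepts and ignores everything. */
    public static final class NoOpHandler implements Handler {
        @Override
        public void handle(String message) {
            // intentionally empty
        }
    }

    /** Stand-in for the real handler: here it only records what it received. */
    public static final class RegistryHandler implements Handler {
        private final String destination;
        private final List<String> received = new ArrayList<>();

        RegistryHandler(String destination) {
            this.destination = destination;
        }

        @Override
        public void handle(String message) {
            received.add(message);
        }

        public String destination() {
            return destination;
        }

        public List<String> received() {
            return received;
        }
    }

    /** Same shape as the bean method: a null destination yields the no-op handler. */
    public static Handler select(String broadcastDestination) {
        if (broadcastDestination == null) {
            return new NoOpHandler();
        }
        return new RegistryHandler(broadcastDestination);
    }

    public static void main(String[] args) {
        Handler off = select(null);
        Handler on = select("/topic/registry");
        off.handle("snapshot-1");
        on.handle("snapshot-1");
        System.out.println(off.getClass().getSimpleName());    // prints NoOpHandler
        System.out.println(((RegistryHandler) on).received()); // prints [snapshot-1]
    }
}
```

Returning a do-nothing handler instead of null keeps the rest of the wiring free of null checks, at the cost of silently ignoring registry messages when the destination is missing.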