Spring Websocket+SockJS+STOMP 實現即時通訊(五)—— UserRegistryMessageHandler與NoOpMessageHandler
阿新 • • 發佈:2018-12-29
目錄
UserRegistryMessageHandler
- 用來處理來自其他應用服務的使用者登錄檔廣播,同時定期地廣播本地使用者登錄檔的內容;
- 使用者登錄檔的聚合資訊,被維護在一個MultiServerUserRegistry成員變數中;
- 無需訂閱MessageChannel,所以沒有實現
SmartLiftCycle
介面;
處理來自其它應用服務的登錄檔廣播
UserRegistryMessageHandler :
public class UserRegistryMessageHandler implements MessageHandler, ApplicationListener<BrokerAvailabilityEvent> {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
MessageConverter converter = this.brokerTemplate.getMessageConverter();
this.userRegistry.addRemoteRegistryDto(message, converter, getRegistryExpirationPeriod());
}
}
定期廣播本地登錄檔
UserRegistryMessageHandler :
public class UserRegistryMessageHandler implements MessageHandler, ApplicationListener< BrokerAvailabilityEvent> {
private final UserRegistryTask schedulerTask = new UserRegistryTask();
@Nullable
private volatile ScheduledFuture<?> scheduledFuture;
private long registryExpirationPeriod = TimeUnit.SECONDS.toMillis(20);
@Override
public void onApplicationEvent(BrokerAvailabilityEvent event) {
if (event.isBrokerAvailable()) {
long delay = getRegistryExpirationPeriod() / 2;
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this.schedulerTask, delay);
}
else {
ScheduledFuture<?> future = this.scheduledFuture;
if (future != null ){
future.cancel(true);
this.scheduledFuture = null;
}
}
}
private class UserRegistryTask implements Runnable {
@Override
public void run() {
try {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
accessor.setLeaveMutable(true);
Object payload = userRegistry.getLocalRegistryDto();
brokerTemplate.convertAndSend(getBroadcastDestination(), payload, accessor.getMessageHeaders());
}
finally {
userRegistry.purgeExpiredRegistries();
}
}
}
}
使用場景
啟用代理中繼,連線外部訊息代理。 通常情況下,為叢集模式,不止一個應用服務程式 —— 即不止一個代理中繼,如下圖所示:
使用條件
配置UserRegistryBroadcast目的地。
在AbstractMessageBrokerConfiguration
類的userRegistryMessageHandler()Bean方法中,我們可以看到,當UserRegistryBroadcast目的地沒被設定的時候,將會new NoOpMessageHandler()。所以必須滿足兩個條件:
- 啟用代理中繼;
- 配置UserRegistryBroadcast廣播地址;
AbstractMessageBrokerConfiguration:
public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {
@Bean
public MessageHandler userRegistryMessageHandler() {
if (getBrokerRegistry().getUserRegistryBroadcast() == null) {
return new NoOpMessageHandler();
}
SimpUserRegistry userRegistry = userRegistry();
Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required");
return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry,
brokerMessagingTemplate(), getBrokerRegistry().getUserRegistryBroadcast(),
messageBrokerTaskScheduler());
}
}
啟用配置
WebSocketMessageBrokerConfigurer實現類:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfigurer implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("topic").setUserRegistryBroadcast("/topic/registry");
}
}
NoOpMessageHandler
“無操作的訊息處理器” —— 在上面AbstractMessageBrokerConfiguration
類的userRegistryMessageHandler()Bean方法中,我們可以看到,當UserRegistryBroadcast目的地沒被設定的時候,將會new NoOpMessageHandler()。
AbstractMessageBrokerConfiguration :
public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {
private static class NoOpMessageHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message) {
}
}
}
說白了,NoOpMessageHandler就是在沒有配置UserRegistryBroadcast廣播地址時,用來代替UserRegistryMessageHandler的 ——
- 不訂閱MessageChannel;
- 不做任何Message處理;