1. 程式人生 > >Spring Websocket+SockJS+STOMP 實現即時通訊(四)—— MessageHandler

Spring Websocket+SockJS+STOMP 實現即時通訊(四)—— MessageHandler

目錄


MessageHandler的作用

上一節中我們提到過,ExecutorSubscribableChannel類持有一個例項handlers —— MessageHandler集合,是MessageChannel的訂閱者,用來作為處理Messages的約定。

MessageHandler是一個介面,它的實現類都必須實現方法 —— handleMessage(Message<?> message)

用來處理訊息Message —— 從外部接收的從應用程式內部傳遞的

/**
 * Contract for handling a {@link Message}.
 *
 * @author Mark Fisher
 * @author Iwein Fuld
 * @since 4.0
 */
public interface MessageHandler {
	/**
	 * Handle the given message.
	 * @param message the message to be handled
	 */
	void handleMessage(Message<?>
message) throws MessagingException; }

MessageHandler實現類

MessageHandler 的具體實現類共有 兩類九種,分別用來處理不同型別的Message

  • 未實現org.springframework.context.SmartLifecycle介面
    • UserRegistryMessageHandler
    • NoOpMessageHandler
  • 實現org.springframework.context.SmartLifecycle介面
    • SubProtocolWebSocketHandler
    • SimpAnnotationMethodMessageHandler
    • WebSocketAnnotationMethodMessageHandler
    • NoOpBrokerMessageHandler
    • SimpleBrokerMessageHandler
    • StompBrokerRelayMessageHandler
    • UserDestinationMessageHandler

兩類MessageHandler有什麼區別?

  • 先來看下org.springframework.context.SmartLifecycle介面 —— 智慧生命週期,是org.springframework.context.Lifecycleorg.springframework.context.Phased的擴充套件。說白了:只要你實現了SmartLifeCycle介面,你便可以在任何時候判斷Bean所處的生命週期,並可以通過實現指定方法在指定的生命週期去搞事情
SmartLifecycle :
 public interface SmartLifecycle extends Lifecycle, Phased {
   boolean isAutoStartup();
   void stop(Runnable callback);
}
  • 就拿AbstractBrokerMessageHandler來說,它是NoOpBrokerMessageHandler、SimpleBrokerMessageHandler和StompBrokerRelayMessageHandler的父類介面,實現了SmartLifecycle,在start()stop()方法中分別實現了對相關MessageChannel訂閱取消訂閱
  • 到這裡我們可以明白,不實現SmartLifecycle介面的那類MessageHandler就是不需要繫結MessageChannel,相反另一類則是用來在指定生命週期訂閱或解訂閱MessageChannel。
AbstractBrokerMessageHandler:
public abstract class AbstractBrokerMessageHandler
   	implements MessageHandler, ApplicationEventPublisherAware, SmartLifecycle {
   @Override
   public boolean isAutoStartup() {
   	return this.autoStartup;
   }
   @Override
   public int getPhase() {
   	return Integer.MAX_VALUE;
   }
   @Override
   public void start() {
   	synchronized (this.lifecycleMonitor) {
   		logger.info("Starting...");
   		this.clientInboundChannel.subscribe(this);
   		this.brokerChannel.subscribe(this);
   		if (this.clientInboundChannel instanceof InterceptableChannel) {
   			((InterceptableChannel) this.clientInboundChannel).addInterceptor(0, this.unsentDisconnectInterceptor);
   		}
   		startInternal();
   		this.running = true;
   		logger.info("Started.");
   	}
   }
   @Override
   public void stop() {
   	synchronized (this.lifecycleMonitor) {
   		logger.info("Stopping...");
   		stopInternal();
   		this.clientInboundChannel.unsubscribe(this);
   		this.brokerChannel.unsubscribe(this);
   		if (this.clientInboundChannel instanceof InterceptableChannel) {
   			((InterceptableChannel) this.clientInboundChannel).removeInterceptor(this.unsentDisconnectInterceptor);
   		}
   		this.running = false;
   		logger.info("Stopped.");
   	}
   }
   @Override
   public final void stop(Runnable callback) {
   	synchronized (this.lifecycleMonitor) {
   		stop();
   		callback.run();
   	}
   }
   @Override
   public final boolean isRunning() {
   	return this.running;
   }
}