1. 程式人生 > >Spring Websocket+SockJS+STOMP 實現即時通訊(六)—— SubProtocolWebSocketHandler

Spring Websocket+SockJS+STOMP 實現即時通訊(六)—— SubProtocolWebSocketHandler

目錄


WebsocketHandler

  • 一個用來處理Websocket Messages和生命週期事件的處理程式。

在Spring中,如果我們僅僅使用 Websocket 而非 STOMP,正如官方文件所說:

“建立WebSocket伺服器就像實現WebSocketHandler一樣簡單,或者更可能繼承TextWebSocketHandler或BinaryWebSocketHandler”

只需要去實現並配置自己的 WebsocketHandler,如下所示:

TextWebSocketHandler實現類:
public class MyHandler extends TextWebSocketHandler {
    @Override
    public void
handleTextMessage(WebSocketSession session, TextMessage message) { // ... } }

SubProtocolWebSocketHandler 也是 WebSocketHandler 的一種實現,就像它的名字那樣,是一種專門處理子協議(內容交換協議)的 WebSocketHandler 實現。

通過這個類,SpringSTOMP over Websocket完美的封裝起來,以至於我們完全不用去理會如何處理Websocket子協議?,僅僅要做的就是啟用訊息代理,然後實現自己的業務邏輯。


SubProtocolWebSocketHandler

  • 首先它是WebSocketHandler的一種實現,它將傳入的WebSocketMessage(已經被成功升級成Websocket的訊息)委託給SubProtocolHandler以及clientInboundChannelSubProtocolHandler可以將Upgrade後的訊息從WebSocket客戶端傳送到應用程式;
  • 另外它也是MessageHandler的一種實現,它找到與Message關聯的WebSocketSession,並將該session與Message一起傳遞給SubProtocolHandler,用來將Message從應用程式傳送回客戶端。

持有四個重要成員變數:

  • Map<String, SubProtocolHandler> protocolHandlerLookup =
    new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
  • Map<String, WebSocketSessionHolder> sessions = new ConcurrentHashMap<>();
  • MessageChannel clientInboundChannel;
  • SubscribableChannel clientOutboundChannel;

四個重要成員變數

protocolHandlerLookup

“協議處理程式查詢” —— 顧名思義,這個變數可以持有不止一個SubProtocolHandler例項,通過它我們能夠查詢到相應子協議的處理程式。該變數的實際型別是new TreeMap(),為了保證元素順序。key —— 為子協議的名稱,value —— 為 SubProtocolHandler 例項。

下面這段程式碼,可以看到如何向protocolHandlerLookup新增SubProtocolHandler例項。

SubProtocolWebSocketHandler:

public class SubProtocolWebSocketHandler
		implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
	/**
	 * Register a sub-protocol handler.
	 */
	public void addProtocolHandler(SubProtocolHandler handler) {
		List<String> protocols = handler.getSupportedProtocols();
		if (CollectionUtils.isEmpty(protocols)) {
			if (logger.isErrorEnabled()) {
				logger.error("No sub-protocols for " + handler);
			}
			return;
		}
		for (String protocol : protocols) {
			SubProtocolHandler replaced = this.protocolHandlerLookup.put(protocol, handler);
			if (replaced != null && replaced != handler) {
				throw new IllegalStateException("Cannot map " + handler +
						" to protocol '" + protocol + "': already mapped to " + replaced + ".");
			}
		}
		this.protocolHandlers.add(handler);
	}
}
子協議
  • 處理WebSocket訊息的約定,作為更高級別協議的一部分,在WebSocket RFC規範中稱為“子協議”
SubProtocolHandler
  • 實際上SubProtocolWebSocketHandler所做的工作,最終都是通過SubProtocolHandler來完成,可以說後者就是前者的支撐例項。
  • 處理來自客戶端的WebSocketMessages以及要傳送給客戶端的Message。
  • 可以在SubProtocolWebSocketHandler上配置此介面的實現,SubProtocolWebSocketHandler根據客戶端傳遞的Sec-WebSocket-Protocol請求標頭,來選擇子協議處理程式以委派訊息。

通過名字和註釋我們很容易理解各個方法的作用,不過還是要強調兩個:

1. handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel)
將WebsocketMessage從客戶端傳送到應用程式。不過這裡需要強調兩點:

  1. 是WebsocketMessage。也就是說 HandeShake 握手請求成功,請求訊息升級成WebsocketMessage後,該處理程式才會被呼叫,同樣其他方法中所強調的連線,此時即是Websocket連線。
  2. 傳送到應用程式。所以這個方法的註釋和引數名,其實是錯的。這裡引數MessageChannel,應該是inboundChannel —— a inbound channel to send messages to application,是一個傳送訊息到應用程式的入棧通道。

2. handleMessageToClient(WebSocketSession session, Message<?> message)
將訊息從應用程式傳送到客戶端。

SubProtocolHandler :

public interface SubProtocolHandler {
	/**
	 * Return the list of sub-protocols supported by this handler (never {@code null}).
	 */
	List<String> getSupportedProtocols();
	/**
	 * Handle the given {@link WebSocketMessage} received from a client.
	 * @param session the client session
	 * @param message the client message
	 * @param outputChannel an output channel to send messages to
	 */
	void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel)
			throws Exception;
	/**
	 * Handle the given {@link Message} to the client associated with the given WebSocket session.
	 * @param session the client session
	 * @param message the client message
	 */
	void handleMessageToClient(WebSocketSession session, Message<?> message) throws Exception;
	/**
	 * Resolve the session id from the given message or return {@code null}.
	 * @param message the message to resolve the session id from
	 */
	@Nullable
	String resolveSessionId(Message<?> message);
	/**
	 * Invoked after a {@link WebSocketSession} has started.
	 * @param session the client session
	 * @param outputChannel a channel
	 */
	void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) throws Exception;

	/**
	 * Invoked after a {@link WebSocketSession} has ended.
	 * @param session the client session
	 * @param closeStatus the reason why the session was closed
	 * @param outputChannel a channel
	 */
	void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel)
			throws Exception;
}

在Spring中(截止當前Spring Framework 5.1.2StompSubProtocolHandler—— 處理STOMP子協議目前是SubProtocolHandler 的唯一實現,但是該介面為以後子協議的擴充套件提供了鉤子(簡直驚歎啊,這是程式設計者值得學習的地方)。


sessions

“會話集合” —— 維持著與該應用程式建立Websocket連線的所有WebsoketSession。這裡實際型別是new ConcurrentHashMap<>(),滿足應用程式的併發量,key —— WebsocketSession id,value —— WebSocketSessionHolder例項。

afterConnectionEstablished(WebSocketSession session):
在Websocket 連線建立之後,將WebSocketSession 存入sessions。我們可以通過複寫該方法來實現session儲存的自定義。
     當然Spring早叫為我們想到了,所以它提供了WebSocketHandlerDecoratorWebSocketHandlerDecoratorFactory,作為一個鉤子,允許我們不改變程式碼的情況下,為SubProtocolWebSocketHandler增加功能,比如說“實現分散式WebsocketSession”,以此來滿足我們構建分散式應用程式的需求。

SubProtocolWebSocketHandler:

public class SubProtocolWebSocketHandler
		implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
	@Override
	public void afterConnectionEstablished(WebSocketSession session) throws Exception {
		// WebSocketHandlerDecorator could close the session
		if (!session.isOpen()) {
			return;
		}

		this.stats.incrementSessionCount(session);
		session = decorateSession(session);
		this.sessions.put(session.getId(), new WebSocketSessionHolder(session));
		findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel);
	}
}

clientInboundChannel

“入站通道” —— SubProtocolWebSocketHandler實現了WebSocketHandler的handleMessage(WebSocketSession session, WebSocketMessage<?> message) 方法,處理升級成功WebsocketMessage。

先通過sessionIdsessions中取出對應的WebSocketSessionHolder,再通過protocol子協議型別從protocolHandlerLookup取出對應的SubProtocolHandler ,再由SubProtocolHandler通過StompDecoder——STOMP訊息解碼器,將ByteBuffer解碼成STOMP格式的訊息內容,然後對該內容增加更多的資訊,最後由clientInboundChannel傳送到應用程式。

SubProtocolWebSocketHandler:

public class SubProtocolWebSocketHandler
		implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
	/**
	 * Handle an inbound message from a WebSocket client.
	 */
	@Override
	public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
		WebSocketSessionHolder holder = this.sessions.get(session.getId());
		if (holder != null) {
			session = holder.getSession();
		}
		SubProtocolHandler protocolHandler = findProtocolHandler(session);
		protocolHandler.handleMessageFromClient(session, message, this.clientInboundChannel);
		if (holder != null) {
			holder.setHasHandledMessages();
		}
		checkSessions();
	}
}

clientOutboundChannel

“出站通道” —— SubProtocolWebSocketHandler的bean在生命週期開始時,訂閱了它的成員變數clientOutboundChannel,並且SubProtocolWebSocketHandlerMessageHandler的實現,以至於通過clientOutboundChannel傳送的資訊,會被該處理程式接收,通過handleMessage(Message<?> message)方法進行處理。

先通過sessionId獲得對應的WebSocketSession,再由對應的SubProtocolHandler呼叫handleMessageToClient(WebSocketSession session, Message<?> message)Message進行附加內容,之後通過StompEcoder——STOMP訊息編碼器,將STOMP格式的訊息內容編碼成位元組陣列,最後由WebSocketSession傳送到對應客戶端。

SubProtocolWebSocketHandler:

public class SubProtocolWebSocketHandler
		implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
    @Override
	public final void start() {
		Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers");

		synchronized (this.lifecycleMonitor) {
			this.clientOutboundChannel.subscribe(this);
			this.running = true;
		}
	}
    /**
	 * Handle an outbound Spring Message to a WebSocket client.
	 */
	@Override
	public void handleMessage(Message<?> message) throws MessagingException {
		String sessionId = resolveSessionId(message);
		if (sessionId == null) {
			if (logger.isErrorEnabled()) {
				logger.error("Could not find session id in " + message);
			}
			return;
		}

		WebSocketSessionHolder holder = this.sessions.get(sessionId);
		if (holder == null) {
			if (logger.isDebugEnabled()) {
				// The broker may not have removed the session yet
				logger.debug("No session for " + message);
			}
			return;
		}

		WebSocketSession session = holder.getSession();
		try {
			findProtocolHandler(session).handleMessageToClient(session, message);
		}
		catch (SessionLimitExceededException ex) {
			try {
				if (logger.isDebugEnabled()) {
					logger.debug("Terminating '" + session + "'", ex);
				}
				this.stats.incrementLimitExceededCount();
				clearSession(session, ex.getStatus()); // clear first, session may be unresponsive
				session.close(ex.getStatus());
			}
			catch (Exception secondException) {
				logger.debug("Failure while closing session " + sessionId + ".", secondException);
			}
		}
		catch (Exception ex) {
			// Could be part of normal workflow (e.g. browser tab closed)
			if (logger.isDebugEnabled()) {
				logger.debug("Failed to send message to client in " + session + ": " + message, ex);
			}
		}
	}
}