Spring Websocket+SockJS+STOMP 實現即時通訊(六)—— SubProtocolWebSocketHandler
目錄
WebsocketHandler
- 一個用來處理Websocket Messages和生命週期事件的處理程式。
在Spring中,如果我們僅僅使用 Websocket 而非 STOMP,正如官方文件所說:
“建立WebSocket伺服器就像實現WebSocketHandler一樣簡單,或者更可能繼承TextWebSocketHandler或BinaryWebSocketHandler”
只需要去實現並配置自己的 WebsocketHandler,如下所示:
TextWebSocketHandler實現類:
public class MyHandler extends TextWebSocketHandler {
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
// ...
}
}
SubProtocolWebSocketHandler 也是 WebSocketHandler 的一種實現,就像它的名字那樣,是一種專門處理子協議(內容交換協議)的 WebSocketHandler 實現。
通過這個類,Spring 將STOMP over Websocket完美的封裝起來,以至於我們完全不用去理會如何處理Websocket子協議?
,僅僅要做的就是啟用訊息代理,然後實現自己的業務邏輯。
SubProtocolWebSocketHandler
- 首先它是WebSocketHandler的一種實現,它將傳入的WebSocketMessage(已經被成功升級成Websocket的訊息)委託給SubProtocolHandler以及clientInboundChannel,SubProtocolHandler可以將Upgrade後的訊息從WebSocket客戶端傳送到應用程式;
- 另外它也是MessageHandler的一種實現,它找到與Message關聯的WebSocketSession,並將該session與Message一起傳遞給SubProtocolHandler,用來將Message從應用程式傳送回客戶端。
持有四個重要成員變數:
- Map<String, SubProtocolHandler> protocolHandlerLookup =
new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - Map<String, WebSocketSessionHolder> sessions = new ConcurrentHashMap<>();
- MessageChannel clientInboundChannel;
- SubscribableChannel clientOutboundChannel;
四個重要成員變數
protocolHandlerLookup
“協議處理程式查詢” —— 顧名思義,這個變數可以持有不止一個SubProtocolHandler例項,通過它我們能夠查詢到相應子協議的處理程式。該變數的實際型別是new TreeMap(),為了保證元素順序。key —— 為子協議的名稱,value —— 為 SubProtocolHandler 例項。
下面這段程式碼,可以看到如何向protocolHandlerLookup新增SubProtocolHandler例項。
SubProtocolWebSocketHandler:
public class SubProtocolWebSocketHandler
implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
/**
* Register a sub-protocol handler.
*/
public void addProtocolHandler(SubProtocolHandler handler) {
List<String> protocols = handler.getSupportedProtocols();
if (CollectionUtils.isEmpty(protocols)) {
if (logger.isErrorEnabled()) {
logger.error("No sub-protocols for " + handler);
}
return;
}
for (String protocol : protocols) {
SubProtocolHandler replaced = this.protocolHandlerLookup.put(protocol, handler);
if (replaced != null && replaced != handler) {
throw new IllegalStateException("Cannot map " + handler +
" to protocol '" + protocol + "': already mapped to " + replaced + ".");
}
}
this.protocolHandlers.add(handler);
}
}
子協議
- 處理WebSocket訊息的約定,作為更高級別協議的一部分,在WebSocket RFC規範中稱為“子協議”
SubProtocolHandler
- 實際上SubProtocolWebSocketHandler所做的工作,最終都是通過SubProtocolHandler來完成,可以說後者就是前者的支撐例項。
- 處理來自客戶端的WebSocketMessages以及要傳送給客戶端的Message。
- 可以在SubProtocolWebSocketHandler上配置此介面的實現,SubProtocolWebSocketHandler根據客戶端傳遞的Sec-WebSocket-Protocol請求標頭,來選擇子協議處理程式以委派訊息。
通過名字和註釋我們很容易理解各個方法的作用,不過還是要強調兩個:
1. handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel)
將WebsocketMessage從客戶端傳送到應用程式。不過這裡需要強調兩點:
- 是WebsocketMessage。也就是說 HandeShake 握手請求成功,請求訊息升級成WebsocketMessage後,該處理程式才會被呼叫,同樣其他方法中所強調的連線,此時即是Websocket連線。
- 傳送到應用程式。所以這個方法的註釋和引數名,其實是錯的。這裡引數MessageChannel,應該是
inboundChannel
——a inbound channel to send messages to application
,是一個傳送訊息到應用程式的入棧通道。
2. handleMessageToClient(WebSocketSession session, Message<?> message)
將訊息從應用程式傳送到客戶端。
SubProtocolHandler :
public interface SubProtocolHandler {
/**
* Return the list of sub-protocols supported by this handler (never {@code null}).
*/
List<String> getSupportedProtocols();
/**
* Handle the given {@link WebSocketMessage} received from a client.
* @param session the client session
* @param message the client message
* @param outputChannel an output channel to send messages to
*/
void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel)
throws Exception;
/**
* Handle the given {@link Message} to the client associated with the given WebSocket session.
* @param session the client session
* @param message the client message
*/
void handleMessageToClient(WebSocketSession session, Message<?> message) throws Exception;
/**
* Resolve the session id from the given message or return {@code null}.
* @param message the message to resolve the session id from
*/
@Nullable
String resolveSessionId(Message<?> message);
/**
* Invoked after a {@link WebSocketSession} has started.
* @param session the client session
* @param outputChannel a channel
*/
void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) throws Exception;
/**
* Invoked after a {@link WebSocketSession} has ended.
* @param session the client session
* @param closeStatus the reason why the session was closed
* @param outputChannel a channel
*/
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel)
throws Exception;
}
在Spring中(截止當前Spring Framework 5.1.2
)StompSubProtocolHandler—— 處理STOMP子協議
目前是SubProtocolHandler 的唯一實現,但是該介面為以後子協議的擴充套件提供了鉤子(簡直驚歎啊,這是程式設計者值得學習的地方)。
sessions
“會話集合” —— 維持著與該應用程式建立Websocket連線的所有WebsoketSession。這裡實際型別是new ConcurrentHashMap<>(),滿足應用程式的併發量,key —— WebsocketSession id,value —— WebSocketSessionHolder例項。
afterConnectionEstablished(WebSocketSession session):
在Websocket 連線建立之後,將WebSocketSession 存入sessions。我們可以通過複寫該方法來實現session儲存的自定義。
當然Spring早叫為我們想到了,所以它提供了WebSocketHandlerDecorator 與WebSocketHandlerDecoratorFactory,作為一個鉤子,允許我們不改變程式碼的情況下,為SubProtocolWebSocketHandler增加功能,比如說“實現分散式WebsocketSession”
,以此來滿足我們構建分散式應用程式的需求。
SubProtocolWebSocketHandler:
public class SubProtocolWebSocketHandler
implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// WebSocketHandlerDecorator could close the session
if (!session.isOpen()) {
return;
}
this.stats.incrementSessionCount(session);
session = decorateSession(session);
this.sessions.put(session.getId(), new WebSocketSessionHolder(session));
findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel);
}
}
clientInboundChannel
“入站通道” —— SubProtocolWebSocketHandler實現了WebSocketHandler的handleMessage(WebSocketSession session, WebSocketMessage<?> message) 方法,處理升級成功WebsocketMessage。
先通過sessionId從sessions中取出對應的WebSocketSessionHolder,再通過protocol子協議型別從protocolHandlerLookup取出對應的SubProtocolHandler ,再由SubProtocolHandler通過StompDecoder——STOMP訊息解碼器,將ByteBuffer解碼成STOMP格式的訊息內容,然後對該內容增加更多的資訊,最後由clientInboundChannel傳送到應用程式。
SubProtocolWebSocketHandler:
public class SubProtocolWebSocketHandler
implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
/**
* Handle an inbound message from a WebSocket client.
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
WebSocketSessionHolder holder = this.sessions.get(session.getId());
if (holder != null) {
session = holder.getSession();
}
SubProtocolHandler protocolHandler = findProtocolHandler(session);
protocolHandler.handleMessageFromClient(session, message, this.clientInboundChannel);
if (holder != null) {
holder.setHasHandledMessages();
}
checkSessions();
}
}
clientOutboundChannel
“出站通道” —— SubProtocolWebSocketHandler的bean在生命週期開始時,訂閱了它的成員變數clientOutboundChannel,並且SubProtocolWebSocketHandler是MessageHandler的實現,以至於通過clientOutboundChannel傳送的資訊,會被該處理程式接收,通過handleMessage(Message<?> message)方法進行處理。
先通過sessionId獲得對應的WebSocketSession,再由對應的SubProtocolHandler呼叫handleMessageToClient(WebSocketSession session, Message<?> message)對Message進行附加內容,之後通過StompEcoder——STOMP訊息編碼器,將STOMP格式的訊息內容編碼成位元組陣列,最後由WebSocketSession傳送到對應客戶端。
SubProtocolWebSocketHandler:
public class SubProtocolWebSocketHandler
implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
@Override
public final void start() {
Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers");
synchronized (this.lifecycleMonitor) {
this.clientOutboundChannel.subscribe(this);
this.running = true;
}
}
/**
* Handle an outbound Spring Message to a WebSocket client.
*/
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String sessionId = resolveSessionId(message);
if (sessionId == null) {
if (logger.isErrorEnabled()) {
logger.error("Could not find session id in " + message);
}
return;
}
WebSocketSessionHolder holder = this.sessions.get(sessionId);
if (holder == null) {
if (logger.isDebugEnabled()) {
// The broker may not have removed the session yet
logger.debug("No session for " + message);
}
return;
}
WebSocketSession session = holder.getSession();
try {
findProtocolHandler(session).handleMessageToClient(session, message);
}
catch (SessionLimitExceededException ex) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Terminating '" + session + "'", ex);
}
this.stats.incrementLimitExceededCount();
clearSession(session, ex.getStatus()); // clear first, session may be unresponsive
session.close(ex.getStatus());
}
catch (Exception secondException) {
logger.debug("Failure while closing session " + sessionId + ".", secondException);
}
}
catch (Exception ex) {
// Could be part of normal workflow (e.g. browser tab closed)
if (logger.isDebugEnabled()) {
logger.debug("Failed to send message to client in " + session + ": " + message, ex);
}
}
}
}