Spring Boot中使用WebSocket總結(三):使用訊息佇列實現分散式WebSocket
在上一篇文章(www.zifangsky.cn/1359.html)中我介紹了服務端如何給指定使用者的客戶端傳送訊息,並如何處理對方不線上的情況。在這篇文章中我們繼續思考另外一個重要的問題,那就是:如果我們的專案是分散式環境,登入的使用者被Nginx的反向代理分配到多個不同伺服器,那麼在其中一個伺服器建立了WebSocket連線的使用者如何給在另外一個伺服器上建立了WebSocket連線的使用者傳送訊息呢?
其實,要解決這個問題就需要實現分散式WebSocket,而分散式WebSocket一般可以通過以下兩種方案來實現:
- 方案一:將訊息(<使用者id,訊息內容>)統一推送到一個訊息佇列(Redis、Kafka等)的topic,然後每個應用節點都訂閱這個topic,在接收到WebSocket訊息後取出這個訊息的“訊息接收者的使用者ID/使用者名稱”,然後再比對自身是否存在相應使用者的連線,如果存在則推送訊息,否則丟棄接收到的這個訊息(這個訊息接收者所在的應用節點會處理)
- 方案二:在使用者建立WebSocket連線後,使用Redis快取記錄使用者的WebSocket建立在哪個應用節點上,然後同樣使用訊息佇列將訊息推送到接收者所在的應用節點上面(實現上比方案一要複雜,但是網路流量會更低)
在下面的示例中,我將根據相對簡單的方案一來實現,具體實現方式如下:
(1)定義一個WebSocket Channel列舉類:
package cn.zifangsky.mqwebsocket.enums; import org.apache.commons.lang3.StringUtils; /** * WebSocket Channel列舉類 * * @author zifangsky * @date 2018/10/16 * @since 1.0.0 */ public enum WebSocketChannelEnum { //測試使用的簡易點對點聊天 CHAT("CHAT", "測試使用的簡易點對點聊天", "/topic/reply"); WebSocketChannelEnum(String code, String description, String subscribeUrl) { this.code = code; this.description = description; this.subscribeUrl = subscribeUrl; } /** * 唯一CODE */ private String code; /** * 描述 */ private String description; /** * WebSocket客戶端訂閱的URL */ private String subscribeUrl; public String getCode() { return code; } public String getDescription() { return description; } public String getSubscribeUrl() { return subscribeUrl; } /** * 通過CODE查詢列舉類 */ public static WebSocketChannelEnum fromCode(String code){ if(StringUtils.isNoneBlank(code)){ for(WebSocketChannelEnum channelEnum : values()){ if(channelEnum.code.equals(code)){ return channelEnum; } } } return null; } } 複製程式碼
(2)配置基於Redis的訊息佇列:
關於Redis實現的訊息佇列可以參考我之前的這篇文章:www.zifangsky.cn/1347.html
需要注意的是,在大中型正式專案中並不推薦使用Redis實現的訊息佇列,因為經過測試它並不是特別可靠,所以應該考慮使用Kafka、RabbitMQ等專業的訊息佇列中介軟體(PS:據說Redis 5.0全新的資料結構Streams極大增強了Redis的訊息佇列功能?)
package cn.zifangsky.mqwebsocket.config;

import cn.zifangsky.mqwebsocket.mq.MessageReceiver;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Arrays;

/**
 * Redis configuration: Jedis connection pool, cluster connection factory,
 * JSON-serializing {@link RedisTemplate} and the pub/sub listener that feeds
 * {@link MessageReceiver}.
 *
 * @author zifangsky
 * @date 2018/7/30
 * @since 1.0.0
 */
@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {

    @Value("${spring.redis.timeout}")
    private String timeOut;

    /** Comma-separated {@code host:port} entries of the cluster nodes. */
    @Value("${spring.redis.cluster.nodes}")
    private String nodes;

    @Value("${spring.redis.cluster.max-redirects}")
    private int maxRedirects;

    @Value("${spring.redis.jedis.pool.max-active}")
    private int maxActive;

    @Value("${spring.redis.jedis.pool.max-wait}")
    private int maxWait;

    @Value("${spring.redis.jedis.pool.max-idle}")
    private int maxIdle;

    @Value("${spring.redis.jedis.pool.min-idle}")
    private int minIdle;

    /** Name of the Redis topic the message listener subscribes to. */
    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    /**
     * Builds the Jedis pool settings from the {@code spring.redis.jedis.pool.*} properties.
     */
    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWait);

        return config;
    }

    /**
     * Builds the cluster configuration from the configured node list.
     *
     * <p>The property holds all nodes in one comma-separated string, so it has to be
     * split into individual {@code host:port} entries before it is handed to
     * {@link RedisClusterConfiguration}; passing the unsplit string as a single
     * element cannot be parsed as one {@code host:port} pair.
     */
    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
        RedisClusterConfiguration configuration =
                new RedisClusterConfiguration(Arrays.asList(nodes.trim().split("\\s*,\\s*")));
        configuration.setMaxRedirects(maxRedirects);

        return configuration;
    }

    /**
     * JedisConnectionFactory backed by the cluster configuration and the pool settings.
     */
    @Bean
    public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration, JedisPoolConfig jedisPoolConfig) {
        return new JedisConnectionFactory(configuration, jedisPoolConfig);
    }

    /**
     * Serializer that stores objects as JSON using Jackson.
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer() {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // NOTE(review): default typing embeds class names in the JSON so values can be read back
        // as their concrete type; confirm that only trusted producers can write to this Redis.
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(objectMapper);

        return serializer;
    }

    /**
     * RedisTemplate with String keys and JSON values.
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);

        // Keys (and hash keys) are serialized as plain strings
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);

        // Values (and hash values) are serialized as JSON
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * Message listener that delegates to {@code MessageReceiver#receiveMessage}.
     */
    @Bean
    MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
        // Receiver bean and the name of the method invoked for each message
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
        // How the message body is deserialized
        messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);

        return messageListenerAdapter;
    }

    /**
     * Message listener container subscribed to the configured topic.
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Register the listener for the topic
        container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));

        return container;
    }
}
需要注意的是,這裡使用的配置如下所示:
spring:
  # ... (other settings omitted)

  # redis
  redis:
    cluster:
      nodes: namenode22:6379,datanode23:6379,datanode24:6379
      max-redirects: 6
    timeout: 300000
    jedis:
      pool:
        max-active: 8
        max-wait: 100000
        max-idle: 8
        min-idle: 0
    # custom name of the topic to listen on
    message:
      topic-name: topic-test
(3)定義一個Redis訊息的處理者:
package cn.zifangsky.mqwebsocket.mq;

import cn.zifangsky.mqwebsocket.enums.WebSocketChannelEnum;
import cn.zifangsky.mqwebsocket.model.websocket.RedisWebsocketMsg;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Component;

import java.text.MessageFormat;

/**
 * Handles WebSocket messages received from the Redis topic.
 *
 * @author zifangsky
 * @date 2018/10/16
 * @since 1.0.0
 */
@Component
public class MessageReceiver {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    /**
     * Forwards a message to its receiver if that user is found in this node's user registry.
     *
     * <p>Messages without a receiver, for users not found in the local registry, or with an
     * unknown channel code are dropped without further action.
     *
     * @param redisWebsocketMsg the message taken from the Redis topic
     */
    public void receiveMessage(RedisWebsocketMsg<?> redisWebsocketMsg) {
        logger.info("Received Message: {}", redisWebsocketMsg);

        // Nothing can be routed without a receiver name
        if (redisWebsocketMsg == null || StringUtils.isBlank(redisWebsocketMsg.getReceiver())) {
            return;
        }

        //1. Look the receiver up in the local user registry
        SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
        if (simpUser == null || StringUtils.isBlank(simpUser.getName())) {
            return;
        }

        //2. Resolve the destination the client subscribed to
        WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
        if (channelEnum == null) {
            return;
        }

        //3. Send the message to the WebSocket client
        messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
    }
}
(4)在Controller中傳送WebSocket訊息:
package cn.zifangsky.mqwebsocket.controller;

import cn.zifangsky.mqwebsocket.common.Constants;
import cn.zifangsky.mqwebsocket.common.SpringContextUtils;
import cn.zifangsky.mqwebsocket.enums.ExpireEnum;
import cn.zifangsky.mqwebsocket.enums.WebSocketChannelEnum;
import cn.zifangsky.mqwebsocket.model.User;
import cn.zifangsky.mqwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.mqwebsocket.model.websocket.RedisWebsocketMsg;
import cn.zifangsky.mqwebsocket.service.RedisService;
import cn.zifangsky.mqwebsocket.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Demonstrates basic usage of {@link org.springframework.messaging.simp.SimpMessagingTemplate}:
 * sending a message to a specific user (locally, through the Redis topic, or into an
 * unread-message list) and pulling unread messages.
 *
 * @author zifangsky
 * @date 2018/10/10
 * @since 1.0.0
 */
@Controller
@RequestMapping("/wsTemplate")
public class RedisMessageController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /**
     * Sends a WebSocket message to the user named by the {@code receiver} request parameter.
     *
     * @param request the HTTP request carrying the {@code receiver} and {@code msg} parameters
     * @return {@code "ok"} when the message was dispatched, otherwise a short error text
     */
    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        // Message receiver
        String receiver = request.getParameter("receiver");
        // Message content
        String msg = request.getParameter("msg");

        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        // Reject the request explicitly instead of failing later with a NullPointerException
        if (loginUser == null) {
            return "error: not logged in";
        }
        if (StringUtils.isBlank(receiver)) {
            return "error: receiver is required";
        }

        HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
        this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData));

        return "ok";
    }

    /**
     * Sends a message to the given user and handles the case where the receiver is offline.
     *
     * @param sender message sender
     * @param receiver message receiver
     * @param destination destination the receiver subscribes to
     * @param payload message body
     */
    private void sendToUser(String sender, String receiver, String destination, String payload) {
        SimpUser simpUser = userRegistry.getUser(receiver);

        // The receiver is found in this node's user registry: deliver directly
        if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
            messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        }
        // The receiver is recorded as online in Redis but not registered here, so it is
        // connected to another node: publish to the topic so that node delivers the message
        else if (redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)) {
            RedisWebsocketMsg<String> redisWebsocketMsg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);

            redisService.convertAndSend(topicName, redisWebsocketMsg);
        }
        // Otherwise keep the message in Redis until the user comes online and pulls it
        else {
            // Name of the Redis list that stores the unread messages
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;

            logger.info("訊息接收者{}還未建立WebSocket連線,{}傳送的訊息【{}】將被儲存到Redis的【{}】列表中", receiver, sender, payload, listKey);

            // Store the message in Redis
            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
        }
    }

    /**
     * Pulls the unread WebSocket messages stored for the current user and the given destination.
     *
     * @param destination the subscribed destination
     * @return a map with {@code code} ("200" or "500") and, when present, {@code result}
     *         (the unread messages) or {@code msg} (the error text)
     */
    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map<String, Object> pullUnreadMessage(String destination) {
        Map<String, Object> result = new HashMap<>();
        try {
            HttpSession session = SpringContextUtils.getSession();
            // Currently logged-in user
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
            if (loginUser == null) {
                // Give the caller a meaningful message rather than the null message of an NPE
                result.put("code", "500");
                result.put("msg", "not logged in");
                return result;
            }

            // Name of the Redis list that stores the unread messages
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
            // Fetch every unread message from Redis
            List<Object> messageList = redisService.rangeList(listKey, 0, -1);

            result.put("code", "200");
            if (messageList != null && !messageList.isEmpty()) {
                // Remove the unread-message list from Redis
                redisService.delete(listKey);
                // Hand the messages to the page for display
                result.put("result", messageList);
            }
        } catch (Exception e) {
            logger.error("Failed to pull unread messages", e);
            result.put("code", "500");
            result.put("msg", e.getMessage());
        }

        return result;
    }
}
(5)其他攔截器處理WebSocket連線相關問題:
i)AuthHandshakeInterceptor:
package cn.zifangsky.mqwebsocket.interceptor.websocket;

import cn.zifangsky.mqwebsocket.common.Constants;
import cn.zifangsky.mqwebsocket.common.SpringContextUtils;
import cn.zifangsky.mqwebsocket.model.User;
import cn.zifangsky.mqwebsocket.service.RedisService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;

/**
 * Custom {@link org.springframework.web.socket.server.HandshakeInterceptor} that only lets
 * logged-in users open a WebSocket connection, and only one connection per user.
 *
 * @author zifangsky
 * @date 2018/10/11
 * @since 1.0.0
 */
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /**
     * Decides whether the handshake may proceed.
     *
     * @return {@code false} when no user is logged in or the user is already recorded
     *         in the Redis set of connected users; {@code true} otherwise
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        // The login check must come first: the duplicate-connection check below
        // dereferences loginUser and would otherwise throw for an anonymous request.
        if (loginUser == null || StringUtils.isBlank(loginUser.getUsername())) {
            logger.error("未登入系統,禁止連線WebSocket");
            return false;
        }

        if (redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, loginUser.getUsername())) {
            logger.error("同一個使用者不準建立多個連線WebSocket");
            return false;
        }

        logger.debug("使用者{}請求建立WebSocket連線", loginUser.getUsername());
        return true;
    }

    /**
     * Nothing to do once the handshake has completed.
     */
    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
        // intentionally empty
    }

}
ii)MyHandshakeHandler:
package cn.zifangsky.mqwebsocket.interceptor.websocket;

import cn.zifangsky.mqwebsocket.common.Constants;
import cn.zifangsky.mqwebsocket.common.SpringContextUtils;
import cn.zifangsky.mqwebsocket.model.User;
import cn.zifangsky.mqwebsocket.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.Map;

/**
 * Custom {@link org.springframework.web.socket.server.support.DefaultHandshakeHandler} that
 * supplies a custom {@link java.security.Principal} built from the logged-in user.
 *
 * @author zifangsky
 * @date 2018/10/11
 * @since 1.0.0
 */
@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /**
     * Resolves the principal for the WebSocket session from the HTTP session's user.
     *
     * @return a {@code MyPrincipal} named after the logged-in user, or {@code null}
     *         when the session holds no user
     */
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        HttpSession httpSession = SpringContextUtils.getSession();
        User currentUser = (User) httpSession.getAttribute(Constants.SESSION_USER);

        // No user in the session: no principal can be produced
        if (currentUser == null) {
            logger.error("未登入系統,禁止連線WebSocket");
            return null;
        }

        logger.debug(MessageFormat.format("WebSocket連線開始建立Principal,使用者:{0}", currentUser.getUsername()));

        // Step 1: record the user name in the Redis set of connected users
        redisService.addToSet(Constants.REDIS_WEBSOCKET_USER_SET, currentUser.getUsername());

        // Step 2: hand back the custom principal
        return new MyPrincipal(currentUser.getUsername());
    }

}
iii)MyChannelInterceptor:
package cn.zifangsky.mqwebsocket.interceptor.websocket;

import cn.zifangsky.mqwebsocket.common.Constants;
import cn.zifangsky.mqwebsocket.service.RedisService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.security.Principal;
import java.text.MessageFormat;

/**
 * Custom {@link org.springframework.messaging.support.ChannelInterceptor} that reacts to
 * client disconnects.
 *
 * @author zifangsky
 * @date 2018/10/10
 * @since 1.0.0
 */
@Component
public class MyChannelInterceptor implements ChannelInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /**
     * On a STOMP DISCONNECT, removes the named user from the Redis set of connected users
     * and logs the disconnect; every other command is ignored.
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);

        // Only disconnects are of interest here
        if (!StompCommand.DISCONNECT.equals(headers.getCommand())) {
            return;
        }

        Principal principal = headers.getUser();
        boolean namedUser = principal != null && StringUtils.isNoneBlank(principal.getName());

        String who;
        if (namedUser) {
            who = principal.getName();
            // Drop the user from the Redis set
            redisService.removeFromSet(Constants.REDIS_WEBSOCKET_USER_SET, who);
        } else {
            // No usable principal: identify the connection by its session id in the log
            who = headers.getSessionId();
        }

        logger.debug(MessageFormat.format("使用者{0}的WebSocket連線已經斷開", who));
    }

}
(6)WebSocket相關配置:
package cn.zifangsky.mqwebsocket.config; import cn.zifangsky.mqwebsocket.interceptor.websocket.MyHandshakeHandler; import cn.zifangsky.mqwebsocket.interceptor.websocket.AuthHandshakeInterceptor; import cn.zifangsky.mqwebsocket.interceptor.websocket.MyChannelInterceptor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; /** * WebSocket相關配置 * * @author zifangsky * @date 2018/9/30 * @since 1.0.0 */ @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{ @Autowired private AuthHandshakeInterceptor authHandshakeInterceptor; @Autowired private MyHandshakeHandler myHandshakeHandler; @Autowired private MyChannelInterceptor myChannelInterceptor; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/chat-websocket") .addInterceptors(authHandshakeInterceptor) .setHandshakeHandler(myHandshakeHandler) .withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { //客戶端需要把訊息傳送到/message/xxx地址 registry.setApplicationDestinationPrefixes("/message"); //服務端廣播訊息的路徑字首,客戶端需要相應訂閱/topic/yyy這個地址的訊息 registry.enableSimpleBroker("/topic"); //給指定使用者傳送訊息的路徑字首,預設值是/user/ registry.setUserDestinationPrefix("/user/"); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(myChannelInterceptor); } } 複製程式碼
(7)示例頁面:
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta content="text/html;charset=UTF-8"/>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <title>Chat With STOMP Message</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script th:src="@{/layui/layui.js}"></script>
    <script th:src="@{/layui/lay/modules/layer.js}"></script>
    <link th:href="@{/layui/css/layui.css}" rel="stylesheet">
    <link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">
    <link th:href="@{/css/style.css}" rel="stylesheet">
    <style type="text/css">
        #connect-container {
            margin: 0 auto;
            width: 400px;
        }

        #connect-container div {
            padding: 5px;
            margin: 0 7px 10px 0;
        }

        .message input {
            padding: 5px;
            margin: 0 7px 10px 0;
        }

        .layui-btn {
            display: inline-block;
        }
    </style>
    <script type="text/javascript">
        var stompClient = null;

        // Turn the relative endpoint in #target into an absolute URL on the current host
        $(function () {
            var target = $("#target");
            if (window.location.protocol === 'http:') {
                target.val('http://' + window.location.host + target.val());
            } else {
                target.val('https://' + window.location.host + target.val());
            }
        });

        // Enable/disable the buttons according to the connection state
        function setConnected(connected) {
            var connect = $("#connect");
            var disconnect = $("#disconnect");
            var echo = $("#echo");

            if (connected) {
                connect.addClass("layui-btn-disabled");
                disconnect.removeClass("layui-btn-disabled");
                echo.removeClass("layui-btn-disabled");
            } else {
                connect.removeClass("layui-btn-disabled");
                disconnect.addClass("layui-btn-disabled");
                echo.addClass("layui-btn-disabled");
            }

            connect.attr("disabled", connected);
            disconnect.attr("disabled", !connected);
            echo.attr("disabled", !connected);
        }

        // Connect
        function connect() {
            var target = $("#target").val();

            var ws = new SockJS(target);
            stompClient = Stomp.over(ws);

            stompClient.connect({}, function () {
                setConnected(true);
                log('Info: STOMP connection opened.');

                // Subscribe first so that messages arriving while the unread
                // messages are being fetched are not missed
                stompClient.subscribe("/user/topic/reply", function (response) {
                    log(JSON.parse(response.body).content);
                });

                // Then pull the messages stored while this user was offline
                pullUnreadMessage("/topic/reply");
            }, function () {
                // Connection lost
                setConnected(false);
                log('Info: STOMP connection closed.');
            });
        }

        // Disconnect
        function disconnect() {
            if (stompClient != null) {
                stompClient.disconnect();
                stompClient = null;
            }
            setConnected(false);
            log('Info: STOMP connection closed.');
        }

        // Send a message to the given user
        function sendMessage() {
            if (stompClient != null) {
                var receiver = $("#receiver").val();
                var msg = $("#message").val();
                log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg': msg}));

                $.ajax({
                    url: "/wsTemplate/sendToUser",
                    type: "POST",
                    // The endpoint answers with a plain string ("ok"), not JSON
                    dataType: "text",
                    async: true,
                    data: {
                        "receiver": receiver,
                        "msg": msg
                    },
                    success: function (data) {

                    }
                });
            } else {
                layer.msg('STOMP connection not established, please connect.', {
                    offset: 'auto'
                    , icon: 2
                });
            }
        }

        // Pull unread messages from the server
        function pullUnreadMessage(destination) {
            $.ajax({
                url: "/wsTemplate/pullUnreadMessage",
                type: "POST",
                dataType: "json",
                async: true,
                data: {
                    "destination": destination
                },
                success: function (data) {
                    if (data.result != null) {
                        $.each(data.result, function (i, item) {
                            log(JSON.parse(item).content);
                        })
                    } else if (data.code != null && data.code == "500") {
                        layer.msg(data.msg, {
                            offset: 'auto'
                            , icon: 2
                        });
                    }
                }
            });
        }

        // Log output
        function log(message) {
            console.debug(message);
        }
    </script>
</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
    enabled. Please enable
    Javascript and reload this page!</h2></noscript>
<div>
    <div id="connect-container" class="layui-elem-field">
        <legend>Chat With STOMP Message</legend>
        <div>
            <input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>
        </div>
        <div>
            <button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
            <button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
                    onclick="disconnect();">Disconnect
            </button>

        </div>
        <div class="message">
            <input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="接收者姓名" value=""/>
            <input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="訊息內容" value=""/>
        </div>
        <div>
            <button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
                    onclick="sendMessage();">Send Message
            </button>
        </div>
    </div>
</div>
</body>
</html>
測試效果省略,具體效果可以自行在兩臺不同伺服器上面執行示例原始碼檢視。