SpringBoot+Stomp實現WebSocket訊息推送
阿新 • • 發佈:2019-01-01
一、Stomp概念
STOMP是在WebSocket之上提供了一個基於幀的線路格式層,用於定義訊息的語義。 比起原生WebSocket,穩定性和功能性都好得多。
SEND
destination:/app/sendTest
content-length:23
{"name":"asdfsadfsadf"}
- 這裡STOMP的命令是SEND,後面接傳送的目標地址,訊息內容長度,然後是一個空行,最後是傳送內容,這個裡面是一個JSON訊息。
- 這裡需要注意的是destination,目標地址,訊息會發送到這個目的地,這個目的地有服務端元件來進行處理。
- Spring使用STOMP需要進行配置,並且Spring為STOMP訊息提供了基於SpringMVC的程式設計模型!
1、SpringBoot的POM依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.datatype</groupId> <artifactId>jackson-datatype-joda</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-parameter-names</artifactId> </dependency>
2、配置Stomp:
package com.sample.suncht.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; import com.sample.suncht.websocket.HttpSessionIdHandshakeInterceptor; import com.sample.suncht.websocket.PresenceChannelInterceptor; /** * * @ClassName: WebSocketStompConfig * @Description: springboot websocket stomp配置 * 參考: * https://docs.spring.io/spring/docs/4.0.1.RELEASE/spring-framework-reference/html/websocket.html * https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#websocket-fallback-sockjs-client */ @Configuration @EnableWebSocketMessageBroker public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer { /** * 註冊stomp的端點(必須) */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 允許使用socketJs方式訪問,訪問點為webSocketServer // 在網頁上我們就可以通過這個連結 http://localhost:8080/webSocketServer 來和伺服器的WebSocket連線 registry.addEndpoint("/webSocketServer").withSockJS(); } /** * 配置資訊代理(必須) */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { // 訂閱Broker名稱 registry.enableSimpleBroker("/queue", "/topic"); // 全域性使用的訊息字首(客戶端訂閱路徑上會體現出來) registry.setApplicationDestinationPrefixes("/ms"); // 點對點使用的訂閱字首(客戶端訂閱路徑上會體現出來),不設定的話,預設也是/user/ // registry.setUserDestinationPrefix("/user/"); } /** * 訊息傳輸引數配置(可選) */ @Override public void configureWebSocketTransport(WebSocketTransportRegistration registry) { registry.setMessageSizeLimit(8192) //設定訊息位元組數大小 .setSendBufferSizeLimit(8192)//設定訊息快取大小 .setSendTimeLimit(10000); //設定訊息傳送時間限制毫秒 } /** * 輸入通道引數設定(可選) */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(4) //設定訊息輸入通道的執行緒池執行緒數 .maxPoolSize(8)//最大執行緒數 .keepAliveSeconds(60);//執行緒活動時間 registration.setInterceptors(presenceChannelInterceptor()); } /** * 輸出通道引數設定(可選) */ @Override public void configureClientOutboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(4).maxPoolSize(8); registration.setInterceptors(presenceChannelInterceptor()); } @Bean public HttpSessionIdHandshakeInterceptor httpSessionIdHandshakeInterceptor() { return new HttpSessionIdHandshakeInterceptor(); } @Bean public PresenceChannelInterceptor presenceChannelInterceptor() { return new PresenceChannelInterceptor(); } }
注意:
(1)註冊stomp的端點 registry.addEndpoint("/webSocketServer").withSockJS(); 沒有新增.setAllowedOrigins("*")允許跨域
sockjs1.0版本以上必須要setAllowedOrigins("*"),否則報錯: http://localhost:8112/endpointWisely/info?t=1510914882520 的遠端資源。(原因:CORS 頭缺少 'Access-Control-Allow-Origin')
sockjs0.3.4版本可以不需要setAllowedOrigins("*"),也就是不會允許跨域。
本人一專案一開始使用sockjs1.1.4版本,也設定setAllowedOrigins("*"),但是還是一直上述報錯,弄了一整天都沒搞定,可能是跟開發環境配置有關吧。最後使用sockjs0.3.4版本就可以了。
(2)上述配置是Spring程式碼方式配置, 也可以使用XML配置,如下:
<websocket:message-broker application-destination-prefix="/app">
<websocket:transport message-size="131072" send-timeout="1000" send-buffer-size="8192"/>
<websocket:stomp-endpoint path="/webSocketServer">
<websocket:handshake-interceptors>
<bean class="com.sample.suncht.websocket.HttpSessionIdHandshakeInterceptor"/>
</websocket:handshake-interceptors>
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:simple-broker prefix="/topic,/queue"/>
</websocket:message-broker>
3、Controller:
package com.sample.suncht.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
import com.sample.suncht.model.ClientMessage;
import com.sample.suncht.model.ServerMessage;
@Controller
public class WebSocketController {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private SimpMessagingTemplate messagingTemplate;
@MessageMapping("/sendTest")
@SendTo("/topic/subscribeTest")
public ServerMessage sendDemo(ClientMessage message) {
logger.info("接收到了資訊" + message.getName());
return new ServerMessage("你傳送的服務返回訊息為:" + message.getName());
}
@SubscribeMapping("/subscribeTest")
public ServerMessage sub() {
logger.info("XXX使用者訂閱了我。。。");
return new ServerMessage("感謝你訂閱了我。。。");
}
@RequestMapping("/startStomp.do")
@ResponseBody
public String startStomp() {
final int counter = 10;
MoreExecutors.newDirectExecutorService().submit(() -> {
int index = 0;
while (index++ < counter) {
messagingTemplate.convertAndSend("/topic/subscribeTest", new ServerMessage("伺服器主動推的資料["+index+"] : " + DateUtils.simpleFormat(new Date())));
try {
Thread.sleep(RandomUtils.nextInt(0, 3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
return "ok";
}
}
4、前端Html:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta http-equiv="content-type" content="text/html; charset=utf-8">
<title>stomp</title>
</head>
<body>
Welcome
<br />
<input id="text" type="text" />
<button onclick="send()">傳送訊息</button>
<button onclick="subscribe1()">訂閱訊息/topic/subscribeTest</button>
<hr />
<button onclick="closeWebSocket()">關閉WebSocket連線</button>
<hr />
<div id="message"></div>
</body>
<script src="/static/stomp.js/stomp.min.js"></script>
<script src="/static/sockjs-client/0.3.4/sockjs.min.js"></script>
<script src="/static/WebsocketProxy.js"></script>
<script type="text/javascript">
var socket = new WebsocketProxy("/webSocketServer");
socket.connect({}, function connectCallback(frame) {
// 連線成功時(伺服器響應 CONNECTED 幀)的回撥方法
setMessageInnerHTML("連線成功");
subscribe1();
}, function errorCallBack(error) {
// 連線失敗時(伺服器響應 ERROR 幀)的回撥方法
setMessageInnerHTML("連線失敗");
});
//傳送訊息
function send() {
var message = document.getElementById('text').value;
var messageJson = JSON.stringify({
"name" : message
});
socket.send("/ms/sendTest", {}, messageJson);
//stompClient.send("/ms/sendTest", {}, messageJson);
//setMessageInnerHTML("/ms/sendTest 你傳送的訊息:" + message);
}
//訂閱訊息
function subscribe1() {
socket.subscribe('/topic/subscribeTest', function(response) {
setMessageInnerHTML("已成功訂閱/topic/subscribeTest");
var returnData = JSON.parse(response.body);
setMessageInnerHTML("/topic/subscribeTest 你接收到的訊息為:" + returnData.responseMessage);
});
}
//將訊息顯示在網頁上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
</script>
</html>
WebsocketProxy.js封裝了Stomp實現:
(function(window, undefined) {
/**
* Stomp的API可檢視:https://blog.csdn.net/jqsad/article/details/77745379
*/
var StompProxy = function(websocketUrl, heartbeat) {
this.socket = new SockJS(websocketUrl);
this.stompClient = Stomp.over(this.socket);
if(heartbeat) {
//心跳檢測機制
this.stompClient.heartbeat.outgoing = heartbeat.outgoing || 20000;
this.stompClient.heartbeat.incoming = heartbeat.incoming || 0;
}
};
/**
* 發起連線
* headers表示客戶端的認證資訊
* connectCallback 表示連線成功時(伺服器響應 CONNECTED 幀)的回撥方法;
errorCallback 表示連線失敗時(伺服器響應 ERROR 幀)的回撥方法,非必須;
*/
StompProxy.prototype.connect = function(headers, connectCallback, errorCallBack) {
this.stompClient.connect(headers||{}, connectCallback, errorCallBack);
};
/**
* 斷開連線
*/
StompProxy.prototype.disconnect = function(disconnectCallback) {
disconnectCallback && disconnectCallback();
};
/**
* 傳送資訊
* destination url 為伺服器 controller中 @MessageMapping 中匹配的URL,字串,必須引數;
headers 為傳送資訊的header,JavaScript 物件,可選引數;
body 為傳送資訊的 body,字串,可選引數;
*/
StompProxy.prototype.send = function(sendUrl, param, messageJson) {
this.stompClient.send(sendUrl, param||{}, messageJson);
};
/**
* 訂閱、接收資訊
* destination url 為伺服器 @SendTo 匹配的 URL,字串;
callback 為每次收到伺服器推送的訊息時的回撥方法,該方法包含引數 message;
headers 為附加的headers,JavaScript 物件;什麼作用?
該方法返回一個包含了id屬性的 JavaScript 物件,可作為 unsubscribe() 方法的引數;
*/
StompProxy.prototype.subscribe = function(subscribeUrl, subscribeCallback, headers) {
var subscribeObj = this.stompClient.subscribe(subscribeUrl, subscribeCallback, headers||{});
return subscribeObj;
};
/**
* 取消訂閱
*/
StompProxy.prototype.unsubscribe = function(subscribeObj) {
subscribeObj && subscribeObj.unsubscribe();
};
/**
* STOMP 客戶端預設將傳輸過程中的所有 debug 資訊以 console.log() 形式輸出到客戶端瀏覽器
*/
StompProxy.prototype.debug = function(debugCallback) {
this.stompClient.debug = function(str) {
window.console && window.console.log(str);
debugCallback && debugCallback(str);
};
};
window.WebsocketProxy = StompProxy;
})(window, undefined);