1. 程式人生 > >SpringBoot+Stomp實現WebSocket訊息推送

SpringBoot+Stomp實現WebSocket訊息推送

一、Stomp概念

STOMP是在WebSocket之上提供了一個基於幀的線路格式層,用於定義訊息的語義。 比起原生WebSocket,穩定性和功能性都好得多。

SEND
destination:/app/sendTest
content-length:23

{"name":"asdfsadfsadf"}
  • 這裡STOMP的命令是SEND,後面接傳送的目標地址,訊息內容長度,然後是一個空行,最後是傳送內容,這個裡面是一個JSON訊息。
  • 這裡需要注意的是destination,目標地址,訊息會發送到這個目的地,這個目的地有服務端元件來進行處理。
  • Spring使用STOMP需要進行配置,並且Spring為STOMP訊息提供了基於SpringMVC的程式設計模型!

1、SpringBoot的POM依賴:

        <dependency>
		    <groupId>org.springframework.boot</groupId>
		    <artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.datatype</groupId>
			<artifactId>jackson-datatype-joda</artifactId>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.module</groupId>
			<artifactId>jackson-module-parameter-names</artifactId>
		</dependency>

2、配置Stomp:

package com.sample.suncht.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

import com.sample.suncht.websocket.HttpSessionIdHandshakeInterceptor;
import com.sample.suncht.websocket.PresenceChannelInterceptor;

/**
 * 
 * @ClassName: WebSocketStompConfig
 * @Description: springboot websocket stomp配置 
 * 參考:
 * 	https://docs.spring.io/spring/docs/4.0.1.RELEASE/spring-framework-reference/html/websocket.html
 *	https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#websocket-fallback-sockjs-client
 */

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer {

	/**
	 * 註冊stomp的端點(必須)
	 */
	@Override
	public void registerStompEndpoints(StompEndpointRegistry registry) {
		// 允許使用socketJs方式訪問,訪問點為webSocketServer
		// 在網頁上我們就可以通過這個連結 http://localhost:8080/webSocketServer 來和伺服器的WebSocket連線
		registry.addEndpoint("/webSocketServer").withSockJS();
	}

	/**
	 * 配置資訊代理(必須)
	 */
	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		// 訂閱Broker名稱
		registry.enableSimpleBroker("/queue", "/topic");
		// 全域性使用的訊息字首(客戶端訂閱路徑上會體現出來)
		registry.setApplicationDestinationPrefixes("/ms");
		// 點對點使用的訂閱字首(客戶端訂閱路徑上會體現出來),不設定的話,預設也是/user/
		// registry.setUserDestinationPrefix("/user/");
	}

	/**
	 * 訊息傳輸引數配置(可選)
	 */
	@Override
	public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
		registry.setMessageSizeLimit(8192) //設定訊息位元組數大小
				.setSendBufferSizeLimit(8192)//設定訊息快取大小
				.setSendTimeLimit(10000); //設定訊息傳送時間限制毫秒
	}

	/**
	 * 輸入通道引數設定(可選)
	 */
	@Override
	public void configureClientInboundChannel(ChannelRegistration registration) {
		registration.taskExecutor().corePoolSize(4) //設定訊息輸入通道的執行緒池執行緒數
				.maxPoolSize(8)//最大執行緒數
				.keepAliveSeconds(60);//執行緒活動時間
		registration.setInterceptors(presenceChannelInterceptor());
	}

	/**
	 * 輸出通道引數設定(可選)
	 */
	@Override
	public void configureClientOutboundChannel(ChannelRegistration registration) {
		registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
		registration.setInterceptors(presenceChannelInterceptor());
	}

	@Bean
	public HttpSessionIdHandshakeInterceptor httpSessionIdHandshakeInterceptor() {
		return new HttpSessionIdHandshakeInterceptor();
	}

	@Bean
	public PresenceChannelInterceptor presenceChannelInterceptor() {
		return new PresenceChannelInterceptor();
	}

}

注意:

(1)註冊stomp的端點 registry.addEndpoint("/webSocketServer").withSockJS();  沒有新增.setAllowedOrigins("*")允許跨域

       sockjs1.0版本以上必須要setAllowedOrigins("*"),否則報錯: http://localhost:8112/endpointWisely/info?t=1510914882520 的遠端資源。(原因:CORS 頭缺少 'Access-Control-Allow-Origin')

       sockjs0.3.4版本可以不需要setAllowedOrigins("*"),也就是不會允許跨域。

      本人一專案一開始使用sockjs1.1.4版本,也設定setAllowedOrigins("*"),但是還是一直上述報錯,弄了一整天都沒搞定,可能是跟開發環境配置有關吧。最後使用sockjs0.3.4版本就可以了。

(2)上述配置是Spring程式碼方式配置, 也可以使用XML配置,如下:

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:transport message-size="131072" send-timeout="1000" send-buffer-size="8192"/>

        <websocket:stomp-endpoint path="/webSocketServer">
            <websocket:handshake-interceptors>
                <bean class="com.sample.suncht.websocket.HttpSessionIdHandshakeInterceptor"/>
            </websocket:handshake-interceptors>
            <websocket:sockjs/>
        </websocket:stomp-endpoint>

        <websocket:simple-broker prefix="/topic,/queue"/>
    </websocket:message-broker>

3、Controller:

package com.sample.suncht.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;

import com.sample.suncht.model.ClientMessage;
import com.sample.suncht.model.ServerMessage;

@Controller
public class WebSocketController {

	private Logger logger = LoggerFactory.getLogger(this.getClass());

	@Autowired
    private SimpMessagingTemplate messagingTemplate;
	
	@MessageMapping("/sendTest")
	@SendTo("/topic/subscribeTest")
	public ServerMessage sendDemo(ClientMessage message) {
		logger.info("接收到了資訊" + message.getName());
		return new ServerMessage("你傳送的服務返回訊息為:" + message.getName());
	}

	@SubscribeMapping("/subscribeTest")
	public ServerMessage sub() {
		logger.info("XXX使用者訂閱了我。。。");
		return new ServerMessage("感謝你訂閱了我。。。");
	}

	@RequestMapping("/startStomp.do")
    @ResponseBody
    public String startStomp() {
        final int counter = 10;
        MoreExecutors.newDirectExecutorService().submit(() -> {
            int index = 0;
            while (index++ < counter) {
                messagingTemplate.convertAndSend("/topic/subscribeTest", new ServerMessage("伺服器主動推的資料["+index+"] : " + DateUtils.simpleFormat(new Date())));
                try {
                    Thread.sleep(RandomUtils.nextInt(0, 3000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        return "ok";
    }
}

4、前端Html:

<!DOCTYPE html>
<html lang="zh-CN">

<head>
	<meta charset="UTF-8">
	<meta http-equiv="content-type" content="text/html; charset=utf-8">
	<title>stomp</title>
</head>

<body>
	Welcome
	<br />
	<input id="text" type="text" />
	<button onclick="send()">傳送訊息</button>
	<button onclick="subscribe1()">訂閱訊息/topic/subscribeTest</button>
	<hr />
	<button onclick="closeWebSocket()">關閉WebSocket連線</button>
	<hr />
	<div id="message"></div>
</body>

<script src="/static/stomp.js/stomp.min.js"></script>
<script src="/static/sockjs-client/0.3.4/sockjs.min.js"></script>
<script src="/static/WebsocketProxy.js"></script>
<script type="text/javascript">
	var socket = new WebsocketProxy("/webSocketServer");
	socket.connect({}, function connectCallback(frame) {
		// 連線成功時(伺服器響應 CONNECTED 幀)的回撥方法
		setMessageInnerHTML("連線成功");
		subscribe1();
	}, function errorCallBack(error) {
		// 連線失敗時(伺服器響應 ERROR 幀)的回撥方法
		setMessageInnerHTML("連線失敗");
	});

	//傳送訊息
	function send() {
		var message = document.getElementById('text').value;
		var messageJson = JSON.stringify({
			"name" : message
		});
		socket.send("/ms/sendTest", {}, messageJson);
		//stompClient.send("/ms/sendTest", {}, messageJson);
		//setMessageInnerHTML("/ms/sendTest 你傳送的訊息:" + message);
	}

	//訂閱訊息
	function subscribe1() {
		socket.subscribe('/topic/subscribeTest', function(response) {
			setMessageInnerHTML("已成功訂閱/topic/subscribeTest");
			var returnData = JSON.parse(response.body);
			setMessageInnerHTML("/topic/subscribeTest 你接收到的訊息為:" + returnData.responseMessage);
		});
	}
	

	//將訊息顯示在網頁上
	function setMessageInnerHTML(innerHTML) {
		document.getElementById('message').innerHTML += innerHTML + '<br/>';
	}
</script>

</html>

WebsocketProxy.js封裝了Stomp實現:

(function(window, undefined) {
	/**
	 * Stomp的API可檢視:https://blog.csdn.net/jqsad/article/details/77745379
	 */
	var StompProxy = function(websocketUrl, heartbeat) {
		this.socket = new SockJS(websocketUrl);
		this.stompClient = Stomp.over(this.socket);
		if(heartbeat) {
			//心跳檢測機制
			this.stompClient.heartbeat.outgoing = heartbeat.outgoing || 20000;
			this.stompClient.heartbeat.incoming = heartbeat.incoming || 0;
		}
	};
	
	/**
	 * 發起連線
	 * headers表示客戶端的認證資訊
	 * connectCallback 表示連線成功時(伺服器響應 CONNECTED 幀)的回撥方法; 
		errorCallback 表示連線失敗時(伺服器響應 ERROR 幀)的回撥方法,非必須;
	 */
	StompProxy.prototype.connect = function(headers, connectCallback, errorCallBack) {
		this.stompClient.connect(headers||{}, connectCallback, errorCallBack);
	};
	
	/**
	 * 斷開連線
	 */
	StompProxy.prototype.disconnect = function(disconnectCallback) {
		disconnectCallback && disconnectCallback();
	};
	
	/**
	 * 傳送資訊
	 * destination url 為伺服器 controller中 @MessageMapping 中匹配的URL,字串,必須引數; 
		headers 為傳送資訊的header,JavaScript 物件,可選引數; 
		body 為傳送資訊的 body,字串,可選引數;
	 */
	StompProxy.prototype.send = function(sendUrl, param, messageJson) {
		this.stompClient.send(sendUrl, param||{}, messageJson);
	};
	
	/**
	 * 訂閱、接收資訊
	 * destination url 為伺服器 @SendTo 匹配的 URL,字串; 
		callback 為每次收到伺服器推送的訊息時的回撥方法,該方法包含引數 message; 
		headers 為附加的headers,JavaScript 物件;什麼作用? 
		該方法返回一個包含了id屬性的 JavaScript 物件,可作為 unsubscribe() 方法的引數;
	 */
	StompProxy.prototype.subscribe = function(subscribeUrl, subscribeCallback, headers) {
		var subscribeObj = this.stompClient.subscribe(subscribeUrl, subscribeCallback, headers||{});
		return subscribeObj;
	};
	
	/**
	 * 取消訂閱
	 */
	StompProxy.prototype.unsubscribe = function(subscribeObj) {
		subscribeObj && subscribeObj.unsubscribe();
	};
	
	/**
	 * STOMP 客戶端預設將傳輸過程中的所有 debug 資訊以 console.log() 形式輸出到客戶端瀏覽器
	 */
	StompProxy.prototype.debug = function(debugCallback) {
		this.stompClient.debug = function(str) {
			window.console && window.console.log(str);
			debugCallback && debugCallback(str);
		};
	};
	
	window.WebsocketProxy = StompProxy;
})(window, undefined);