socket系列(三)——Spring-socket實時通訊、推送
Spring-socket實現實時通訊
實現
Spring4.x釋出spring-socket模組包,用於支援websocket,同時相容支援socketJS。
需要把spring所有的包更新到4.x以上版本,並下載spring-websocket包。
所需jar包
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${org.springframework-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${org.springframework-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${org.springframework-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>${org.springframework-version}</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> <!-- For SockJS --> <!--http://jira.codehaus.org/browse/JACKSON-884 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.4</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.6.4</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.4</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> <scope>runtime</scope> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1</version> <exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency>
專案結構,馬賽克的檔案不用管
SpringMVC配置檔案
<?xml version="1.0"encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:websocket="http://www.springframework.org/schema/websocket" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.2.xsd http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd"> <context:component-scan base-package="com.lc.controller"> <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/> </context:component-scan> <mvc:annotation-driven /> <!-- websocket配置 --> <bean id="HelloHandler" class="com.lc.websocket.MySorketHandle" /> <websocket:handlers> <!-- 配置訊息處理bean和路徑的對映關係 --> <websocket:mapping path="/hello" handler="HelloHandler"/> <!-- 配置握手攔截器 --> <websocket:handshake-interceptors> <bean class="com.lc.websocket.HandshakeInterceptor"/> </websocket:handshake-interceptors> <!-- 開啟sockjs,去掉則關閉sockjs --> <!-- <websocket:sockjs /> --> </websocket:handlers> <!-- 配置websocket訊息的最大緩衝區長度 --> <!-- <bean class="org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean"> <propertyname="maxTextMessageBufferSize" value="8192" /> <propertyname="maxBinaryMessageBufferSize" value="8192" /> </bean>--> <!-- 檢視層配置 --> <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <property name="prefix" value="/WEB-INF/jsps/"/> <property name="suffix" value=".jsp"/> </bean> </beans>
Websorket實現類
import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; public classMySorketHandle extends TextWebSocketHandler { // 線上人數 private static int count; private staticCopyOnWriteArraySet<WebSocketSession> set = new CopyOnWriteArraySet<>(); private WebSocketSession session; @Override public voidafterConnectionEstablished(WebSocketSession session) { this.session = session; try{ set.add(this.session); }catch(Exception e) { e.printStackTrace(); } MySorketHandle.addOnlineCount(); System.out.println("目前連線人數:" + getOnlineCount()); } public voidafterConnectionClosed(WebSocketSession session,CloseStatus closeStatus) { this.session = session; set.remove(this.session); subOnlineCount(); System.out.println("目前連線人數:" + getOnlineCount()); } public void handleMessage(WebSocketSessionsession,WebSocketMessage<?>message){ System.out.println("text message: "+ session.getId()+ "-"+ message.getPayload()); for(WebSocketSession ssion : set) { try { ssion.sendMessage(message); }catch(IOException e) { e.printStackTrace(); } } } public static int getOnlineCount() { return count; } public static void addOnlineCount() { count++; } public static void subOnlineCount() { count--; } }
HandshakeInterceptor類
import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
importorg.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
public classHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
/*
* 握手前處理動作
*/
@Override
public booleanbeforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler handler,
Map<String,Object> map)throwsException {
System.out.println("握手前");
return super.beforeHandshake(request, response, handler, map);
}
@Override
public void afterHandshake(ServerHttpRequestrequest,ServerHttpResponse response,WebSocketHandler wsHandler,Exception ex) {
super.afterHandshake(request, response, wsHandler, ex);
}
}
Controller頁面跳轉
@Controller
@RequestMapping("/im")
public classIMController {
@RequestMapping("/page")
public Stringpage(HttpServletRequest request, HttpServletResponse response) {
return "IMpage";
}
}
頁面
<%@ page language="java"contentType="text/html; charset=UTF-8"
pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>socket</title>
<script type="text/javascript" src="http://cdn.static.runoob.com/libs/jquery/2.1.1/jquery.min.js"></script>
</head>
<body>
welcome<br />
<input id="text" type="text"/>
<button onclick="sendMsg()">sendMsg</button>
<hr/>
<button onclick="closeWebSocket()">close WebSocketconnection</button>
<hr/>
<div id="message"></div>
</body>
<script type="text/javascript">
var websocket = null;
//判斷瀏覽器是否支援websocket
if('WebSocket' in window) {
websocket = new WebSocket("ws://localhost:8080/sdz-web//hello");
}else{
$("#message").html("該瀏覽器不支援實時通訊功能");
}
websocket.onopen= function() {
console.log("websocket連線成功");
}
websocket.onclose= function() {
console.log("websocket連線關閉");
}
websocket.onmessage= function(event) {
console.log("接收訊息");
console.log(event);
printMsg(event.data);
}
//列印訊息
function printMsg(msg) {
$("#message").append(msg+ "<br/>");
}
function sendMsg() {
var msg = $("#text").val();
websocket.send(msg);
}
function closeWebSocket(){
websocket.close();
}
</script>
</html>
執行
總結
MySorketHandle類是websocket的實現類,這裡我繼承TextWebSocketHandler類,也可以實現WebSocketHandle介面,詳細可以看看官方文件,這裡大致介紹一下幾個主要方法。
afterConnectionEstablished(WebSocketSessionsession):當有新的連線時執行方法,這裡執行的是給session賦值,新增該連線物件,增加連線數量,列印連線資訊
afterConnectionClosed(WebSocketSessionsession,CloseStatus closeStatus):當有連線關閉時執行方法,這裡執行的是移除連線物件和列印資訊操作。
handleMessage(WebSocketSessionsession,WebSocketMessage<?> message):當有新訊息傳後臺時執行方法,這裡執行的是給所有連線物件傳送該請求。
handleTransportError(WebSocketSessionsession,Throwable exception):當有連線錯誤/異常時執行方法。
在spring-socket中,每次連線,都不會建立新的MySorketHandle,但會建立新的sesion物件,所以這裡用CopyOnWriteArraySet<WebSocketSession> set來存放所有連線物件。
配置方面主要是這些
<!-- websocket配置 -->
<bean id="HelloHandler"class="com.lc.websocket.MySorketHandle" />
<websocket:handlers>
<!--配置訊息處理bean和路徑的對映關係 -->
<websocket:mappingpath="/hello" handler="HelloHandler" />
<!--配置握手攔截器 -->
<websocket:handshake-interceptors>
<beanclass="com.lc.websocket.HandshakeInterceptor" />
</websocket:handshake-interceptors>
<!--開啟sockjs,去掉則關閉sockjs -->
<!--<websocket:sockjs /> -->
</websocket:handlers>
<!-- 配置websocket訊息的最大緩衝區長度-->
<!-- <bean
class="org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean">
<propertyname="maxTextMessageBufferSize" value="8192" />
<propertyname="maxBinaryMessageBufferSize" value="8192" />
</bean> -->
當然,別忘了加上
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="http://www.springframework.org/schema/websocket
http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd"
實現觸發推送
對於上個例子,實現伺服器觸發推送功能,需求如下:
訪問” http://127.0.0.1:8080/sdz-web/im/push”介面時,伺服器會向指定的連線客戶端推送資訊,引數sessionId是客戶端連線的sessionId,如果不帶引數,則向所有的連線的客戶端推送資訊。
實現
專案結構,馬賽克的檔案不用管
編寫測試方法,分別測試全部推送和指定推送
public classIMControllerPushTest {
@Test
public void pushAll() throws IOException {
URLurl= newURL("http://127.0.0.1:8080/sdz-web/im/push");
HttpURLConnectionconn= (HttpURLConnection) url.openConnection();
conn.setRequestProperty("User-Agent","Mozilla/5.0 (Windows NT 6.1; Win64;x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110Safari/537.36");
conn.setDoOutput(true);
conn.setDoInput(true);
OutputStreamos = conn.getOutputStream();
Stringparams= "";
os.write(params.getBytes("UTF-8"));
InputStreamis = conn.getInputStream();
byte[] res = new byte[1024];
StringBuffersb = new StringBuffer();
int l = 0;
while((l = is.read(res)) != -1) {
sb.append(new String(res, 0, l, "UTF-8"));
}
System.out.println(sb);
}
@Test
public void push() throws IOException {
URLurl= newURL("http://127.0.0.1:8080/sdz-web/im/push");
HttpURLConnectionconn= (HttpURLConnection) url.openConnection();
conn.setRequestProperty("User-Agent","Mozilla/5.0 (Windows NT 6.1; Win64;x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110Safari/537.36");
conn.setDoOutput(true);
conn.setDoInput(true);
OutputStreamos = conn.getOutputStream();
Stringparams= "sessionId=0";
os.write(params.getBytes("UTF-8"));
InputStreamis = conn.getInputStream();
byte[] res = new byte[1024];
StringBuffersb = new StringBuffer();
int l = 0;
while((l = is.read(res)) != -1) {
sb.append(new String(res, 0, l, "UTF-8"));
}
System.out.println(sb);
}
}
IMController
@Controller
@RequestMapping("/im")
public classIMController {
@Bean
public MySorketHandlemySorketHandle() {
return new MySorketHandle();
}
@RequestMapping("/page")
public Stringpage(HttpServletRequest request, HttpServletResponse response) {
return "IMpage";
}
@ResponseBody
@RequestMapping("/push")
public String push(@RequestParam(required = false) String sessionId,
HttpServletResponseresponse){
Stringmsg= "";
if (StringUtils.isEmpty(sessionId)) {
msg =mySorketHandle().pushMsg("伺服器推送資訊了");
System.out.println(msg);
}else{
msg =mySorketHandle().pushMsg(sessionId, "伺服器推送資訊了");
System.out.println(msg);
}
return msg;
}
}
MySorkerHandler
/**
* 給指定連線推訊息
* @param session
* @param message
*/
public String pushMsg(String sessionid, String message){
for(WebSocketSession ssion : set) {
try {
if(sessionid.equals(ssion.getId())){
ssion.sendMessage(new TextMessage(message));
return "機器:" + sessionid+ "推送成功";
}
}catch(IOException e) {
e.printStackTrace();
}
}
return "推送失敗";
}
/**
* 給全部連線
* @param message
* @return
*/
public String pushMsg(String message) {
int i = 0;
for(WebSocketSession ssion : set) {
try {
ssion.sendMessage(new TextMessage(message));
i++;
}catch(IOException e) {
e.printStackTrace();
}
}
return "共有" + i + "得到推送";
}
執行
總結
關鍵程式碼就是socket的功能實現和controller的呼叫
controller的呼叫
@Bean
public MySorketHandle mySorketHandle() {
returnnew MySorketHandle();
}
@ResponseBody
@RequestMapping("/push")
public String push(@RequestParam(required =false) String sessionId,
HttpServletResponseresponse) {
Stringmsg = "";
if(StringUtils.isEmpty(sessionId)) {
msg= mySorketHandle().pushMsg("伺服器推送資訊了");
System.out.println(msg);
}else {
msg= mySorketHandle().pushMsg(sessionId, "伺服器推送資訊了");
System.out.println(msg);
}
returnmsg;
}
socket的功能
/**
* 給指定連線推訊息
* @param session
* @param message
*/
public String pushMsg(String sessionid, String message){
for(WebSocketSession ssion : set) {
try {
if(sessionid.equals(ssion.getId())){
ssion.sendMessage(new TextMessage(message));
return "機器:" + sessionid+ "推送成功";
}
}catch(IOException e) {
e.printStackTrace();
}
}
return "推送失敗";
}
/**
* 給全部連線
* @param message
* @return
*/
public String pushMsg(String message) {
int i = 0;
for(WebSocketSession ssion : set) {
try {
ssion.sendMessage(new TextMessage(message));
i++;
}catch(IOException e) {
e.printStackTrace();
}
}
return "共有" + i + "得到推送";
}
Push是一個觸發介面,呼叫MySorketHandle的pushMsg()方法。
如果”@ResponBody”返回資料是亂碼,在配置檔案中加上
<mvc:annotation-driven >
<mvc:message-converters register-defaults="true">
<bean class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes" value="text/html;charset=UTF-8"/>
</bean>
</mvc:message-converters>
</mvc:annotation-driven>
原因是springMVC對string的處理類StringHttpMessageConverter 中,預設採用的字符集是ISO-8859-1,而且是final的。
public static final Charset DEFAULT_CHARSET= Charset.forName("ISO-8859-1");