1. 程式人生 > >[轉]spring boot下WebSocket訊息推送

[轉]spring boot下WebSocket訊息推送

原文連結:spring boot下WebSocket訊息推送, 修正部分錯別字

WebSocket協議

WebSocket是一種在單個TCP連線上進行全雙工通訊的協議。WebSocket通訊協議於2011年被IETF定為標準RFC 6455,並由RFC7936補充規範。WebSocket API也被W3C定為標準。

WebSocket使得客戶端和伺服器之間的資料交換變得更加簡單,允許服務端主動向客戶端推送資料。在WebSocket API中,瀏覽器和伺服器只需要完成一次握手,兩者之間就直接可以建立永續性的連線,並進行雙向資料傳輸

STOMP協議

STOMP是面向文字的訊息傳送協議。STOMP客戶端與支援STOMP協議的訊息代理進行通訊。STOMP使用不同的命令,如連線,傳送,訂閱,斷開等進行通訊。

具體參考:官方介紹

SockJS

SockJS是一個JavaScript庫,提供跨瀏覽器JavaScript的API,建立了一個低延遲、全雙工的瀏覽器和web伺服器之間通訊通道


以上內容出自維基百科和百度百科

 

使用websocket有兩種方式:1是使用sockjs,2是使用h5的標準。使用Html5標準自然更方便簡單,所以記錄的是配合h5的使用方法。

1、pom引入

 核心是@ServerEndpoint這個註解。這個註解是Javaee標準裡的註解,tomcat7以上已經對其進行了實現,如果是用傳統方法使用tomcat釋出專案,只要在pom檔案中引入javaee標準即可使用。

     <dependency>
       <groupId>javax</groupId>
       <artifactId>javaee-api</artifactId>
       <version>7.0</version>
       <scope>provided</scope>
     </dependency>

但使用springboot的內建tomcat時,就不需要引入javaee-api了,spring-boot已經包含了。使用springboot的websocket功能首先引入springboot元件。

     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-websocket</artifactId>
     </dependency>


2、使用@ServerEndpoint創立websocket endpoint

  首先要注入ServerEndpointExporter,這個bean會自動註冊使用了@ServerEndpoint註解宣告的Websocket endpoint。要注意,如果使用獨立的servlet容器,而不是直接使用springboot的內建容器,就不要注入ServerEndpointExporter,因為它將由容器自己提供和管理。

     @Configuration  
     public class WebSocketConfig {  
         @Bean  
         public ServerEndpointExporter serverEndpointExporter() {  
             return new ServerEndpointExporter();  
         }  
  
     }  

下面是websocket的具體實現方法,程式碼如下:

 import org.springframework.stereotype.Component;
 
 import javax.websocket.*;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 @ServerEndpoint(value = "/websocket")
 @Component    //此註解千萬千萬不要忘記,它的主要作用就是將這個監聽器納入到Spring容器中進行管理
 public class WebSocket {
   //靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。
   private static int onlineCount = 0;

   //concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。
    private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();
  
    //與某個客戶端的連線會話,需要通過它來給客戶端傳送資料
    private Session session;
  
    /**
     * 連線建立成功呼叫的方法
     */
    @OnOpen
    public void onOpen(Session session) {
      this.session = session;
      webSocketSet.add(this);     //加入set中
      addOnlineCount();           //線上數加1
      System.out.println("有新連線加入!當前線上人數為" + getOnlineCount());
      try {
        sendMessage("Hello world");
      } catch (IOException e) {
        System.out.println("IO異常");
      }
    }
  
    /**
     * 連線關閉呼叫的方法
     */
    @OnClose
    public void onClose() {
      webSocketSet.remove(this);  //從set中刪除
      subOnlineCount();           //線上數減1
      System.out.println("有一連線關閉!當前線上人數為" + getOnlineCount());
    }
  
    /**
     * 收到客戶端訊息後呼叫的方法
     *
     * @param message 客戶端傳送過來的訊息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
      System.out.println("來自客戶端的訊息:" + message);
 
      //群發訊息
      for (WebSocket item : webSocketSet) {
        try {
          item.sendMessage(message);
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  
    /**
     * 發生錯誤時呼叫
     */
    @OnError
    public void onError(Session session, Throwable error) {
      System.out.println("發生錯誤");
      error.printStackTrace();
    }
  
  
    public void sendMessage(String message) throws IOException {
      this.session.getBasicRemote().sendText(message);
      //this.session.getAsyncRemote().sendText(message);
    }
  
    /**
    * 群發自定義訊息
    */
    public static void sendInfo(String message) throws IOException {
      for (WebSocket item : webSocketSet) {
        try {
          item.sendMessage(message);
        } catch (IOException e) {
          continue;
        }
      }
    }
  
    public static synchronized int getOnlineCount() {
      return onlineCount;
    }
  
    public static synchronized void addOnlineCount() {
      WebSocket.onlineCount++;
   }
 
   public static synchronized void subOnlineCount() {
     WebSocket.onlineCount--;
   }
 }

使用springboot的唯一區別是要@Component宣告下,而使用獨立容器是由容器自己管理websocket的,但在springboot中連容器都是spring管理的。

雖然@Component預設是單例模式的,但springboot還是會為每個websocket連線初始化一個bean,所以可以用一個靜態set儲存起來。

3、前端程式碼

<!DOCTYPE HTML>
<html>
<head>
    <title>My WebSocket</title>
</head>

<body>
Welcome<br/>
<input id="text" type="text" /><button onclick="send()">Send</button>    <button onclick="closeWebSocket()">Close</button>
<div id="message">
</div>
</body>

<script type="text/javascript">
    var websocket = null;

    //判斷當前瀏覽器是否支援WebSocket
    if('WebSocket' in window){
        websocket = new WebSocket("ws://localhost:8084/websocket");
    }
    else{
        alert('Not support websocket')
    }

    //連線發生錯誤的回撥方法
    websocket.onerror = function(){
        setMessageInnerHTML("error");
    };

    //連線成功建立的回撥方法
    websocket.onopen = function(event){
        setMessageInnerHTML("open");
    }

    //接收到訊息的回撥方法
    websocket.onmessage = function(event){
        setMessageInnerHTML(event.data);
    }

    //連線關閉的回撥方法
    websocket.onclose = function(){
        setMessageInnerHTML("close");
    }

    //監聽視窗關閉事件,當視窗關閉時,主動去關閉websocket連線,防止連線還沒斷開就關閉視窗,server端會拋異常。
    window.onbeforeunload = function(){
        websocket.close();
    }

    //將訊息顯示在網頁上
    function setMessageInnerHTML(innerHTML){
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }

    //關閉連線
    function closeWebSocket(){
        websocket.close();
    }

    //傳送訊息
    function send(){
        var message = document.getElementById('text').value;
        websocket.send(message);
    }
</script>
</html>

以上程式碼,實現了websocket簡單訊息推送,可以實現兩個頁面間的訊息顯示,但是Java後臺主動推送訊息時,無法獲取訊息推送的websocket下的session,即無法實現websocket下session的共享。

為解決主動推送的難題,需要在建立連線時,將websocket下的session與servlet下的HttpSession(或者其他session,我們這用到了shiro下的session)建立關聯關係。

 webSocket配置Java類:

import com.bootdo.common.utils.ShiroUtils;
import org.apache.catalina.session.StandardSessionFacade;
import org.apache.shiro.session.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;

@Configuration
public class WebSocketConfig extends Configurator {

  @Override
  public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    /*如果沒有監聽器,那麼這裡獲取到的HttpSession是null*/
    StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession();
    if (ssf != null) {
      HttpSession httpSession = (HttpSession) request.getHttpSession();
      //關鍵操作
      sec.getUserProperties().put("sessionId", httpSession.getId());
      System.out.println("獲取到的SessionID:" + httpSession.getId());
    }
  }

  /**
   * 引入shiro框架下的session,獲取session資訊
   */
  /*
  @Override
  public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    Session shiroSession = ShiroUtils.getSubjct().getSession();
    sec.getUserProperties().put("sessionId", shiroSession.getId());
  }
  */

  @Bean
  public ServerEndpointExporter serverEndpointExporter() {
    //這個物件說一下,貌似只有伺服器是tomcat的時候才需要配置,具體我沒有研究
    return new ServerEndpointExporter();
  }
}


webSocket訊息實現類方法:

複製程式碼
 

import org.springframework.stereotype.Component;

 import javax.websocket.*;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 //configurator = WebsocketConfig.class 該屬性就是我上面配置的資訊
 @ServerEndpoint(value = "/websocket", configurator = WebSocketConfig.class)
 @Component    //此註解千萬千萬不要忘記,它的主要作用就是將這個監聽器納入到Spring容器中進行管理
 public class WebSocket {
   //靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。
   private static int onlineCount = 0;
 
   //concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。
   private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();
 
   //與某個客戶端的連線會話,需要通過它來給客戶端傳送資料
   private Session session;
 
   /**
    * 連線建立成功呼叫的方法
    * <p>
    * config用來獲取WebsocketConfig中的配置資訊
    */
   @OnOpen
   public void onOpen(Session session, EndpointConfig config) {
 
     //獲取WebsocketConfig.java中配置的“sessionId”資訊值
     String httpSessionId = (String) config.getUserProperties().get("sessionId");
 
     this.session = session;
     webSocketSet.add(this);     //加入set中
     addOnlineCount();           //線上數加1
     System.out.println("有新連線加入!當前線上人數為" + getOnlineCount());
     try {
       sendMessage("Hello world");
     } catch (IOException e) {
       System.out.println("IO異常");
     }
   }
 
   /**
    * 連線關閉呼叫的方法
    */
   @OnClose
   public void onClose() {
     webSocketSet.remove(this);  //從set中刪除
     subOnlineCount();           //線上數減1
     System.out.println("有一連線關閉!當前線上人數為" + getOnlineCount());
   }
 
   /**
    * 收到客戶端訊息後呼叫的方法
    *
    * @param message 客戶端傳送過來的訊息
    */
   @OnMessage
   public void onMessage(String message, Session session) {
     System.out.println("來自客戶端的訊息:" + message);
 
     //群發訊息
     for (WebSocket item : webSocketSet) {
       try {
         item.sendMessage(message);
       } catch (IOException e) {
         e.printStackTrace();
       }
     }
   }
 
   /**
    * 發生錯誤時呼叫
    */
   @OnError
   public void onError(Session session, Throwable error) {
     System.out.println("發生錯誤");
     error.printStackTrace();
   }
 
 
   public void sendMessage(String message) throws IOException {
     this.session.getBasicRemote().sendText(message);
     //this.session.getAsyncRemote().sendText(message);
   }
 
 
   /**
    * 群發自定義訊息
    */
   public static void sendInfo(String message) throws IOException {
     for (WebSocket item : webSocketSet) {
       try {
         item.sendMessage(message);
       } catch (IOException e) {
         continue;
       }
     }
   }
 
   public static synchronized int getOnlineCount() {
     return onlineCount;
   }
 
   public static synchronized void addOnlineCount() {
     WebSocket.onlineCount++;
   }
 
   public static synchronized void subOnlineCount() {
     WebSocket.onlineCount--;
   }
 }


注意,有上面配置後,如果配置獲取的資訊為null,需加入監聽實現類:

import org.springframework.stereotype.Component;

import javax.servlet.ServletRequestEvent;
import javax.servlet.ServletRequestListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

/**
 * 監聽器類:主要任務是用ServletRequest將我們的HttpSession攜帶過去
 */
@Component //此註解千萬千萬不要忘記,它的主要作用就是將這個監聽器納入到Spring容器中進行管理,相當於註冊監聽吧
public class RequestListener implements ServletRequestListener {
  @Override
  public void requestInitialized(ServletRequestEvent sre) {
    //將所有request請求都攜帶上httpSession
    HttpSession httpSession= ((HttpServletRequest) sre.getServletRequest()).getSession();
    System.out.println("將所有request請求都攜帶上httpSession " + httpSession.getId());
  }

  public RequestListener() {
  }

  @Override
  public void requestDestroyed(ServletRequestEvent arg0) {
  }
}

對應的前端頁面無需改變。

以上資訊類之於網路上多篇文章整理的資訊,參考文章:spring boot Websocket使用spring boot +WebSocket實現(後臺主動)訊息推送  ; Springboot-WebSocket初探-獲取HttpSession問題