1. 程式人生 > >spring boot中使用websocket實現點對點通訊與伺服器推送

spring boot中使用websocket實現點對點通訊與伺服器推送

WebSocket介紹

   websocket是html中一種新的協議,它實現了真正的長連線,實現了瀏覽器與伺服器的全雙工通訊(指在通訊的任意時刻,線路上存在A到B和B到A的雙向訊號傳輸)。 現在我們接觸的協議大多是htttp協議,在瀏覽器中通過http協議實現了單向的通訊,瀏覽器發出請求,伺服器在響應,一次客戶端與伺服器的請求就結束了,伺服器不能主動響應客戶端,主動往客戶端返回資料,而在某些需求上要實時重新整理資料,獲取伺服器上的最新資料,顯示給客戶端。為了實現這樣的需求,大多數公司使用了輪詢的技術。輪詢技術,在特定的時間間隔(如1秒)由瀏覽器發出http request,伺服器再將最新資料返回給瀏覽器,實現了資料的實時重新整理,很明顯,通過這種技術實現的偽長連線,存在著一些缺陷,每隔一段時間的http request,不見得每一次的請求都是有意義的,因為客戶端不會知道伺服器上的資料有沒有更新,這樣在多次請求當中肯定會存在無效的請求(上一次請求回來的資料跟本次的完全一樣)。 可見輪詢這種技術,存在很大的弊端,而websocket實現了真正的長連線,伺服器可以主動向客戶端傳送資料,正是這樣的特點,就能很好的實現這種需求,當伺服器有資料變化時,伺服器就可以將新的資料返回給客戶端,沒有無效的請求回覆

環境配置和工具

STS     ,JDK1.8,     Spring boot 2.0.4

新建spring boot專案,在pom.xml檔案中加入WebSocket依賴

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>
        <!-- websocket依賴 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

WebSocket配置類

**
 * 使用@ServerEndpoint創立websocket endpoint
 * 首先要注入 ServerEndpointExporter,
 * 這個bean會自動註冊使用了 @ServerEndpoint 注
 * 解宣告的 Web Socket endpoint。
 * 要注意,如果使用獨立的 Servlet 容器,
 * 而不是直接使用 Spring Boot 的內建容器,
 * 就不要注入 ServerEndpointExporter,
 * 因為它將由容器自己提供和管理
 */
@Configuration
public class WebSocketConfig{
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

注意!!!這裡要說明一下,由於我們是點對點通訊,不同於廣播式通訊的是,必須要區分不同的客戶端,那麼我們怎麼來讓服務端區分出不同的客戶端呢?

經過查詢資料和試驗,我找到了兩種可行獲取客戶端userId的方法。

一種是通過在Server取HttpSession中的值獲取當前使用者

一種是直接在客戶端建立連線時附帶上使用者的值。

 

先說第一種,新建一個MyEndpointConfigure類,程式碼如下,

/**
 * 
 * @author lipengbin
 *
 */
public class MyEndpointConfigure extends ServerEndpointRegistration.Configurator implements ApplicationContextAware
{
    private static volatile BeanFactory context;

    @Override
    public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException
    {
         return context.getBean(clazz);
    }
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
    {
        System.out.println("auto load"+this.hashCode());
        MyEndpointConfigure.context = applicationContext;
    }
}

然後在配置類裡面新增如下程式碼,用來向spring註冊服務

@Bean
    public MyEndpointConfigure newConfigure()
    {
        return new MyEndpointConfigure();
    }

最後我們編寫WebSocket服務端的類

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


import javax.servlet.http.HttpSession;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
 
/**
 * Created by jack on 2017/10/25.
 */
/**
 * websocket的具體實現類
 */
@ServerEndpoint(value = "/websocket",configurator = MyEndpointConfigure.class)
@Component
public class WebSocketServer {
	
    //靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。
    private static int onlineCount = 0;
    //concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
    //與某個客戶端的連線會話,需要通過它來給客戶端傳送資料
    private Session session;
    String str="";	
    /**
     * 連線建立成功呼叫的方法
     */
    @OnOpen
    public void onOpen(Session session,EndpointConfig config) {
        this.session = session;
        HttpSession httpSession= (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //線上數加1
        try {
            //sendMessage(CommonConstant.CURRENT_WANGING_NUMBER.toString());
            sendMessage("服務端連線成功");
        } catch (IOException e) {
            System.out.println("IO異常");
        }
    }
 
    /**
     * 連線關閉呼叫的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //從set中刪除
        subOnlineCount();   
       
        System.out.println("有一連線關閉!當前線上人數為" + getOnlineCount());
      
    }
 
    /**
     * 收到客戶端訊息後呼叫的方法
     *
     * @param message 客戶端傳送過來的訊息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("來自客戶端的訊息:" + message);
        str=message;
//        System.out.println("onMessage sessionId is : "+session.getId());
        //群發訊息
//        for (WebSocketServer item : webSocketSet) {
//            try {                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              
//                item.sendMessage(message);
//            } catch (IOException e) {
//                e.printStackTrace();
//            }
//        }
    }
 
    /**
     * 發生錯誤時呼叫
     */
    @OnError
    public void onError(Session session, Throwable error) {
    	for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage("響應超時");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        error.printStackTrace();
    }
 
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText("服務端訊息:"+message);
        //this.session.getAsyncRemote().sendText(message);
    }
 
    /**
     * 群發自定義訊息
     */
    public static void sendInfo(String message) throws IOException {
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                continue;
            }
        }
    }
 
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
 
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }
 
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

注意這一行

@ServerEndpoint(value = "/websocket",configurator = MyEndpointConfigure.class)

加上了它。我們就可以用

@OnOpen
        public void onOpen(Session session, EndpointConfig config){
            HttpSession httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
            }

來獲取httpSession物件了,然後直接取出登入時儲存的使用者物件就可以了。

但是!!!這麼做有問題啊

原則上來講我在server獲取的httpsession中取出來的使用者物件就是現在正和服務端建立連線的物件,因為這種情況的操作肯定是先登入,然後直接建立連線,可是在實際中多使用者同時登入時就不一定是這樣子了。因為登入是客戶端發起的操作,建立連線也是客戶端發起的操作,且不說在客戶端這兩個操作是否是緊密相連,就算是緊密相連,從伺服器驗證成功(此時已經放入currentUser物件)返回登入結果給客戶端到客戶端向服務端發起連線這中間因為網路原因也是會消耗一定時間的。那麼這時候一件尷尬的事情就發生了:此時,另一個使用者也在登入,並且在之前使用者兩個操作期間完成了登入驗證操作,那麼第一個使用者連線建立之後取出的use物件就不是這個使用者的而是第二個使用者的,這就亂套了。這種方法相當於是 ,使用者A先對伺服器說,記住了,我叫A,然後過了一會兒來說,我要建立連線,我是剛剛告訴你名字那個人。那如果B在A離開那會兒也告訴了伺服器我叫B,那麼伺服器就會把A當成B了。

但是,哎,不慌!我們還有planB呢

服務端可以用@PathParam獲取使用者物件,如此這般,使用者在建立WebSocket連線的時候告訴伺服器自己的使用者id,這樣伺服器就肯定不會把使用者搞錯了。

    服務端註解的地方改成這樣寫

@ServerEndpoint(value = "/websocket/{userId}")

    建立連線的方法引數

 @OnOpen
    //public void onOpen(Session session,EndpointConfig config) {
    public void onOpen(@PathParam("userId")String userId,Session session){

        this.session = session;
        String[] userArray = userId.split(",");
        this.userid = userArray[0];
        webSocketMap.put(userid, this);
        
        addOnlineCount();           //線上數加1
       
		send("服務端連線成功", this.userid);
		System.out.println("服務端連線成功");
    }

    客戶端建立連線時的請求如下

ws = "ws://localhost:8080"  + "/websocket"+"/${userId}";

    userId就是你的使用者id,可以從session中獲取。使用這種方法獲取使用者id,也就不用再配置類裡新增下面這段程式碼了

@Bean
    public MyEndpointConfigure newConfigure()
    {
        return new MyEndpointConfigure();
    }

以上,實現了一個websocket作為服務端,html頁面作為客戶端的一個websocket連線的例子,下面介紹java作為客戶端的例子。

spring boot專案引入依賴

<dependency>
	 <groupId>org.java-websocket</groupId>
	 <artifactId>Java-WebSocket</artifactId>
	 <version>1.3.4</version>
	 <scope>test</scope>
</dependency>

建立一個測試類

import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
 

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
 
/**
 * Created by jack on 2018.9.25
 */
public class WebsocketClient {
    public static WebSocketClient client;
    public static void main(String[] args) throws InterruptedException {
        try {
            client = new WebSocketClient(new URI("ws://localhost:8081/websocket"),new Draft_6455()) {
        	//client = new WebSocketClient(new URI("ws://192.168.87.59:80/websocket"),new Draft_6455()) {
                @Override
                public void onOpen(ServerHandshake serverHandshake) {
                    System.out.println("開啟連結");
                }
 
                @Override
                public void onMessage(String s) {
                    System.out.println("收到訊息"+s);
                }
 
                @Override
                public void onClose(int i, String s, boolean b) {
                    System.out.println("連結已關閉");
                }
 
                @Override
                public void onError(Exception e) {
                    e.printStackTrace();
                    System.out.println("發生錯誤已關閉");
                }
            };
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
 
        client.connect();
        System.out.println(client.getDraft());
       while(!client.getReadyState().equals(WebSocket.READYSTATE.OPEN)){
        System.out.println("還沒有開啟");
        Thread.sleep(1000);
       }
        
        try {
        	for(int i=0;i<1000;i++){
        		String str = "打開了"+i;
        		System.out.println(str);
        		send("hello world".getBytes("utf-8"));
        		Thread.sleep(3000);
        		client.send(str);
        	}
            
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        
       
       }
    
 
 
    public static void send(byte[] bytes){
        client.send(bytes);
    }
 
}

這裡我還是用的第一種獲取session的方法,改成第二種只需要在建立連線的時候後面加上Id即可。

執行結果如下

 

參考:https://blog.csdn.net/wonderful_life_mrchi/article/details/52535463

           https://blog.csdn.net/qq_33171970/article/details/55001587

https://blog.csdn.net/j903829182/article/details/78342941