1. 程式人生 > >springboot 整合websocket 以及解決tomcat叢集環境websocket共享問題

springboot 整合websocket 以及解決tomcat叢集環境websocket共享問題


解決方案:使用redis訊息釋出訂閱解決多個tomcat應用伺服器下,連線不共享問題;具體如下

@Configuration
public class WebSocketConfig {


    //TODEO如果用外接tomcat,要註釋掉以下程式碼,否則啟動專案會報錯,用springboot內建tomcat就得放開以下程式碼
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

以及實現 WebSocketServer   

​
private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
    //靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。
    private static int onlineCount = 0;
    //concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

    //與某個客戶端的連線會話,需要通過它來給客戶端傳送資料
    private Session session;

    //接收userId
    private String userId = "";
    /**
     * 連線建立成功呼叫的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //線上數加1
        log.info("有新視窗開始監聽:" + userId + ",當前線上人數為" + getOnlineCount());
        this.userId = userId;
        try {
            sendMessage("連線成功");
        } catch (IOException e) {
            log.error("websocket IO異常");
        }
    }

    /**
     * 連線關閉呼叫的方法
     */
    @OnClose
    public void onClose() {


        boolean flag = webSocketSet.remove(this);  //從set中刪除
        if (flag) {
            subOnlineCount();           //線上數減1
            log.info("有一連線關閉!當前線上人數為" + getOnlineCount());
        }
    }

    /**
     * 收到客戶端訊息後呼叫的方法
     *
     * @param message 客戶端傳送過來的訊息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到來自視窗" + userId + "的資訊:" + message);
        //群發訊息
//        for (WebSocketServer item : webSocketSet) {
//            try {
//                item.sendMessage(message);
//            } catch (IOException e) {
//                e.printStackTrace();
//            }
//        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {

        log.error("發生錯誤:" + error.getMessage());
//        error.printStackTrace();
    }

    /**
     * 實現伺服器主動推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * 群發自定義訊息
     */
    public static boolean sendInfo(String message, @PathParam("userId") String userId) {
        log.info("推送訊息到視窗" + userId + ",推送內容:" + message);
        for (WebSocketServer item : webSocketSet) {
            try {
                //這裡可以設定只推送給這個userId的,為null則全部推送
//                if (userId == null) {
//                    item.sendMessage(message);
//                } else if (item.userId.equals(userId)) {
               if (item.userId.equals(userId)) {
                   item.sendMessage(message);
                   return true;
               }
            } catch (IOException e) {
                continue;
            }
        }
        return false;

    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

​


設定監聽類

@Configuration
public class RedisSubListenerConfig {
  //初始化監聽器
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(RedisKeyConstants.TOPIC_CHANNEL_SENDWEBSOCKET));
        return container;
    }
  //利用反射來建立監聽到訊息之後的執行方法
    @Bean
    MessageListenerAdapter listenerAdapter(OutpatientRedisDaoImpl redisReceiver) {
        return new MessageListenerAdapter(redisReceiver, "sendMessageByOpen");
    }

}

//實現監聽後執行的方法

@Component("outpatientRedisDao")
public class OutpatientRedisDaoImpl {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public void sendMessageByOpen(String message) {
        logger.info("reids訂閱訊息收到的引數message:{}", message);

        message = message.substring(1, message.length()-1);
        String [] info = message.split(";");
        JSONObject content = JSONObject.fromObject(info[1].replaceAll("\\\\\"", "\""));
        WebSocketServer.sendInfo(content.toString(), info[0]);

    }
}