1. 程式人生 > >SpringBoot使用WebSocket實現服務端推送--叢集實現(2)

SpringBoot使用WebSocket實現服務端推送--叢集實現(2)

書接上文,本文介紹了一種實現叢集管理和訊息傳送方式。

在叢集模式情況下,一般是Nginx反向代理到多臺Tomcat或者SLB代理到多臺Tomcat的方式,怎麼實現給某個人推送訊息?比如WebSocket1連線到Tomcat1,但是在Tomcat2需要給WebSocket1傳送訊息,怎麼辦?一般的想法是像httpsession的叢集處理方式一樣,利用一箇中間件Redis來儲存session即可。但是實際測試才發現,根本不可取,因為WebSocket的session是有狀態的,並且無法序列化,在往redis中儲存的時候就拋異常了。通過查詢資料,發現可以通過Redis的釋出訂閱模式來實現。其基本原理是:初始化的時候都訂閱某個頻道,Tomcat只管理連線到我的WebSocket的session,需要給某人釋出訊息的時候通過Redis釋出一個訊息,所有訂閱了該頻道的Tomcat都能接收到該訊息,根據此訊息找WebSocket,能找到就傳送訊息,不能找到忽略即可。

具體實現:

1.首先實現WebSocketManager,因為連線到本機的WebSocket還是本機管理,所以繼承於MemWebSocketManager實現,只需要實現傳送訊息相關方法。

/**
 * WebSocket的session無法序列化,所以session還是儲存在本地記憶體中,傳送訊息這種就走訂閱釋出模式
 * 1.redis或者mq進行釋出訂閱,廣播->有某個節點能找到此人就傳送訊息,其他的忽略
 * 2.Nginx進行IP hash 可以使用{@link MemWebSocketManager}
 * @author xiongshiyan at 2018/10/10 , contact me with email 
[email protected]
or phone 15208384257 */ public class RedisWebSocketManager extends MemWebSocketManager { public static final String CHANNEL = "websocket"; private static final String COUNT_KEY = "CountKey"; private StringRedisTemplate stringRedisTemplate; public RedisWebSocketManager(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } @Override public void put(String identifier, WebSocket webSocket) { super.put(identifier, webSocket); //線上數量加1 countChange(1); } @Override public void remove(String identifier) { super.remove(identifier); //線上數量減1 countChange(-1); } @Override public int size() { return getCount(); } @Override public void sendMessage(String identifier, String message) { WebSocket webSocket = get(identifier); //本地能找到就直接發 if(null != webSocket && WebSocket.STATUS_AVAILABLE == webSocket.getStatus()){ WebSocketUtil.sendMessage(webSocket.getSession() , message); return; } Map<String , Object> map = new HashMap<>(3); map.put(RedisReceiver.ACTION , ActionFactory.ACTION_SEND_MESSAGE); map.put(RedisReceiver.IDENTIFIER , identifier); map.put("message" , message); //在websocket頻道上釋出傳送訊息的訊息 stringRedisTemplate.convertAndSend(CHANNEL , JsonUtil.serializeMap(map)); } @Override public void broadcast(String message) { Map<String , Object> map = new HashMap<>(2); map.put(RedisReceiver.ACTION , ActionFactory.ACTION_BROADCAST_MESSAGE); map.put("message" , message); //在websocket頻道上釋出廣播的訊息 stringRedisTemplate.convertAndSend(CHANNEL , JsonUtil.serializeMap(map)); } @Override public void changeStatus(String identifier, int status) { WebSocket webSocket = get(identifier); if(null != webSocket){ webSocket.setStatus(status); return; } Map<String , Object> map = new HashMap<>(3); map.put(RedisReceiver.ACTION , ActionFactory.ACTION_CHANGE_STATUS); map.put(RedisReceiver.IDENTIFIER , identifier); map.put("status" , status); //在websocket頻道上釋出改變狀態的訊息 stringRedisTemplate.convertAndSend(CHANNEL , JsonUtil.serializeMap(map)); } /** * 增減線上數量 */ private void countChange(int delta){ ValueOperations<String, String> value = stringRedisTemplate.opsForValue(); //獲取線上當前數量 int count = getCount(value); count = count + delta; count = count > 0 ? count : 0; //設定新的數量 value.set(COUNT_KEY , "" + count); } /** * 獲取當前線上數量 */ private int getCount(){ ValueOperations<String, String> value = stringRedisTemplate.opsForValue(); return getCount(value); } private int getCount(ValueOperations<String, String> value) { String countStr = value.get(COUNT_KEY); int count = 0; if(null != countStr){ count = Integer.parseInt(countStr); } return count; } }

該類中,重寫了所有需要操作某個session的方法,在這些方法中指定不同的操作Action及帶上相應的資料。

2.訂閱者收到改訊息及能做出不同的動作,訂閱者持有WebSocketManager,可以操作相關的session。

/**
 * redis訊息訂閱者
 * @author xiongshiyan
 */
public class RedisReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisReceiver.class);
    public static final String IDENTIFIER = "identifier";
    public static final String ACTION     = "action";

    private CountDownLatch latch;
    private WebSocketManager webSocketManager;

    public RedisReceiver(WebSocketManager webSocketManager, CountDownLatch latch) {
        this.webSocketManager = webSocketManager;
        this.latch = latch;
    }

    /**
     * 此方法會被反射呼叫
     */
    public void receiveMessage(String message) {
        LOGGER.info(message);

        JSONObject object = new JSONObject(message);
        if(!object.containsKey(ACTION)){
            return;
        }
        String actionString = object.getString(ACTION);
        Action action = ActionFactory.create(actionString);
        action.doMessage(this.webSocketManager , object);

        //接收到訊息要做的事情
        latch.countDown();
    }
}

訊息訂閱者接收到訊息之後,根據Action得到操作類,實現不同的操作,如果以後有更多的功能,那麼新增相應的Action類即可。

/**
 * @author xiongshiyan at 2018/10/12 , contact me with email [email protected] or phone 15208384257
 */
public class ActionFactory {
    public static final String ACTION_SEND_MESSAGE       = "sendMessage";
    public static final String ACTION_CHANGE_STATUS      = "changeStatus";
    public static final String ACTION_BROADCAST_MESSAGE  = "broadcast";
    public static Action create(String action){
        if(ACTION_SEND_MESSAGE.equalsIgnoreCase(action)){
            return new SendMessageAction();
        }else if(ACTION_CHANGE_STATUS.equalsIgnoreCase(action)){
            return new ChangeStatusAction();
        }else if(ACTION_BROADCAST_MESSAGE.equalsIgnoreCase(action)){
            return new BroadCastAction();
        }else {
            return new NoActionAction();
        }
    }
}
/**
 * {
 *     "action":"sendMessage",
 *     "identifier":"xxx",
 *     "message":"xxxxxxxxxxx"
 * }
 * 給webSocket傳送訊息的action
 * @author xiongshiyan at 2018/10/12 , contact me with email [email protected] or phone 15208384257
 */
public class SendMessageAction implements Action{
    private static final String MESSAGE = "message";
    @Override
    public void doMessage(WebSocketManager manager, JSONObject object) {
        if(!object.containsKey(RedisReceiver.IDENTIFIER)){
            return;
        }
        if(!object.containsKey(MESSAGE)){
            return;
        }

        String identifier = object.getString(RedisReceiver.IDENTIFIER);

        WebSocket webSocket = manager.get(identifier);
        if(null == webSocket || WebSocket.STATUS_AVAILABLE != webSocket.getStatus()){
            return;
        }
        WebSocketUtil.sendMessage(webSocket.getSession() , object.getString(MESSAGE));
    }
}
/**
 * {
 *     "action":"changeStatus",
 *     "identifier":"xxx",
 *     "status":1
 * }
 * 改變狀態的action
 * @author xiongshiyan at 2018/10/12 , contact me with email [email protected] or phone 15208384257
 */
public class ChangeStatusAction implements Action{
    private static final String STATUS = "status";
    @Override
    public void doMessage(WebSocketManager manager , JSONObject object) {
        if(!object.containsKey(RedisReceiver.IDENTIFIER)){
            return;
        }
        if(!object.containsKey(STATUS)){
            return;
        }
        WebSocket webSocket = manager.get(object.getString(RedisReceiver.IDENTIFIER));
        if(null == webSocket){
            return;
        }
        webSocket.setStatus(object.getInteger(STATUS));
    }
}
/**
 * {
 *     "action":"broadcast",
 *     "message":"xxxxxxxxxxxxx"
 * }
 * 廣播給所有的websocket傳送訊息 action
 * @author xiongshiyan at 2018/10/12 , contact me with email [email protected] or phone 15208384257
 */
public class BroadCastAction implements Action{
    @Override
    public void doMessage(WebSocketManager manager, JSONObject object) {
        String message = object.getString("message");
        //從本地取出所有的websocket傳送訊息
        manager.localWebSocketMap().values().forEach(
                webSocket -> WebSocketUtil.sendMessage(
                        webSocket.getSession() , message));
    }
}
/**
 * do nothing action
 * @author xiongshiyan at 2018/10/12 , contact me with email [email protected] or phone 15208384257
 */
public class NoActionAction implements Action{
    @Override
    public void doMessage(WebSocketManager manager, JSONObject object) {
        // do no thing
    }
}

3.配置WebSocketManager和訊息訂閱。

@Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
    /**
     * 使用redis管理,具備叢集功能
     */
    @Bean(WebSocketManager.WEBSOCKET_MANAGER_NAME)
    public RedisWebSocketManager webSocketManager(@Autowired StringRedisTemplate stringRedisTemplate){
        return new RedisWebSocketManager(stringRedisTemplate);
    }
**
 * @author xiongshiyan
 * redis管理websocket配置
 */
@Configuration
@ConditionalOnBean(RedisWebSocketManager.class)
public class RedisWebSocketConfig {
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(RedisWebSocketManager.CHANNEL));

        return container;
    }

    @Bean
    public RedisReceiver receiver(
            @[email protected]("webSocketManager") WebSocketManager webSocketManager,
            @[email protected]("latch") CountDownLatch latch) {
        return new RedisReceiver(webSocketManager , latch);
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    public CountDownLatch latch() {
        return new CountDownLatch(1);
    }

    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}

至此,實現了WebSocket的叢集管理。

 

專案參見 https://gitee.com/xxssyyyyssxx/websocket-springboot-starter 目前支援多ServerEndPoint和多WebSocketManager,一般情況下他們的關係是一對一的,便於管理。

使用demo見 https://gitee.com/xxssyyyyssxx/websocket-demo