1. 程式人生 > >Spring boot整合websocket實現分散式websocketsession共享(一)--基於redis的釋出訂閱

Spring boot整合websocket實現分散式websocketsession共享(一)--基於redis的釋出訂閱

本文主要是針對分散式場景下的使用websocket的一個解決方案。我們以下面的圖來說明下業務使用場景。

  針對如圖的情況,很多人第一時間想到的是websocket的session共享,這是大多數的第一反應。很遺憾的是,websocketsession是不支援序列化操作,所以也就不可能存在redis中。那麼我們有什麼其他的方式解決呢。

  我們知道在單節點中我們只需要吧websocketsession儲存在Map中就OK,每次傳送通知都從map中根據clientID獲取對應的websocket的session進行訊息通知。那麼我們是不是可以這樣,不管是哪個服務節點要傳送訊息,我都告訴其他的服務,當前要傳送到的客戶端和傳送的內容。然後各個服務節點判斷自己是否儲存了對應的clientID,然後將訊息推送給出去呢。

 具體的實現邏輯,redis配置:

@Configuration
public class RedisConfig {
	  // 注入 RedisConnectionFactory
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Bean
    public RedisTemplate<String, Object> functionDomainRedisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        initDomainRedisTemplate(redisTemplate, redisConnectionFactory);
        return redisTemplate;
    }

    /**
     * 設定資料存入 redis 的序列化方式
     * @param redisTemplate
     * @param factory
     */
    private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setConnectionFactory(factory);
    }
    
     /**
     * 例項化 HashOperations 物件,可以使用 Hash 型別操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForHash();
    }

    /**
     * 例項化 ValueOperations 物件,可以使用 String 操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForValue();
    }

    /**
     * 例項化 ListOperations 物件,可以使用 List 操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForList();
    }

    /**
     * 例項化 SetOperations 物件,可以使用 Set 操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForSet();
    }
    
    /**
     * 例項化 ZSetOperations 物件,可以使用 ZSet 操作
     * @param redisTemplate
     * @return
     */
    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForZSet();
    }
}

redis釋出訂閱配置:

@Configuration
public class RedisPublishConfig {
	@Autowired
	private StaticProperties staticProperties;
	/**
	 * redis訊息監聽器容器 可以新增多個監聽不同話題的redis監聽器,只需要把訊息監聽器和相應的訊息訂閱處理器繫結,該訊息監聽器
	 * 通過反射技術呼叫訊息訂閱處理器的相關方法進行一些業務處理
	 * 
	 * @param connectionFactory
	 * @param listenerAdapter
	 * @return
	 */
	@Bean // 相當於xml中的bean
	RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
			MessageListenerAdapter listenerAdapter) {
		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		// 訂閱了一個叫chat 的通道
		container.addMessageListener(listenerAdapter, new PatternTopic(staticProperties.getWS_CHANNEL()));
		// 這個container 可以新增多個 messageListener
		return container;
	}

	/**
	 * 訊息監聽器介面卡,繫結訊息處理器,利用反射技術呼叫訊息處理器的業務方法
	 * 
	 * @param receiver
	 * @return
	 */
	@Bean
	MessageListenerAdapter listenerAdapter(RedisMsg receiver) {
		// 這個地方 是給messageListenerAdapter 傳入一個訊息接受的處理器,利用反射的方法呼叫“receiveMessage”
		// 也有好幾個過載方法,這邊預設呼叫處理器的方法 叫handleMessage 可以自己到原始碼裡面看
		return new MessageListenerAdapter(receiver, "receiveMessage");
	}

}

 定義接受資訊的介面

@Component
public interface RedisMsg {
	/**
	 * 接受資訊
	 * @param message
	 */
	public void receiveMessage(String message);
}

websocket配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer{
 
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        
        //handler是webSocket的核心,配置入口
        registry.addHandler(new CTIHandler(), "/webscoket/{ID}").setAllowedOrigins("*").addInterceptors(new WebSocketInterceptor());
        
    }

}

interceptor配置

public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {
	/**
	 * 配置日誌
	 */
	private final static Logger logger = LoggerFactory.getLogger(WebSocketInterceptor.class);

	@Override
	public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse seHttpResponse,
			WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//		HttpServletRequest request = ((ServletServerHttpRequest) serverHttpRequest).getServletRequest();
		String userName = serverHttpRequest.getURI().toString().split("ID=")[1];
		attributes.put("userName", userName);
		logger.info("握手之前");
		//從request裡面獲取物件,存放attributes
		return super.beforeHandshake(serverHttpRequest, seHttpResponse, wsHandler, attributes);
	}

	@Override
	public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
			Exception ex) {
		logger.info("握手之後");
		super.afterHandshake(request, response, wsHandler, ex);
	}
}

配置websocket的handler,並配置為redis的接受訊息的實現類:

@Service
public class CTIHandler implements WebSocketHandler ,RedisMsg{
	/**
	 * 配置日誌
	 */
	private final static Logger logger = LoggerFactory.getLogger(CTIHandler.class);
	/**
	 * concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。
	 */
    private static Map<String,WebSocketSession> socketMap = new HashMap<String, WebSocketSession>();
    //新增socket
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    	logger.info("成功建立連線");
    	//獲取使用者資訊
    	String userName = (String) session.getAttributes().get("userName");
    	logger.info("獲取當前"+socketMap.get(userName));
    	if(socketMap.get(userName)==null) {
    		socketMap.put(userName,session);
    		sendMessageToUser(userName, new TextMessage("連結建立成功"));
    		//並且通過redis釋出和訂閱廣播給其他的的機器,或者通過訊息佇列
    	}
    	logger.info("連結成功");
    }

    //接收socket資訊
    @Override
    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
    	logger.info("收到資訊"+webSocketMessage.toString());
    	String userName = (String) webSocketSession.getAttributes().get("userName");
    	webSocketSession.sendMessage(new TextMessage("aaa"));
    	sendMessageToUser(userName, new TextMessage("我收到你的資訊了"));
    }

    /**
     * 傳送資訊給指定使用者
     * @param clientId
     * @param message
     * @return
     */
    public boolean sendMessageToUser(String clientId, TextMessage message) {
    	WebSocketSession session = socketMap.get(clientId);
    	if(session==null) {
    		return false;
    	}
    	logger.info("進入傳送訊息");
    	if (!session.isOpen()) {
    		return false;
        }
    	try {
    		logger.info("正在傳送訊息");
			session.sendMessage(message);
		} catch (IOException e) {
			e.printStackTrace();
		}
        return true;
    }


    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            session.close();
        }
        logger.info("連接出錯");
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
    	//獲取使用者資訊
    	String userName = (String) session.getAttributes().get("userName");
    	if(socketMap.get(userName)!=null) {
    		socketMap.remove(userName);
    		//並且通過redis釋出和訂閱廣播給其他的的機器,或者通過訊息佇列
    	}
        logger.info("連線已關閉:" + status);
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
    /**
     * 接受訂閱資訊
     */
	@Override
	public void receiveMessage(String message) {
		// TODO Auto-generated method stub
		JSONObject sendMsg = JSONObject.fromObject(message.substring(message.indexOf("{")));
		String clientId = sendMsg.getString("userName");
		TextMessage receiveMessage = new TextMessage(sendMsg.getString("message"));
		boolean flag = sendMessageToUser(clientId, receiveMessage);
		if(flag) {
			logger.info("我傳送訊息成功了!");
		}
	}


}

配置檔案配置

spring:
  application:
    name: crm-cti
  #redis配置
  redis:
    host: 47.95.250.218
    password: zhudaxian;.,68NB
    port: 6379
    database: 0

POM檔案配置 

 <!-- websocket支援 -->
	 <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
     </dependency>
     <!-- redis的支援 -->
     <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
      </dependency>

已上就是大體的環境配置,業務介面傳送訊息

	@Override
	public String tjJTPush(String cno) {
		// TODO Auto-generated method stub
		JSONObject result = new JSONObject();
		if(null==Pub_Tools.getString_TrimZeroLenAsNull(cno)) {
			result.put("result", "error");
		}else {
			try {
				Map<String, Object> userData = crmCallBindDao.findSalesMessage(cno);
				//單節點實現方式,如果是單節點建議使用該方式,如果是分散式部署廢棄該方式
			    Boolean flag = ctiHanler.sendMessageToUser(userData.get("userName").toString(), new TextMessage("hangup"));
			    if(!flag) {//傳送失敗廣播出去,讓其他節點發送
					//廣播訊息到各個訂閱者
					JSONObject message = new JSONObject();
					message.put("userName", userData.get("userName"));
					message.put("message", "connect");
					redisTemplate.convertAndSend(staticProperties.getWS_CHANNEL(),message.toString());
			    }
			} catch (Exception e) {
				e.printStackTrace();
				logger.error("推送給客戶端失敗");
			}
			result.put("result", "success");
		}
		return result.toString();
	}

大家如果有什麼不明白的可以留言