Spring boot整合websocket實現分散式websocketsession共享(一)--基於redis的釋出訂閱
阿新 • • 發佈:2018-12-29
本文主要是針對分散式場景下的使用websocket的一個解決方案。我們以下面的圖來說明下業務使用場景。
針對如圖的情況,很多人第一時間想到的是websocket的session共享,這是大多數的第一反應。很遺憾的是,websocketsession是不支援序列化操作,所以也就不可能存在redis中。那麼我們有什麼其他的方式解決呢。
我們知道在單節點中我們只需要吧websocketsession儲存在Map中就OK,每次傳送通知都從map中根據clientID獲取對應的websocket的session進行訊息通知。那麼我們是不是可以這樣,不管是哪個服務節點要傳送訊息,我都告訴其他的服務,當前要傳送到的客戶端和傳送的內容。然後各個服務節點判斷自己是否儲存了對應的clientID,然後將訊息推送給出去呢。
具體的實現邏輯,redis配置:
@Configuration public class RedisConfig { // 注入 RedisConnectionFactory @Autowired private RedisConnectionFactory redisConnectionFactory; @Bean public RedisTemplate<String, Object> functionDomainRedisTemplate() { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); initDomainRedisTemplate(redisTemplate, redisConnectionFactory); return redisTemplate; } /** * 設定資料存入 redis 的序列化方式 * @param redisTemplate * @param factory */ private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) { redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer()); redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer()); redisTemplate.setConnectionFactory(factory); } /** * 例項化 HashOperations 物件,可以使用 Hash 型別操作 * @param redisTemplate * @return */ @Bean public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForHash(); } /** * 例項化 ValueOperations 物件,可以使用 String 操作 * @param redisTemplate * @return */ @Bean public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForValue(); } /** * 例項化 ListOperations 物件,可以使用 List 操作 * @param redisTemplate * @return */ @Bean public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForList(); } /** * 例項化 SetOperations 物件,可以使用 Set 操作 * @param redisTemplate * @return */ @Bean public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForSet(); } /** * 例項化 ZSetOperations 物件,可以使用 ZSet 操作 * @param redisTemplate * @return */ @Bean public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) { return redisTemplate.opsForZSet(); } }
redis釋出訂閱配置:
@Configuration public class RedisPublishConfig { @Autowired private StaticProperties staticProperties; /** * redis訊息監聽器容器 可以新增多個監聽不同話題的redis監聽器,只需要把訊息監聽器和相應的訊息訂閱處理器繫結,該訊息監聽器 * 通過反射技術呼叫訊息訂閱處理器的相關方法進行一些業務處理 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean // 相當於xml中的bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 訂閱了一個叫chat 的通道 container.addMessageListener(listenerAdapter, new PatternTopic(staticProperties.getWS_CHANNEL())); // 這個container 可以新增多個 messageListener return container; } /** * 訊息監聽器介面卡,繫結訊息處理器,利用反射技術呼叫訊息處理器的業務方法 * * @param receiver * @return */ @Bean MessageListenerAdapter listenerAdapter(RedisMsg receiver) { // 這個地方 是給messageListenerAdapter 傳入一個訊息接受的處理器,利用反射的方法呼叫“receiveMessage” // 也有好幾個過載方法,這邊預設呼叫處理器的方法 叫handleMessage 可以自己到原始碼裡面看 return new MessageListenerAdapter(receiver, "receiveMessage"); } }
定義接受資訊的介面
@Component
public interface RedisMsg {
/**
* 接受資訊
* @param message
*/
public void receiveMessage(String message);
}
websocket配置
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer{
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//handler是webSocket的核心,配置入口
registry.addHandler(new CTIHandler(), "/webscoket/{ID}").setAllowedOrigins("*").addInterceptors(new WebSocketInterceptor());
}
}
interceptor配置
public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {
/**
* 配置日誌
*/
private final static Logger logger = LoggerFactory.getLogger(WebSocketInterceptor.class);
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse seHttpResponse,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
// HttpServletRequest request = ((ServletServerHttpRequest) serverHttpRequest).getServletRequest();
String userName = serverHttpRequest.getURI().toString().split("ID=")[1];
attributes.put("userName", userName);
logger.info("握手之前");
//從request裡面獲取物件,存放attributes
return super.beforeHandshake(serverHttpRequest, seHttpResponse, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception ex) {
logger.info("握手之後");
super.afterHandshake(request, response, wsHandler, ex);
}
}
配置websocket的handler,並配置為redis的接受訊息的實現類:
@Service
public class CTIHandler implements WebSocketHandler ,RedisMsg{
/**
* 配置日誌
*/
private final static Logger logger = LoggerFactory.getLogger(CTIHandler.class);
/**
* concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。
*/
private static Map<String,WebSocketSession> socketMap = new HashMap<String, WebSocketSession>();
//新增socket
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.info("成功建立連線");
//獲取使用者資訊
String userName = (String) session.getAttributes().get("userName");
logger.info("獲取當前"+socketMap.get(userName));
if(socketMap.get(userName)==null) {
socketMap.put(userName,session);
sendMessageToUser(userName, new TextMessage("連結建立成功"));
//並且通過redis釋出和訂閱廣播給其他的的機器,或者通過訊息佇列
}
logger.info("連結成功");
}
//接收socket資訊
@Override
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
logger.info("收到資訊"+webSocketMessage.toString());
String userName = (String) webSocketSession.getAttributes().get("userName");
webSocketSession.sendMessage(new TextMessage("aaa"));
sendMessageToUser(userName, new TextMessage("我收到你的資訊了"));
}
/**
* 傳送資訊給指定使用者
* @param clientId
* @param message
* @return
*/
public boolean sendMessageToUser(String clientId, TextMessage message) {
WebSocketSession session = socketMap.get(clientId);
if(session==null) {
return false;
}
logger.info("進入傳送訊息");
if (!session.isOpen()) {
return false;
}
try {
logger.info("正在傳送訊息");
session.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
if (session.isOpen()) {
session.close();
}
logger.info("連接出錯");
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
//獲取使用者資訊
String userName = (String) session.getAttributes().get("userName");
if(socketMap.get(userName)!=null) {
socketMap.remove(userName);
//並且通過redis釋出和訂閱廣播給其他的的機器,或者通過訊息佇列
}
logger.info("連線已關閉:" + status);
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 接受訂閱資訊
*/
@Override
public void receiveMessage(String message) {
// TODO Auto-generated method stub
JSONObject sendMsg = JSONObject.fromObject(message.substring(message.indexOf("{")));
String clientId = sendMsg.getString("userName");
TextMessage receiveMessage = new TextMessage(sendMsg.getString("message"));
boolean flag = sendMessageToUser(clientId, receiveMessage);
if(flag) {
logger.info("我傳送訊息成功了!");
}
}
}
配置檔案配置
spring:
application:
name: crm-cti
#redis配置
redis:
host: 47.95.250.218
password: zhudaxian;.,68NB
port: 6379
database: 0
POM檔案配置
<!-- websocket支援 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- redis的支援 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
已上就是大體的環境配置,業務介面傳送訊息
@Override
public String tjJTPush(String cno) {
// TODO Auto-generated method stub
JSONObject result = new JSONObject();
if(null==Pub_Tools.getString_TrimZeroLenAsNull(cno)) {
result.put("result", "error");
}else {
try {
Map<String, Object> userData = crmCallBindDao.findSalesMessage(cno);
//單節點實現方式,如果是單節點建議使用該方式,如果是分散式部署廢棄該方式
Boolean flag = ctiHanler.sendMessageToUser(userData.get("userName").toString(), new TextMessage("hangup"));
if(!flag) {//傳送失敗廣播出去,讓其他節點發送
//廣播訊息到各個訂閱者
JSONObject message = new JSONObject();
message.put("userName", userData.get("userName"));
message.put("message", "connect");
redisTemplate.convertAndSend(staticProperties.getWS_CHANNEL(),message.toString());
}
} catch (Exception e) {
e.printStackTrace();
logger.error("推送給客戶端失敗");
}
result.put("result", "success");
}
return result.toString();
}
大家如果有什麼不明白的可以留言