springboot+rabbitmq+websocket廣播模式進行訊息實時推送
如何安裝rabbitmq在此就不再贅述了,直接上程式碼,使用的direct佇列模式。
依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
生產者
application.properties
#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#開啟訊息傳送確認
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
#返回無法插入佇列的訊息
spring.rabbitmq.template.mandatory=true
準備訊息佇列與路由key
public class RabbitConstant { //交換機名稱 public final static String EXCHANGE = "exchange_test"; //佇列 public final static String QUEUE_TRANSACTION = "queue_transaction"; public final static String QUEUE_CONTRACT = "queue_contract"; public final static String QUEUE_QUALIFICATION = "queue_qualification"; //路由key public final static String RK_TRANSACTION = "transaction"; public final static String RK_CONTRACT = "contract"; public final static String RK_QUALIFICATION = "qualification"; }
配置
@Configuration
public class RabbitMqConfig {
/**
* 宣告佇列
*
* @return
*/
@Bean
public Queue queueTransaction() {
// true表示持久化該佇列
return new Queue(RabbitConstant.QUEUE_TRANSACTION, true);
}
@Bean
public Queue queueContract() {
// true表示持久化該佇列
return new Queue(RabbitConstant.QUEUE_CONTRACT, true);
}
@Bean
public Queue queueQualification() {
// true表示持久化該佇列
return new Queue(RabbitConstant.QUEUE_QUALIFICATION, true);
}
/**
* 宣告互動器
*
* @return
*/
@Bean
DirectExchange directExchange() {
return new DirectExchange(RabbitConstant.EXCHANGE);
}
/**
* 繫結
*
* @return
*/
@Bean
public Binding bindingTransaction() {
return BindingBuilder.bind(queueTransaction()).to(directExchange()).with(RabbitConstant.RK_TRANSACTION);
}
/**
* 繫結
*
* @return
*/
@Bean
public Binding bindingContract() {
return BindingBuilder.bind(queueContract()).to(directExchange()).with(RabbitConstant.RK_CONTRACT);
}
/**
* 繫結
*
* @return
*/
@Bean
public Binding bindingQualification() {
return BindingBuilder.bind(queueQualification()).to(directExchange()).with(RabbitConstant.RK_QUALIFICATION);
}
}
釋出者實現介面
@Component
@Slf4j
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
//訊息傳送確認回撥方法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("訊息傳送成功:" + correlationData);
}
//訊息傳送失敗回撥方法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("訊息傳送失敗:" + new String(message.getBody()));
}
/**
* 傳送訊息,不需要實現任何介面,供外部呼叫
*
* @param messageVo
*/
public void send(MessageVo messageVo) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE, messageVo.getRoutingKey(), JSONObject.toJSON(messageVo).toString(), correlationId);
}
}
Ok,只要呼叫send方法就可一把訊息傳送到指定的佇列中了,傳送後會呼叫confirm或者returnedMessage方法來返回結果,
下面開始講消費者。
消費者
application.properties
#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
消費者配置
import com.rabbitmq.client.Channel;
import com.yxd.rabbitmq.web.websocket.WebSocketServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import java.io.IOException;
@Configuration
@EnableRabbit
@Slf4j
public class ConsumerConfig implements RabbitListenerConfigurer {
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private WebSocketServerEndpoint webSocketServerEndpoint;
@Bean
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
@Bean
public SimpleMessageListenerContainer messageContainer(@Qualifier("queueTransaction") Queue transaction, @Qualifier("queueContract") Queue contract, @Qualifier("queueQualification") Queue qualification) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(transaction, contract, qualification);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
log.info("receive msg : " + new String(body));
try {
webSocketServerEndpoint.sendMessageToAll(new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//確認訊息成功消費
} catch (IOException e) {
log.error("訊息推送前臺出錯:" + e.getMessage() + "/r/n重新發送");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //重新發送
}
}
});
return container;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
}
}
配置完後會自動監聽訊息佇列,AcknowledgeMode設定為手動模式可以自己來管理是否確認消費成功或者重新發送,當然我這裡重新發送會不斷的傳送,到時候根據具體業務進行處理就行了。
websocket
配置
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
socket服務
package com.yxd.rabbitmq.web.websocket;
import com.alibaba.fastjson.JSONObject;
import com.yxd.rabbitmq.model.MessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* ServerEndpoint
* <p>
* 使用springboot的唯一區別是要@Component宣告下,而使用獨立容器是由容器自己管理websocket的,但在springboot中連容器都是spring管理的。
* <p>
* 雖然@Component預設是單例模式的,但springboot還是會為每個websocket連線初始化一個bean,所以可以用一個靜態set儲存起來。
*/
@ServerEndpoint("/ws/yxd/{userId}") //WebSocket客戶端建立連線的地址
@Component
@Slf4j
public class WebSocketServerEndpoint {
/**
* 存活的session集合(使用執行緒安全的map儲存)
*/
private static Map<String, Session> livingSessions = new ConcurrentHashMap<>();
/**
* 建立連線的回撥方法
*
* @param session 與客戶端的WebSocket連線會話
* @param userId 使用者名稱,WebSocket支援路徑引數
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
livingSessions.put(session.getId(), session);
log.info(userId + "進入連線");
}
@OnMessage
public void onMessage(String message, Session session, @PathParam("userId") String userId) {
log.info(userId + " : " + message);
sendMessageToAll(userId + " : " + message);
}
@OnError
public void onError(Session session, Throwable error) {
log.info("發生錯誤");
log.error(error.getStackTrace() + "");
}
@OnClose
public void onClose(Session session, @PathParam("userId") String userId) {
livingSessions.remove(session.getId());
log.info(userId + " 關閉連線");
}
/**
* 單獨傳送訊息
*
* @param session
* @param message
*/
public void sendMessage(Session session, String message) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 群發訊息
*
* @param message
*/
public void sendMessageToAll(String message) {
MessageVo messageVo = JSONObject.parseObject(message, MessageVo.class);
livingSessions.forEach((sessionId, session) -> {
//發給指定的接收使用者
// if (userId.equals(messageVo.getReceiveUserId())) {
sendMessage(session, message);
// }
});
}
}
前臺
我這裡使用的是vue
queueReceiveSetting: {//訊息佇列配置
websock: null,
client: null,
wsuri: "ws://localhost:8111/rabbitmq/ws/huhy/lxy",
},
initWebSocket() {
//ws地址
if (this.queueReceiveSetting.websock) {
this.queueReceiveSetting.websock.close()
}
this.queueReceiveSetting.websock = new WebSocket(this.queueReceiveSetting.wsuri);
this.queueReceiveSetting.websock.onopen = function (res) {
console.log("開啟連線")
};
this.queueReceiveSetting.websock.onmessage = function (res) {
let data = JSON.parse(res.data)
console.log("收到資料:" + data.content)
Message({
message: data.content,
type: 'info',
showClose: true,
duration: 3 * 1000
})
};
this.queueReceiveSetting.websock.onclose = function (res) {
console.log("連線關閉")
};
this.queueReceiveSetting.websock.onerror = function (res) {
console.log("連接出錯")
// this.initWebSocket();
};
}
這樣就能建立連線,進行訊息推送了。因為是自己剛寫的,還有很多不足,希望大家也能提出來^_^相關推薦
springboot+rabbitmq+websocket廣播模式進行訊息實時推送
如何安裝rabbitmq在此就不再贅述了,直接上程式碼,使用的direct佇列模式。依賴 <dependency> <groupId>org.springframework.boot</groupId>
JavaScript仿通知欄新訊息實時推送更新效果
用到websocket做訊息推送,其中有這樣的一個效果,在訊息列表的模組,接收到很多條資訊,展示在介面的是最近的十條接受到的訊息,實時更新模組,每次接受一條訊息,都會展示在最頂部,舊的訊息排列在地下,大致效果如下所示。 用js仿寫了一個簡單的,關於通知欄新訊息實時推送更新效果,程式碼如下:
轉載自-亭鈺wang,php 訊息實時推送(反ajax推送)
入口檔案index.html <!DOCTYPE HTML> <html> <head> <title>反ajax推送</title> <style>  
WebSocket 教程,關於伺服器實時推送
原文出處: http://www.ruanyifeng.com/blog/2017/05/websocket.html WebSocket 是一種網路通訊協議,很多高階功能都需要它。 本文介紹 WebSocket 協議的使用方法。 一、為什麼需要 WebSocket?
Asp.net SignalR 實現服務端訊息實時推送到所有Web端
ASP .NET SignalR是一個ASP .NET 下的類庫,可以在ASP .NET 的Web專案中實現實時通訊。實際上 Asp.net SignalR 2 實現 服務端訊息推送到Web端, 更加簡單
WebSocket和kafka實現資料實時推送到前端
一. 需求背景 最近新接觸一個需求,需要將kafka中的資料實時推送到前端展示。最開始想到的是前端輪詢介面資料,但是無法保證輪詢的頻率和消費的頻率完全一致,或造成資料缺失等問題。最終確定用利用WebSocket實現資料的實時推送。 二. websocket簡介 網上已經有好
Java Spring WebSocket實現後端訊息主動推送
這篇文章將介紹如何構建一個簡單的WebSocket訊息推送Demo使用eclipse建立maven專案後引入相關的依賴jar包,如下:<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://m
利用 socket.io 實現訊息實時推送
專案背景介紹 最近在寫的專案中存在著社交模組,需要實現這樣的一個功能:當發生了使用者被點贊、評論、關注等操作時,需要由伺服器向用戶實時地推送一條訊息。最終完成的專案地址為:https://github.com/noiron/socket-message-push,這裡將介紹一下實現的思路及部分程式碼。
C/C++ 基於 websocket 的前臺及後臺實時推送
實現步驟如下: 1. 獲取 GoEasy appkey. 在 goeasy 官網上註冊一個賬號,並新建一個 app. APP 建立好後系統會為該 app 自動生成兩個 key, 一個既可以用來接收又可以用來推送 (supper key),另一個只可以用來接收(subscr
SpringBoot整合WebSocket【基於STOMP協議】進行點對點[一對一]和廣播[一對多]實時推送,內附簡易聊天室demo
最近專案來了新需求,需要做一個實時推送的功能,伺服器主動推送訊息給客戶端,在網上經過一輪搜查之後,確定使用WebSocket來進行開發。以前經常聽說WebSocket的神奇之處,如今終於可以嘗試使用它了。1.淺談WebSocketWebSocket是在HTML5基礎上單個TC
SpringBoot整合WebSocket【基於純H5】進行點對點[一對一]和廣播[一對多]實時推送
之前實現WebSocket基於STOMP的,覺得SpringBoot封裝的太高,不怎麼靈活,現在實現一個純H5的,也大概瞭解webSocket在內部是怎麼傳輸的。1.環境搭建因為在上一篇基於STOMP協議實現的WebSocket裡已經有大概介紹過Web的基本情況了,所以在這篇
RabbitMQ訂閱釋出的訊息,通過WebSocket實現資料實時推送到前端上顯
一、架構簡單概述 RabbitMQ訊息佇列服務善於解決多系統、異構系統間的資料交換(訊息通知/通訊)問題,並且可以訂閱和釋出,而隨著HTML5誕生的WebSocket協議實現了瀏覽器與伺服器的全雙工通訊,擴充套件了瀏覽器與服務端的通訊功能,使服務端也能主動向客戶端傳送資料
Python-RabbitMQ fanout廣播模式
空閑 ron count 現實 打開 公平分發 簡單 實現 mage 現實生活中,機器的配置不同,有的性能高,有的性能低,有可能A機器處理了四條消息,B機器一條消息還沒處理完,要是按照輪詢方式來分發消息的話,那就是A機器一直是空閑,B機器有一大丟消息處理不完。我們在部署集群
Python-RabbitMQ direct廣播模式
shadow sha vpd 一個 ima info mar 技術 ror fanout廣播模式是全部都能收到信息,那我要是想要有條件選擇的接收呢,需要用到direct模式 這張圖的大概意思是Exchange的類型為direct,發的error級別的消息投遞到第一個隊列,消
WebSocket實時推送訊息
廢話不多說,直接上程式碼,本人親自測試,可以使用。 前臺js程式碼: var websocket; var path = window.location.host+"/web-socket"; // 首先判斷是否 支援 WebSocket if('WebSocket' in windo
springboot整合rabbitMQ實現訊息的推送
RabbitMQ訊息中介軟體元件,目前比較流行的訊息中介軟體有:RabbitMQ、RocketMQ、ActiveMQ、Kafka等。 我的程式碼中用的是RabbitMQ,先介紹幾個概念: 一:訊息佇列的特性如下: 非同步性,將耗時的同步操作通過以傳送訊息的方
web的實時推送技術--websocket
Python的tornado框架中,websocket允許瀏覽器和伺服器之間進行雙向,實時,持久的,全雙工通訊。協議本身使用 ws://URL的格式,如果是加密的websocket則使用的是wss://URL websocket和其他方式的比較: 1、http協
WebSocket實現實時推送資料到前端
@Component @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer{ @Resource goodsWebSocketHandler handler;
Spring+Websocket實現訊息的推送
本文主要有三個步驟 1、使用者登入後建立websocket連線,預設選擇websocket連線,如果瀏覽器不支援,則使用sockjs進行模擬連線 2、建立連線後,服務端返回該使用者的未讀訊息 3、服務端進行相關操作後,推送給某一個使用者或者所有使用者新訊息 相關
spring boot 整合websocket實現實時推送
websocket暫時能想到的應用是瀏覽器聊天和後臺日誌實時顯示到前臺。 當後臺啟動程式後,等待前臺連線,連線上之後,前後臺就可以相互發送資料了。 先貼上pom配置: <dependency> <groupId&g