spring boot Rabbitmq集成,延時消息隊列實現
spring boot提供對mq消息隊列支持amqp相關包,引入即可:
[html] view plain copy
- <!-- rabbit mq -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
屬性配置文件application.properties:
[plain] view plain copy
- #rabbitmq
- spring.rabbitmq.host=127.0.0.1
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=root
- spring.rabbitmq.password=root
RabbitMq配置類,配置連接工廠以及操作對象:
[java] view plain copy
- @Configuration
- @ConfigurationProperties(prefix = "spring.rabbitmq")
- public class RabbitMQConfiguration {
- private static Logger logger = Logger.getLogger(RabbitMQConfiguration.class);
- private String host;
- private int port;
- private String username;
- private String password;
- // 鏈接信息
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setPublisherConfirms(true);
- logger.info("Create ConnectionFactory bean ..");
- return connectionFactory;
- }
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory());
- return template;
- }
- //省略getter setter
- }
定義Service接口如下:
暫時不考慮延時隊列,定義發送消息接口
[java] view plain copy
- /**
- *
- * @author victor
- * @desc 消息隊列服務接口
- */
- public interface IMessageQueueService {
- /**
- * 發送消息到隊列
- * @param queue 隊列名稱
- * @param message 消息內容
- */
- public void send(String queueName,String message);
- }
Service實現
[java] view plain copy
- package com.ks.server.service.impl.queue;
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import com.base.common.codec.JSONUtils;
- import com.ks.common.constant.MQConstant;
- import com.ks.common.service.queue.IMessageQueueService;
- import com.ks.modal.queue.DLXMessage;
- /**
- *
- * @author victor
- * @desc 消息隊列服務接口實現
- */
- @Service("messageQueueService")
- public class MessageQueueServiceImpl implements IMessageQueueService{
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Override
- public void send(String queueName, String msg) {
- rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, msg);
- }
- }
相關常量類:
[java] view plain copy
- package com.ks.common.constant;
- /**
- *
- * @author victor
- * @desc Rabbit消息隊列相關常量
- */
- public final class MQConstant {
- private MQConstant(){
- }
- //exchange name
- public static final String DEFAULT_EXCHANGE = "KSHOP";
- //DLX QUEUE
- public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "kshop.dead.letter.queue";
- //DLX repeat QUEUE 死信轉發隊列
- public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "kshop.repeat.trade.queue";
- //Hello 測試消息隊列名稱
- public static final String HELLO_QUEUE_NAME = "HELLO";
- }
到現在為止,隊列相關配置,以及使用以及封裝完成,接下來是創建隊列,
這裏我是單獨創建一個配置類,用於隊列配置, 創建Hello隊列示例如下:
[java] view plain copy
- package com.ks.ons.config;
- import java.util.HashMap;
- import java.util.Map;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import com.ks.common.constant.MQConstant;
- /**
- *
- * @author victor
- * @desc 隊列配置
- */
- @Configuration
- public class QueueConfiguration {
- //信道配置
- @Bean
- public DirectExchange defaultExchange() {
- return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false);
- }
- /********************* hello 隊列 測試 *****************/
- @Bean
- public Queue queue() {
- Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true);
- return queue;
- }
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME);
- }
- }
通過配置隊列bean,在程序啟動時會在rabbitmq中創建相關隊列,啟動程序,可以在rabbtmq管理界面看到信道和隊列信息:
眾所周知,既然有了隊列,用來處理業務的最終還是需要消費者,消費者創建示例如下:
[java] view plain copy
- package com.ks.ons.processor.hello;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import com.ks.common.constant.MQConstant;
- /**
- *
- * @author victor
- * @desc hello 消息隊列消費者
- */
- @Component
- @RabbitListener(queues = MQConstant.HELLO_QUEUE_NAME)
- public class HelloProcessor {
- @RabbitHandler
- public void process(String content) {
- System.out.println("接受消息:" + content);
- }
- }
註入service
[java] view plain copy
- @Autowired
- private IMessageQueueService messageQueueService;
發送消息
[java] view plain copy- messageQueueService.send(MQConstant.HELLO_QUEUE_NAME, "測試發送消息");
接下來展示如何實現延時隊列,在此之前如果讀者像我一樣對rabbitmq隊列了解程度並不深入的話,--> 推薦文章, 可以對rabbitmq延時隊列實現思路有大概了解.
在本文中,主要是通過rabbitmq的DLX特性來實現發送延時隊列:
思路如下:
客戶端:指具體往MQ發生消息端, 客戶端將消息內容進行自定義包裝, 將消息中附帶目標隊列名稱。如:客戶端向隊列Q1發送字符串“hello” , 延時時間為60秒, 包裝後修改為{"queueName":"Q1","body": “hello”},此時,將消息發送到DLX死信隊列,而非Q1隊列,並將消息設置為60秒超時。
DLX:死信隊列,用來存儲有超時時間信息的消息, 並且可以設置當消息超時時,轉發到另一個指定隊列(此處設置轉發到router), 無消費者,當接收到客戶端消息之後,等待消息超時,將消息轉發到指定的Router隊列
Router: 轉發隊列,用來接收死信隊列超時消息, 如上示例消息,在接收到之後,消費者將消息解析,獲取queueName,body,再向所獲取的queueName隊列發送一條消息,內容為body.
Q1,Q2,Q3.: 用戶業務隊列,當Q1收到hello,已經是60秒之後,再進行消費
修改上面代碼 , 新增兩個隊列,
死信隊列:存放發送的延時消息,
路由轉發隊列:用於接受死信消息死亡, 並將消息轉發到業務目標隊列
修改之後代碼如下:
[java] view plain copy
- /**
- *
- * @author victor
- * @desc 隊列配置
- */
- @Configuration
- public class QueueConfiguration {
- //信道配置
- @Bean
- public DirectExchange defaultExchange() {
- return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false);
- }
- @Bean
- public Queue repeatTradeQueue() {
- Queue queue = new Queue(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false);
- return queue;
- }
- @Bean
- public Binding drepeatTradeBinding() {
- return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);
- }
- @Bean
- public Queue deadLetterQueue() {
- Map<String, Object> arguments = new HashMap<>();
- arguments.put("x-dead-letter-exchange", MQConstant.DEFAULT_EXCHANGE);
- arguments.put("x-dead-letter-routing-key", MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);
- Queue queue = new Queue(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments);
- System.out.println("arguments :" + queue.getArguments());
- return queue;
- }
- @Bean
- public Binding deadLetterBinding() {
- return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME);
- }
- /********************* hello 隊列 測試 *****************/
- @Bean
- public Queue queue() {
- Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true);
- return queue;
- }
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME);
- }
- }
修改Service服務:
[java] view plain copy
- package com.ks.common.service.queue;
- /**
- *
- * @author victor
- * @desc 消息隊列服務接口
- */
- public interface IMessageQueueService {
- /**
- * 發送消息到隊列
- * @param queue 隊列名稱
- * @param message 消息內容
- */
- public void send(String queueName,String message);
- /**
- * 延遲發送消息到隊列
- * @param queue 隊列名稱
- * @param message 消息內容
- * @param times 延遲時間 單位毫秒
- */
- public void send(String queueName,String message,long times);
- }
[java] view plain copy
- package com.ks.server.service.impl.queue;
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import com.base.common.codec.JSONUtils;
- import com.ks.common.constant.MQConstant;
- import com.ks.common.service.queue.IMessageQueueService;
- import com.ks.modal.queue.DLXMessage;
- /**
- *
- * @author victor
- * @desc 消息隊列服務接口實現
- */
- @Service("messageQueueService")
- public class MessageQueueServiceImpl implements IMessageQueueService{
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Override
- public void send(String queueName, String msg) {
- rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, msg);
- }
- @Override
- public void send(String queueName, String msg, long times) {
- DLXMessage dlxMessage = new DLXMessage(queueName,msg,times);
- MessagePostProcessor processor = new MessagePostProcessor(){
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setExpiration(times + "");
- return message;
- }
- };
- dlxMessage.setExchange(MQConstant.DEFAULT_EXCHANGE);
- rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, JSONUtils.toJson(dlxMessage), processor);
- }
- }
JSONUtils 為一個JSON工具類
新增消息實體,用於包裝消息:
[java] view plain copy
- package com.ks.modal.queue;
- import java.io.Serializable;
- /**
- *
- * @author victor
- * @desc rabbit 死信消息載體
- */
- public class DLXMessage implements Serializable {
- private static final long serialVersionUID = 9956432152000L;
- public DLXMessage() {
- super();
- }
- public DLXMessage(String queueName, String content, long times) {
- super();
- this.queueName = queueName;
- this.content = content;
- this.times = times;
- }
- public DLXMessage(String exchange, String queueName, String content, long times) {
- super();
- this.exchange = exchange;
- this.queueName = queueName;
- this.content = content;
- this.times = times;
- }
- private String exchange;
- private String queueName;
- private String content;
- private long times;
- //省略getter setter
- }
路由轉發隊列消費者實現,負責接收超時消息,進行轉發:
[java] view plain copy
- package com.ks.ons.processor.system;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import com.base.common.codec.JSONUtils;
- import com.ks.common.constant.MQConstant;
- import com.ks.common.service.queue.IMessageQueueService;
- import com.ks.modal.queue.DLXMessage;
- /**
- *
- * @author victor
- * @desc 死信接收處理消費者
- */
- @Component
- @RabbitListener(queues = MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME)
- public class TradeProcessor {
- @Autowired
- private IMessageQueueService messageQueueService;
- @RabbitHandler
- public void process(String content) {
- DLXMessage message = JSONUtils.toBean(content, DLXMessage.class);
- messageQueueService.send(message.getQueueName(), message.getContent());
- }
- }
啟動項目之後,管理界面如下:
測試代碼片段:
[java] view plain copy
- messageQueueService.send(MQConstant.HELLO_QUEUE_NAME,"測試延遲發送消息",60000);
spring boot Rabbitmq集成,延時消息隊列實現