1. 程式人生 > >spring boot Rabbitmq集成,延時消息隊列實現

spring boot Rabbitmq集成,延時消息隊列實現

-c 轉發 包裝 let err 接下來 rec ger str

本篇主要記錄Spring boot 集成Rabbitmq,分為兩部分, 第一部分為創建普通消息隊列, 第二部分為延時消息隊列實現:

spring boot提供對mq消息隊列支持amqp相關包,引入即可:

[html] view plain copy
  1. <!-- rabbit mq -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

屬性配置文件application.properties:

[plain] view plain copy
  1. #rabbitmq
  2. spring.rabbitmq.host=127.0.0.1
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=root
  5. spring.rabbitmq.password=root



RabbitMq配置類,配置連接工廠以及操作對象:

[java] view plain copy
  1. @Configuration
  2. @ConfigurationProperties(prefix = "spring.rabbitmq")
  3. public class RabbitMQConfiguration {
  4. private static Logger logger = Logger.getLogger(RabbitMQConfiguration.class);
  5. private String host;
  6. private int port;
  7. private String username;
  8. private String password;
  9. // 鏈接信息
  10. @Bean
  11. public ConnectionFactory connectionFactory() {
  12. CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
  13. connectionFactory.setUsername(username);
  14. connectionFactory.setPassword(password);
  15. connectionFactory.setVirtualHost("/");
  16. connectionFactory.setPublisherConfirms(true);
  17. logger.info("Create ConnectionFactory bean ..");
  18. return connectionFactory;
  19. }
  20. @Bean
  21. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  22. public RabbitTemplate rabbitTemplate() {
  23. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  24. return template;
  25. }
  26. //省略getter setter
[java] view plain copy
  1. }

定義Service接口如下:

暫時不考慮延時隊列,定義發送消息接口

[java] view plain copy
  1. /**
  2. *
  3. * @author victor
  4. * @desc 消息隊列服務接口
  5. */
  6. public interface IMessageQueueService {
  7. /**
  8. * 發送消息到隊列
  9. * @param queue 隊列名稱
  10. * @param message 消息內容
  11. */
  12. public void send(String queueName,String message);
  13. }

Service實現

[java] view plain copy
  1. package com.ks.server.service.impl.queue;
  2. import org.springframework.amqp.AmqpException;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.core.MessagePostProcessor;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. import com.base.common.codec.JSONUtils;
  9. import com.ks.common.constant.MQConstant;
  10. import com.ks.common.service.queue.IMessageQueueService;
  11. import com.ks.modal.queue.DLXMessage;
  12. /**
  13. *
  14. * @author victor
  15. * @desc 消息隊列服務接口實現
  16. */
  17. @Service("messageQueueService")
  18. public class MessageQueueServiceImpl implements IMessageQueueService{
  19. @Autowired
  20. private RabbitTemplate rabbitTemplate;
  21. @Override
  22. public void send(String queueName, String msg) {
  23. rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, msg);
  24. }
  25. }


相關常量類:

[java] view plain copy
  1. package com.ks.common.constant;
  2. /**
  3. *
  4. * @author victor
  5. * @desc Rabbit消息隊列相關常量
  6. */
  7. public final class MQConstant {
  8. private MQConstant(){
  9. }
  10. //exchange name
  11. public static final String DEFAULT_EXCHANGE = "KSHOP";
  12. //DLX QUEUE
  13. public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "kshop.dead.letter.queue";
  14. //DLX repeat QUEUE 死信轉發隊列
  15. public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "kshop.repeat.trade.queue";
  16. //Hello 測試消息隊列名稱
  17. public static final String HELLO_QUEUE_NAME = "HELLO";
  18. }

到現在為止,隊列相關配置,以及使用以及封裝完成,接下來是創建隊列,

這裏我是單獨創建一個配置類,用於隊列配置, 創建Hello隊列示例如下:

[java] view plain copy
  1. package com.ks.ons.config;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.springframework.amqp.core.Binding;
  5. import org.springframework.amqp.core.BindingBuilder;
  6. import org.springframework.amqp.core.DirectExchange;
  7. import org.springframework.amqp.core.Queue;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import com.ks.common.constant.MQConstant;
  11. /**
  12. *
  13. * @author victor
  14. * @desc 隊列配置
  15. */
  16. @Configuration
  17. public class QueueConfiguration {
  18. //信道配置
  19. @Bean
  20. public DirectExchange defaultExchange() {
  21. return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false);
  22. }
  23. /********************* hello 隊列 測試 *****************/
  24. @Bean
  25. public Queue queue() {
  26. Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true);
  27. return queue;
  28. }
  29. @Bean
  30. public Binding binding() {
  31. return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME);
  32. }
  33. }


通過配置隊列bean,在程序啟動時會在rabbitmq中創建相關隊列,啟動程序,可以在rabbtmq管理界面看到信道和隊列信息:

技術分享

技術分享

眾所周知,既然有了隊列,用來處理業務的最終還是需要消費者,消費者創建示例如下:

[java] view plain copy
  1. package com.ks.ons.processor.hello;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import com.ks.common.constant.MQConstant;
  6. /**
  7. *
  8. * @author victor
  9. * @desc hello 消息隊列消費者
  10. */
  11. @Component
  12. @RabbitListener(queues = MQConstant.HELLO_QUEUE_NAME)
  13. public class HelloProcessor {
  14. @RabbitHandler
  15. public void process(String content) {
  16. System.out.println("接受消息:" + content);
  17. }
  18. }

註入service

[java] view plain copy
  1. @Autowired
  2. private IMessageQueueService messageQueueService;

發送消息

[java] view plain copy
  1. messageQueueService.send(MQConstant.HELLO_QUEUE_NAME, "測試發送消息");




接下來展示如何實現延時隊列,在此之前如果讀者像我一樣對rabbitmq隊列了解程度並不深入的話,--> 推薦文章, 可以對rabbitmq延時隊列實現思路有大概了解.

在本文中,主要是通過rabbitmq的DLX特性來實現發送延時隊列:

思路如下:

技術分享

客戶端:指具體往MQ發生消息端, 客戶端將消息內容進行自定義包裝, 將消息中附帶目標隊列名稱。如:客戶端向隊列Q1發送字符串“hello” , 延時時間為60秒, 包裝後修改為{"queueName":"Q1","body": “hello”},此時,將消息發送到DLX死信隊列,而非Q1隊列,並將消息設置為60秒超時。

DLX:死信隊列,用來存儲有超時時間信息的消息, 並且可以設置當消息超時時,轉發到另一個指定隊列(此處設置轉發到router), 無消費者,當接收到客戶端消息之後,等待消息超時,將消息轉發到指定的Router隊列

Router: 轉發隊列,用來接收死信隊列超時消息, 如上示例消息,在接收到之後,消費者將消息解析,獲取queueName,body,再向所獲取的queueName隊列發送一條消息,內容為body.

Q1,Q2,Q3.: 用戶業務隊列,當Q1收到hello,已經是60秒之後,再進行消費

修改上面代碼 , 新增兩個隊列,

死信隊列:存放發送的延時消息,

路由轉發隊列:用於接受死信消息死亡, 並將消息轉發到業務目標隊列

修改之後代碼如下:

[java] view plain copy
  1. /**
  2. *
  3. * @author victor
  4. * @desc 隊列配置
  5. */
  6. @Configuration
  7. public class QueueConfiguration {
  8. //信道配置
  9. @Bean
  10. public DirectExchange defaultExchange() {
  11. return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false);
  12. }
  13. @Bean
  14. public Queue repeatTradeQueue() {
  15. Queue queue = new Queue(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false);
  16. return queue;
  17. }
  18. @Bean
  19. public Binding drepeatTradeBinding() {
  20. return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);
  21. }
  22. @Bean
  23. public Queue deadLetterQueue() {
  24. Map<String, Object> arguments = new HashMap<>();
  25. arguments.put("x-dead-letter-exchange", MQConstant.DEFAULT_EXCHANGE);
  26. arguments.put("x-dead-letter-routing-key", MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);
  27. Queue queue = new Queue(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments);
  28. System.out.println("arguments :" + queue.getArguments());
  29. return queue;
  30. }
  31. @Bean
  32. public Binding deadLetterBinding() {
  33. return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME);
  34. }
  35. /********************* hello 隊列 測試 *****************/
  36. @Bean
  37. public Queue queue() {
  38. Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true);
  39. return queue;
  40. }
  41. @Bean
  42. public Binding binding() {
  43. return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME);
  44. }
  45. }


修改Service服務:

[java] view plain copy
  1. package com.ks.common.service.queue;
  2. /**
  3. *
  4. * @author victor
  5. * @desc 消息隊列服務接口
  6. */
  7. public interface IMessageQueueService {
  8. /**
  9. * 發送消息到隊列
  10. * @param queue 隊列名稱
  11. * @param message 消息內容
  12. */
  13. public void send(String queueName,String message);
  14. /**
  15. * 延遲發送消息到隊列
  16. * @param queue 隊列名稱
  17. * @param message 消息內容
  18. * @param times 延遲時間 單位毫秒
  19. */
  20. public void send(String queueName,String message,long times);
  21. }

[java] view plain copy
  1. package com.ks.server.service.impl.queue;
  2. import org.springframework.amqp.AmqpException;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.core.MessagePostProcessor;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. import com.base.common.codec.JSONUtils;
  9. import com.ks.common.constant.MQConstant;
  10. import com.ks.common.service.queue.IMessageQueueService;
  11. import com.ks.modal.queue.DLXMessage;
  12. /**
  13. *
  14. * @author victor
  15. * @desc 消息隊列服務接口實現
  16. */
  17. @Service("messageQueueService")
  18. public class MessageQueueServiceImpl implements IMessageQueueService{
  19. @Autowired
  20. private RabbitTemplate rabbitTemplate;
  21. @Override
  22. public void send(String queueName, String msg) {
  23. rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, msg);
  24. }
  25. @Override
  26. public void send(String queueName, String msg, long times) {
  27. DLXMessage dlxMessage = new DLXMessage(queueName,msg,times);
  28. MessagePostProcessor processor = new MessagePostProcessor(){
  29. @Override
  30. public Message postProcessMessage(Message message) throws AmqpException {
  31. message.getMessageProperties().setExpiration(times + "");
  32. return message;
  33. }
  34. };
  35. dlxMessage.setExchange(MQConstant.DEFAULT_EXCHANGE);
  36. rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, JSONUtils.toJson(dlxMessage), processor);
  37. }
  38. }

JSONUtils 為一個JSON工具類

新增消息實體,用於包裝消息:

[java] view plain copy
  1. package com.ks.modal.queue;
  2. import java.io.Serializable;
  3. /**
  4. *
  5. * @author victor
  6. * @desc rabbit 死信消息載體
  7. */
  8. public class DLXMessage implements Serializable {
  9. private static final long serialVersionUID = 9956432152000L;
  10. public DLXMessage() {
  11. super();
  12. }
  13. public DLXMessage(String queueName, String content, long times) {
  14. super();
  15. this.queueName = queueName;
  16. this.content = content;
  17. this.times = times;
  18. }
  19. public DLXMessage(String exchange, String queueName, String content, long times) {
  20. super();
  21. this.exchange = exchange;
  22. this.queueName = queueName;
  23. this.content = content;
  24. this.times = times;
  25. }
  26. private String exchange;
  27. private String queueName;
  28. private String content;
  29. private long times;
  30. //省略getter setter
  31. }

路由轉發隊列消費者實現,負責接收超時消息,進行轉發:

[java] view plain copy
  1. package com.ks.ons.processor.system;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import com.base.common.codec.JSONUtils;
  7. import com.ks.common.constant.MQConstant;
  8. import com.ks.common.service.queue.IMessageQueueService;
  9. import com.ks.modal.queue.DLXMessage;
  10. /**
  11. *
  12. * @author victor
  13. * @desc 死信接收處理消費者
  14. */
  15. @Component
  16. @RabbitListener(queues = MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME)
  17. public class TradeProcessor {
  18. @Autowired
  19. private IMessageQueueService messageQueueService;
  20. @RabbitHandler
  21. public void process(String content) {
  22. DLXMessage message = JSONUtils.toBean(content, DLXMessage.class);
  23. messageQueueService.send(message.getQueueName(), message.getContent());
  24. }
  25. }

啟動項目之後,管理界面如下:

技術分享

測試代碼片段:

[java] view plain copy
    1. messageQueueService.send(MQConstant.HELLO_QUEUE_NAME,"測試延遲發送消息",60000);

spring boot Rabbitmq集成,延時消息隊列實現