1. 程式人生 > >springboot整合rabbit,支持消息確認機制

springboot整合rabbit,支持消息確認機制

relative pom.xml .so .net 測試類 .config ble closed 並且

安裝

推薦一篇博客 https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291

項目結構

技術分享圖片

POM.XML

技術分享圖片
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
> 4 <modelVersion>4.0.0</modelVersion> 5 6 <groupId>com.example</groupId> 7 <artifactId>rabbitmq</artifactId> 8 <version>0.0.1-SNAPSHOT</version> 9 <packaging>jar</packaging> 10 11 <name>rabbitmq</name> 12
<description>Spring Boot 整合RabbitMQ</description> 13 14 <parent> 15 <groupId>org.springframework.boot</groupId> 16 <artifactId>spring-boot-starter-parent</artifactId> 17 <version>2.0.5.RELEASE</version> 18 <
relativePath/> <!-- lookup parent from repository --> 19 </parent> 20 21 <properties> 22 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 23 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 24 <java.version>1.8</java.version> 25 </properties> 26 27 <dependencies> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter</artifactId> 31 </dependency> 32 33 <!-- rabbitmq --> 34 <dependency> 35 <groupId>org.springframework.boot</groupId> 36 <artifactId>spring-boot-starter-amqp</artifactId> 37 </dependency> 38 39 <dependency> 40 <groupId>org.springframework.boot</groupId> 41 <artifactId>spring-boot-starter-test</artifactId> 42 <scope>test</scope> 43 </dependency> 44 </dependencies> 45 46 <build> 47 <plugins> 48 <plugin> 49 <groupId>org.springframework.boot</groupId> 50 <artifactId>spring-boot-maven-plugin</artifactId> 51 </plugin> 52 </plugins> 53 </build> 54 55 56 </project>
POM.XML

application.yml

需要將publisher-confrems設為true,啟動確認回調, 將 publisher-returns設為true 確認返回回調

技術分享圖片

rabbitmq配置類--RabbitConfig

第一部分, 定義隊列

技術分享圖片

第二部分,設置一些消息處理策略

技術分享圖片

技術分享圖片
 1 package com.example.rabbitmq;
 2 
 3 import org.slf4j.Logger;
 4 import org.slf4j.LoggerFactory;
 5 import org.springframework.amqp.core.Queue;
 6 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 7 import org.springframework.context.annotation.Bean;
 8 import org.springframework.context.annotation.Configuration;
 9 
10 import javax.annotation.Resource;
11 
12 /**
13  * rabbitMq 配置類
14  * @author milicool
15  * Created on 2018/9/14
16  */
17 @Configuration
18 public class RabbitConfig {
19     @Resource
20     private RabbitTemplate rabbitTemplate;
21 
22     /**
23      * 定義一個hello的隊列
24      * Queue 可以有4個參數
25      *      1.隊列名
26      *      2.durable       持久化消息隊列 ,rabbitmq重啟的時候不需要創建新的隊列 默認true
27      *      3.auto-delete   表示消息隊列沒有在使用時將被自動刪除 默認是false
28      *      4.exclusive     表示該消息隊列是否只在當前connection生效,默認是false
29      */
30     @Bean
31     public Queue helloQueue() {
32         return new Queue("queue-test");
33     }
34 
35     /** ======================== 定制一些處理策略 =============================*/
36 
37     /**
38      * 定制化amqp模版
39      *
40      * ConfirmCallback接口用於實現消息發送到RabbitMQ交換器後接收ack回調   即消息發送到exchange  ack
41      * ReturnCallback接口用於實現消息發送到RabbitMQ 交換器,但無相應隊列與交換器綁定時的回調  即消息發送不到任何一個隊列中  ack
42      */
43     @Bean
44     public RabbitTemplate rabbitTemplate() {
45         Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
46 
47         // 消息發送失敗返回到隊列中, yml需要配置 publisher-returns: true
48         rabbitTemplate.setMandatory(true);
49 
50         // 消息返回, yml需要配置 publisher-returns: true
51         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
52             String correlationId = message.getMessageProperties().getCorrelationIdString();
53             log.debug("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {}  路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey);
54         });
55 
56         // 消息確認, yml需要配置 publisher-confirms: true
57         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
58             if (ack) {
59                 // log.debug("消息發送到exchange成功,id: {}", correlationData.getId());
60             } else {
61                 log.debug("消息發送到exchange失敗,原因: {}", cause);
62             }
63         });
64 
65         return rabbitTemplate;
66     }
67 }
配置類

生產者

 1 /**
 2  * 生產者
 3  * @author milicool
 4  * Created on 2018/9/14
 5  */
 6 @Component
 7 public class Producer {
 8 
 9     @Autowired
10     private RabbitTemplate rabbitTemplate;
11 
12     /**
13      * 給hello隊列發送消息
14      */
15     public void send() {
16         for (int i =0; i< 100; i++) {
17             String msg = "hello, 序號: " + i;
18             System.out.println("Producer, " + msg);
19             rabbitTemplate.convertAndSend("queue-test", msg);
20         }
21     }
22 
23 }

消費者

 1 /**
 2  * 消費者
 3  * @author milicool
 4  * Created on 2018/9/14
 5  */
 6 @Component
 7 public class Comsumer {
 8     private Logger log = LoggerFactory.getLogger(Comsumer.class);
 9 
10     @RabbitListener(queues = "queue-test")
11     public void process(Message message, Channel channel) throws IOException {
12         // 采用手動應答模式, 手動確認應答更為安全穩定
13         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
14         log.info("receive: " + new String(message.getBody()));
15     }
16 }

測試類

 1 @RunWith(SpringRunner.class)
 2 @SpringBootTest
 3 public class RabbitmqApplicationTests {
 4 
 5     @Autowired
 6     private Producer producer;
 7 
 8     @Test
 9     public void contextLoads() {
10         producer.send();
11     }
12 
13 }

測試結果

測試結果太長,沒有截取全部,可以查看到消費者接收到了全部消息,如果有的消息在沒有接收完,消息將被持久化,下次啟動時消費

技術分享圖片

web端查看

技術分享圖片

感謝閱讀 o(∩_∩)o

技術分享圖片

springboot整合rabbit,支持消息確認機制