rabbitmq重試機制
阿新 • • 發佈:2018-12-21
1、應答模式
NONE
可以稱之為自動回撥,即使無響應或者發生異常均會通知佇列消費成功,會丟失資料。
AUTO
自動檢測異常或者超時事件,如果發生則返回noack,訊息自動回到隊尾,但是這種方式可能出現訊息體本身有問題,返回隊尾其他佇列也不能消費,造成佇列阻塞。
MANUAL
手動回撥,在程式中我們可以對訊息異常記性捕獲,如果出現訊息體格式錯誤問題,手動回覆ack,接著再次呼叫傳送介面把訊息推到隊尾。
2、java示例
2.1 pom.xml
<dependencies> <!-- spring boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
2.2 配置檔案application.yml
spring: application: name: rabbitmq rabbitmq: host: 47.105.92.141 port: 5672 username: admin password: pwd listener: simple: retry: enabled: true #是否開啟消費者重試(為false時關閉消費者重試,這時消費端程式碼異常會一直重複收到訊息) max-attempts: 5 #最大重試次數 initial-interval: 5000 #重試間隔時間(單位毫秒) max-interval: 1200000 #重試最大時間間隔(單位毫秒) multiplier: 5 #應用於前一重試間隔的乘法器。
2.3 配置類
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @ClassName: TopicRabbitConfig * @Description: * @author weiyb * @date 2018年2月26日 下午4:38:16 */ @Configuration public class TopicRabbitConfig { public final static String QUEUE_NAME = "spring-boot-queue"; public final static String EXCHANGE_NAME = "spring-boot-exchange"; public final static String ROUTING_KEY = "spring-boot-key"; /** * 建立佇列 * @return * @author weiyb */ @Bean("queueMessage") public Queue queueMessage() { return new Queue(QUEUE_NAME); } /** * 建立一個 topic 型別的交換器 * @return * @author weiyb */ @Bean("exchange") TopicExchange exchange() { return new TopicExchange(EXCHANGE_NAME); } /** * 使用路由鍵(routingKey)把佇列(Queue)繫結到交換器(Exchange) * @param queueMessage * @param exchange * @return * @author weiyb */ @Bean Binding bindingExchangeMessage(@Qualifier("queueMessage") Queue queueMessage,@Qualifier("exchange") TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with(ROUTING_KEY); } }
2.4 消費者(監聽)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.spring.pro.config.exchange.TopicRabbitConfig;
import com.spring.pro.config.exchange.transaction.TopicRabbitConfigTransaction;
import com.spring.pro.model.User;
@Component
public class HelloReceiver {
private Logger logger = LoggerFactory.getLogger(getClass());
@RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME)
public void process11(User user) {
logger.info("Receiver process11: " + JSON.toJSONString(user));
// 這裡丟擲異常,測試訊息重發機制
// throw new RuntimeException("***********");
}
}
2.5 model
import java.io.Serializable;
/**
* @Title: User.java
* @Package com.spring.pro.model
* @Description:
* @author ybwei
* @date 2018年11月22日 下午2:46:10
* @version V1.0
*/
public class User implements Serializable {
private static final long serialVersionUID = 337531670671807745L;
private String id;
private String name;
private int age;
public User() {
super();
}
public User(String id, String name, int age) {
super();
this.id = id;
this.name = name;
this.age = age;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
2.6 生產者
import java.util.Date;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.spring.pro.config.exchange.TopicRabbitConfig;
import com.spring.pro.model.User;
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
User user=new User("1", "張三", 12);
this.rabbitTemplate.convertAndSend(TopicRabbitConfig.EXCHANGE_NAME, TopicRabbitConfig.ROUTING_KEY, user);
}
}
2.7 測試
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.spring.pro.send.HelloSender;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello1() throws Exception {
helloSender.send1();
}
}
3、應用場景
- 常用的mq通訊。
- 上游向下游回調通知策略。配置檔案中的max-attempts(最大重試次數),multiplier(應用於前一重試間隔的乘法器)可以制定策略。
測試重試,第二種方式的日誌如下:
[INFO ] 2018-11-22 18:03:51.853 [SimpleAsyncTaskExecutor-1] c.spring.pro.receiver.HelloReceiver - Receiver process11: {"age":12,"id":"1","name":"張三"}
[INFO ] 2018-11-22 18:03:56.881 [SimpleAsyncTaskExecutor-1] c.spring.pro.receiver.HelloReceiver - Receiver process11: {"age":12,"id":"1","name":"張三"}
[INFO ] 2018-11-22 18:04:21.893 [SimpleAsyncTaskExecutor-1] c.spring.pro.receiver.HelloReceiver - Receiver process11: {"age":12,"id":"1","name":"張三"}
[INFO ] 2018-11-22 18:06:26.907 [SimpleAsyncTaskExecutor-1] c.spring.pro.receiver.HelloReceiver - Receiver process11: {"age":12,"id":"1","name":"張三"}
當前時間間隔=上次重試間隔*multiplier。
當前時間間隔<max-interval(重試最大時間間隔)。