1. 程式人生 > >rabbitmq重試機制

rabbitmq重試機制

1、應答模式

NONE

可以稱之為自動回撥,即使無響應或者發生異常均會通知佇列消費成功,會丟失資料。

AUTO

自動檢測異常或者超時事件,如果發生則返回noack,訊息自動回到隊尾,但是這種方式可能出現訊息體本身有問題,返回隊尾其他佇列也不能消費,造成佇列阻塞。

MANUAL

手動回撥,在程式中我們可以對訊息異常記性捕獲,如果出現訊息體格式錯誤問題,手動回覆ack,接著再次呼叫傳送介面把訊息推到隊尾。

2、java示例

2.1 pom.xml

<dependencies>
	<!-- spring boot -->
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
</dependencies>

2.2 配置檔案application.yml

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 47.105.92.141
    port: 5672
    username: admin
    password: pwd
    listener:
      simple:
        retry:
          enabled: true #是否開啟消費者重試(為false時關閉消費者重試,這時消費端程式碼異常會一直重複收到訊息)
          max-attempts: 5 #最大重試次數
          initial-interval: 5000 #重試間隔時間(單位毫秒) 
          max-interval: 1200000 #重試最大時間間隔(單位毫秒) 
          multiplier: 5 #應用於前一重試間隔的乘法器。

2.3 配置類

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** 
 * @ClassName: TopicRabbitConfig 
 * @Description: 
 * @author weiyb 
 * @date 2018年2月26日 下午4:38:16  
 */
@Configuration
public class TopicRabbitConfig {

	public final static String QUEUE_NAME = "spring-boot-queue";
	public final static String EXCHANGE_NAME = "spring-boot-exchange";
	public final static String ROUTING_KEY = "spring-boot-key";

	/**
	 * 建立佇列
	 * @return
	 * @author weiyb 
	 */
	@Bean("queueMessage")
	public Queue queueMessage() {
		return new Queue(QUEUE_NAME);
	}


	/**
	 * 建立一個 topic 型別的交換器
	 * @return
	 * @author weiyb 
	 */
	@Bean("exchange")
	TopicExchange exchange() {
		return new TopicExchange(EXCHANGE_NAME);
	}

	/**
	 * 使用路由鍵(routingKey)把佇列(Queue)繫結到交換器(Exchange)
	 * @param queueMessage
	 * @param exchange
	 * @return
	 * @author weiyb 
	 */
	@Bean
	Binding bindingExchangeMessage(@Qualifier("queueMessage") Queue queueMessage,@Qualifier("exchange") TopicExchange exchange) {
		return BindingBuilder.bind(queueMessage).to(exchange).with(ROUTING_KEY);
	}
}

2.4 消費者(監聽)

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.spring.pro.config.exchange.TopicRabbitConfig;
import com.spring.pro.config.exchange.transaction.TopicRabbitConfigTransaction;
import com.spring.pro.model.User;

@Component
public class HelloReceiver {
	private Logger logger = LoggerFactory.getLogger(getClass());

	@RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME)
	public void process11(User user) {
		logger.info("Receiver  process11: " + JSON.toJSONString(user));
		// 這裡丟擲異常,測試訊息重發機制
//		throw new RuntimeException("***********");
	}
}

2.5 model

import java.io.Serializable;

/**
 * @Title: User.java
 * @Package com.spring.pro.model
 * @Description:
 * @author ybwei
 * @date 2018年11月22日 下午2:46:10
 * @version V1.0
 */
public class User implements Serializable {

	private static final long serialVersionUID = 337531670671807745L;
	private String id;
	private String name;
	private int age;

	public User() {
		super();
	}

	public User(String id, String name, int age) {
		super();
		this.id = id;
		this.name = name;
		this.age = age;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}
}

2.6 生產者

import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.spring.pro.config.exchange.TopicRabbitConfig;
import com.spring.pro.model.User;

@Component
public class HelloSender {

	@Autowired
	private AmqpTemplate rabbitTemplate;

	public void send1() {
		User user=new User("1", "張三", 12);
		this.rabbitTemplate.convertAndSend(TopicRabbitConfig.EXCHANGE_NAME, TopicRabbitConfig.ROUTING_KEY, user);
	}

}

2.7 測試

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.spring.pro.send.HelloSender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {

	@Autowired
	private HelloSender helloSender;

	@Test
	public void hello1() throws Exception {
		helloSender.send1();
	}

}

3、應用場景

  1. 常用的mq通訊。
  2. 上游向下游回調通知策略。配置檔案中的max-attempts(最大重試次數),multiplier(應用於前一重試間隔的乘法器)可以制定策略。

測試重試,第二種方式的日誌如下:

[INFO ] 2018-11-22 18:03:51.853 [SimpleAsyncTaskExecutor-1] c.spring.pro.receiver.HelloReceiver - Receiver  process11: {"age":12,"id":"1","name":"張三"}
[INFO ] 2018-11-22 18:03:56.881 [SimpleAsyncTaskExecutor-1] c.spring.pro.receiver.HelloReceiver - Receiver  process11: {"age":12,"id":"1","name":"張三"}
[INFO ] 2018-11-22 18:04:21.893 [SimpleAsyncTaskExecutor-1] c.spring.pro.receiver.HelloReceiver - Receiver  process11: {"age":12,"id":"1","name":"張三"}
[INFO ] 2018-11-22 18:06:26.907 [SimpleAsyncTaskExecutor-1] c.spring.pro.receiver.HelloReceiver - Receiver  process11: {"age":12,"id":"1","name":"張三"}

當前時間間隔=上次重試間隔*multiplier。

當前時間間隔<max-interval(重試最大時間間隔)。