1. 簡介
死信佇列,簡稱:DLX
,Dead Letter Exchange
(死信交換機),當訊息成為Dead message
後,可以被重新發送到另外一個交換機,這個交換機就是DLX
。
(一般會將DLX和與其binding 的 Queue,一併稱為死信佇列或DLX,習慣而已,不必糾結)
那麼什麼情況下會成為Dead message
?
- 佇列的長度達到閾值。
- 消費者拒接消費訊息,
basicNack/basicReject
,並且不把訊息重新放入原目標佇列,requeue=false
。 - 原佇列存在訊息過期設定,訊息到達超時時間未被消費。
流程講解,如圖所示(以第三種情況為例):
Producer
傳送一條訊息到Exchange
並路由到設有過期時間(假設30分鐘)的Queue
中。- 當訊息的存活時間超過了30分鐘後,
Queue
會將訊息轉發給DLX
。 DLX
接收到Dead message
後,將Dead message
路由到與其繫結的Queue
中。- 此時消費者監聽此死信佇列並消費此訊息。
死信佇列有什麼用呢?
- 取消訂單(比如下單30分鐘後未付款,則取消訂單,回滾庫存),或者新使用者註冊,隔段時間進行簡訊問候等。
- 將消費者拒絕的訊息傳送到死信佇列,然後將訊息進行持久化,後續可以做業務分析或者處理。
2. TTL
因為要實現延遲訊息,我們先得知道如何設定過期時間。這裡指演示
TTL
:Time To Live
(存活時間/過期時間),當訊息到達存活時間後,還沒有被消費,會被自動清除。
RabbitMQ可以對訊息設定過期時間,也可以對整個佇列(Queue)設定過期時間。
設定佇列過期時間使用引數:x-message-ttl,單位:ms(毫秒),會對整個佇列訊息統一過期。
設定訊息過期時間使用引數:expiration。單位:ms(毫秒),當該訊息在佇列頭部時(消費時),會單獨判斷這一訊息是否過期。
如果兩者都進行了設定,以時間短的為準。
2.1 佇列設定TTL
2.1.1 引入所需依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2.1.2 application.yaml
spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 預設的虛擬主機
virtual-host: /
# rabbit 使用者名稱密碼
username: admin
password: admin123
2.1.3 RabbitConfig
- 宣告一個過期時間為30s的
Queue
。 - 宣告一個交換機(這裡宣告的是主題交換機,交換機型別無所謂,只要訊息能路由到
Queue
即可)。 - 設定繫結關係。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 設定過期佇列
*
* @author ludangxin
* @date 2021/9/15
*/
@Configuration
public class RabbitTtlConfig {
public static final String EXCHANGE_NAME = "TTL_EXCHANGE";
public static final String QUEUE_NAME = "TTL_QUEUE";
@Bean(QUEUE_NAME)
public Queue queue() {
return QueueBuilder.durable(QUEUE_NAME).ttl(30000).build();
}
@Bean(EXCHANGE_NAME)
public Exchange exchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean
public Binding binding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
}
}
2.1.4 Producer
import com.ldx.rabbitmq.config.RabbitTtlConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 具有過期時間的訊息 生產者
*
* @author ludangxin
* @date 2021/9/9
*/
@Component
public class TtlProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
rabbitTemplate.convertAndSend(RabbitTtlConfig.EXCHANGE_NAME, "ttl.user", "這是一條有生命週期的訊息。");
}
}
2.1.5 測試程式碼
@Autowired
private TtlProducer ttlProducer;
@Test
public void sendMsg() {
ttlProducer.sendMsg();
}
2.1.6 啟動測試
執行測試程式碼後,到RabbitMQ 控制檯中檢視佇列即訊息情況。
如圖所示,訊息存活30s未被消費後,訊息被遺棄。
2.2 訊息設定TTL
2.2.1 Producer
我們將Producer程式碼稍加修改,給訊息設定10s的過期時間,觀察訊息到底是存活30s還是10s。
import com.ldx.rabbitmq.config.RabbitTtlConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 具有過期時間的訊息 生產者
*
* @author ludangxin
* @date 2021/9/9
*/
@Component
public class TtlProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
MessageProperties mp = new MessageProperties();
mp.setExpiration("10000");
Message message = new Message("這是一條有生命週期的訊息。".getBytes(), mp);
rabbitTemplate.convertAndSend(RabbitTtlConfig.EXCHANGE_NAME, "ttl.user", message);
}
}
2.2.2 啟動測試
如圖所示,訊息只存活了10s。
我們將過期時間設定成40s後,但訊息還是隻存活了30s。說明當同時設定了過期時間時,是以時間短的為準。
3. TTL + DLX
接下來我們通過設定過期時間和死信佇列來實現延遲佇列
的功能。
首先羅列下實現步驟:
- 宣告一個
Exchange
與TTl Queue
,並且繫結關係,實現生成死信的邏輯。 - 宣告一個
DLX
與Queue
,此步驟的Queue
是為了接收死信並讓Consumer
進行監聽消費的。 - 將
TTl Queue
與DLX
進行繫結,使訊息成為死信後能轉發給DLX
。
3.1 RabbitConfig
其實DLX與普通的Exchange沒有什麼區別,只不過是“生產”死信的Queue
指定了訊息成為死信後轉發到DLX。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 死信佇列配置
*
* @author ludangxin
* @date 2021/9/15
*/
@Configuration
public class RabbitDeadLetterConfig {
public static final String QUEUE_NAME_TTL = "QUEUE_NAME_TTL_1";
public static final String EXCHANGE_NAME_TTL = "EXCHANGE_NAME_TTL_1";
public static final String QUEUE_NAME_DEAD_LETTER = "QUEUE_NAME_DEAD_LETTER";
public static final String EXCHANGE_NAME_DLX = "EXCHANGE_NAME_DLX";
public static final String ROUTING_KEY_DLX = "EXPIRE.#";
public static final String ROUTING_KEY_DEAD_LETTER = "EXPIRE.10";
public static final String ROUTING_KEY_TTL = "EXPIRE_TTL_10";
/**
* 1. Queue 佇列
*/
@Bean(QUEUE_NAME_TTL)
public Queue queue() {
/*
* 1. 設定佇列的過期時間 30s
* 2. 繫結DLX
* 3. 設定routing key(注意:這裡設定的是路由到死信Queue的路由,並不是設定binding關係的路由)
*/
return QueueBuilder.durable(QUEUE_NAME_TTL).ttl(10000).deadLetterExchange(EXCHANGE_NAME_DLX).deadLetterRoutingKey(ROUTING_KEY_DEAD_LETTER).build();
}
/**
* 2. exchange
*/
@Bean(EXCHANGE_NAME_TTL)
public Exchange exchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME_TTL).durable(true).build();
}
/**
* 3. 佇列和互動機繫結關係 Binding
*/
@Bean
public Binding bindExchange(@Qualifier(QUEUE_NAME_TTL) Queue queue, @Qualifier(EXCHANGE_NAME_TTL) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_TTL).noargs();
}
/**
* 4. 死信佇列
*/
@Bean(QUEUE_NAME_DEAD_LETTER)
public Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_NAME_DEAD_LETTER).build();
}
/**
* 5. dlx
*/
@Bean(EXCHANGE_NAME_DLX)
public Exchange exchangeDlx() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
}
/**
* 6. 佇列和互動機繫結關係 Binding
*/
@Bean
public Binding bindDlxExchange(@Qualifier(QUEUE_NAME_DEAD_LETTER) Queue queue, @Qualifier(EXCHANGE_NAME_DLX) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_DLX).noargs();
}
}
3.2 Producer
import com.ldx.rabbitmq.config.RabbitDeadLetterConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 延遲訊息生產者
*
* @author ludangxin
* @date 2021/9/9
*/
@Component
public class DelayProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
String msg = "這是一條有生命週期的訊息,傳送時間為:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Message message = new Message(msg.getBytes());
rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.EXCHANGE_NAME_TTL, RabbitDeadLetterConfig.ROUTING_KEY_TTL, message);
}
}
3.3 Consumer
import com.ldx.rabbitmq.config.RabbitDeadLetterConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 延遲訊息消費者
*
* @author ludangxin
* @date 2021/9/9
*/
@Slf4j
@Component
public class DelayConsumer {
@RabbitListener(queues = {RabbitDeadLetterConfig.QUEUE_NAME_DEAD_LETTER})
public void dlxQueue(Message message){
log.info(new String(message.getBody()) + ",訊息接收時間為:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
}
3.4 測試程式碼
@Autowired
private DelayProducer delayProducer;
@Test
@SneakyThrows
public void sendDlxMsg() {
delayProducer.sendMsg();
// 使程序阻塞,方便Consumer監聽輸出Message
System.in.read();
}
3.5 啟動測試
輸出日誌內容如下:
2021-09-15 23:51:22.795 INFO 8122 --- [ntContainer#0-1] com.ldx.rabbitmq.consumer.DelayConsumer : 這是一條有生命週期的訊息,傳送時間為:2021-09-15 23:51:12,訊息接收時間為:2021-09-15 23:51:22