Rabbit 高階操作

1.過期時間TTL

過期時間TTL表示可以對訊息設定預期的時間,在這個時間內都可以被消費者接收穫取;過了時間之後訊息將自動被刪除。

RabbitMQ可以對訊息和佇列設定TTL。目前有兩種方法可以設定。

  • 第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。
  • 第二種方法是對訊息進行單獨設定,每條訊息TTL可以不同。

如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就稱為dead message被投遞到死信佇列, 消費者將無法再收到該訊息。

1.1 設定佇列TTL

1.1.1 配置檔案方式

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--定義過期佇列及其屬性,不存在則自動建立-->
<rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投遞到該佇列的訊息如果沒有消費都將在6秒之後被刪除-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
</rabbit:queue-arguments>
</rabbit:queue> </beans>

1.1.2 配置類方式

@Configuration
public class RabbitMQConfig {
@Bean("my_ttl_queue")
public Queue queue() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl", 6000); // 佇列中的訊息未被消費 10 秒後過期
return new Queue("my_ttl_queue", true, false, false, map);
}
}

1.1.3 建立測試類

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 測試過期佇列訊息
* 投遞到該佇列的訊息如果沒有消費都將在6秒之後被刪除
*/
@Test
public void ttlQueueTest(){
//路由鍵與佇列同名
rabbitTemplate.convertAndSend("my_ttl_queue", "傳送到過期佇列my_ttl_queue,6秒內不消費則過期。");
}
}

執行之後,可以進入網站中檢視該佇列:

剛執行時:

執行6s後:

1.2 設定訊息TTL

/**
* 過期訊息
* 該訊息投遞任何交換機或佇列中的時候;如果到了過期時間則將從該佇列中刪除
*/
@Test
public void ttlMessageTest(){
MessageProperties messageProperties = new MessageProperties();
//設定訊息的過期時間,5秒
messageProperties.setExpiration("5000");
Message message = new Message("測試過期訊息,5秒鐘過期".getBytes(), messageProperties);
//路由鍵與佇列同名
rabbitTemplate.convertAndSend("my_ttl_queue", message);
}

這樣可以給這條message 設定過期時間,如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。所以會 5s 過期。

2.死信佇列

DLX,全稱為Dead-Letter-Exchange , 可以稱之為死信交換機,也有人稱之為死信郵箱。當訊息在一個佇列中變成死信(dead message)之後,它能被重新發送到另一個交換機中,這個交換機就是DLX ,繫結DLX的佇列就稱之為死信佇列。

訊息變成死信,可能是由於以下的原因:

  • 訊息被拒絕
  • 訊息過期
  • 佇列達到最大長度

DLX也是一個正常的交換機,和一般的交換機沒有區別,它能在任何的佇列上被指定,實際上就是設定某一個佇列的屬性。當這個佇列中存在死信時,Rabbitmq就會自動地將這個訊息重新發布到設定的DLX上去,進而被路由到另一個佇列,即死信佇列。

要想使用死信佇列,只需要在定義佇列的時候設定佇列引數 x-dead-letter-exchange 指定交換機即可。

2.1 配置檔案方法

2.1.1 訊息過期情況

定義死信佇列
<!--定義定向交換機中的持久化死信佇列,不存在則自動建立-->
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>
<!--定義廣播型別交換機;並繫結佇列-->
<rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true">
<rabbit:bindings>
<!--繫結路由鍵my_ttl_dlx,可以將過期的訊息轉移到my_dlx_queue佇列-->
<rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
佇列設定死信交換機
<!--定義過期佇列及其屬性,不存在則自動建立-->
<rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投遞到該佇列的訊息如果沒有消費都將在6秒之後被投遞到死信交換機-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
<!--設定當訊息過期後投遞到對應的死信交換機-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--定義定向交換機 根據不同的路由key投遞訊息-->
<rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>

2.1.2 訊息佇列過長情況

定義死信佇列
<!--定義定向交換機中的持久化死信佇列,不存在則自動建立-->
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>
<!--定義廣播型別交換機-->
<rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true">
<rabbit:bindings>
<!--繫結路由鍵my_max_dlx-->
<rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
佇列設定死信交換機
<!--定義限制長度的佇列及其屬性,不存在則自動建立-->
<rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投遞到該佇列的訊息最多2個訊息,如果超過則最早的訊息被刪除投遞到死信交換機-->
<entry key="x-max-length" value-type="long" value="2"/>
<!--設定當訊息過期後投遞到對應的死信交換機-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--定義定向交換機 根據不同的路由key投遞訊息-->
<rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>

2.2 配置類方式

2.2.1 訊息過期情況

@Configuration
public class RabbitMQConfig {
/**
* 宣告死信交換機
*
* @return 返回
*/
@Bean("my_dlx_exchange")
public DirectExchange myDlxExchange() {
return new DirectExchange("my_dlx_exchange", true, false, new HashMap<>());
}
/**
* 宣告死信佇列
*
* @return 返回
*/
@Bean("my_dlx_queue")
public Queue myDlxQueue() {
return new Queue("my_dlx_queue", true, false, false, new HashMap<>());
}
/**
* 繫結佇列,設定路由key
*
* @param queue 引數
* @param directExchange 引數
* @return 返回
*/
@Bean
public Binding bindingDead(@Qualifier("my_dlx_queue") Queue queue, @Qualifier("my_dlx_exchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("my_ttl_dlx");
}
/**
* 宣告過期佇列
*
* @return 返回
*/
@Bean("my_ttl_dlx_queue")
public Queue myTtlDlxQueue() {
Map<String, Object> map = new HashMap<>();
//6s後佇列過期
map.put("x-message-ttl", 6000);
//訊息過期後,進入死信交換機
map.put("x-dead-letter-exchange", "my_dlx_exchange");
return new Queue("my_ttl_dlx_queue", true, false, false, map);
}
/**
* 宣告交換機
*
* @return 返回
*/
@Bean("my_normal_exchange")
public DirectExchange myNormalExchange() {
return new DirectExchange("my_normal_exchange", true, false, new HashMap<>());
}
/**
* 交換機繫結過期佇列
*
* @param queue 引數
* @param directExchange 引數
* @return 返回
*/
@Bean
public Binding binding(@Qualifier("my_ttl_dlx_queue") Queue queue, @Qualifier("my_normal_exchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("my_ttl_dlx");
}
}

2.2.2 訊息過長情況

@Configuration
public class RabbitMQConfig {
/**
* 宣告死信交換機
*
* @return 返回
*/
@Bean("my_dlx_exchange")
public DirectExchange myDlxExchange() {
return new DirectExchange("my_dlx_exchange", true, false, new HashMap<>());
}
/**
* 宣告死信佇列
*
* @return 返回
*/
@Bean("my_dlx_queue")
public Queue myDlxQueue() {
return new Queue("my_dlx_queue", true, false, false, new HashMap<>());
}
/**
* 繫結佇列,設定路由key
*
* @param queue 引數
* @param directExchange 引數
* @return 返回
*/
@Bean
public Binding bindingDead(@Qualifier("my_dlx_queue") Queue queue, @Qualifier("my_dlx_exchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("my_max_dlx");
}
/**
* 宣告過長佇列
*
* @return 返回
*/
@Bean("my_max_dlx_queue")
public Queue myMaxDlxQueue(){
Map<String, Object> map = new HashMap<>();
//設定訊息過長
map.put("x-max-length",2);
//訊息過長後,進入死信佇列
map.put("x-dead-letter-exchange","my_dlx_exchange");
return new Queue("my_max_dlx_queue",true,false,false,map);
}
/**
* 宣告交換機
*
* @return 返回
*/
@Bean("my_normal_exchange")
public DirectExchange myNormalExchange() {
return new DirectExchange("my_normal_exchange", true, false, new HashMap<>());
}
/**
* 訊息過長 交換機繫結過期佇列
*
* @param queue 引數
* @param directExchange 引數
* @return 返回
*/
@Bean
public Binding binding2(@Qualifier("my_max_dlx_queue") Queue queue, @Qualifier("my_normal_exchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("my_max_dlx");
}
}

2.3 死信佇列測試

2.3.1 測試訊息過期情況

/**
* 過期訊息投遞到死信佇列
* 投遞到一個正常的佇列,但是該佇列有設定過期時間,到過期時間之後訊息會被投遞到死信交換機(佇列)
*/
@Test
public void dlxTTLMessageTest(){
rabbitTemplate.convertAndSend("my_normal_exchange", "my_ttl_dlx", "測試過期訊息;6秒過期後會被投遞到死信交換機");
}

剛執行時:

6s之後:

2.3.2 測試訊息過長情況

/**
* 超過佇列長度訊息投遞到死信佇列
* 投遞到一個正常的佇列,但是該佇列有設定最大訊息數,到最大訊息數之後佇列中最早的訊息會被投遞到死信交換機(佇列)
*/
@Test
public void dlxMaxMessageTest(){
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
"佇列my_max_dlx_queue的最大長度為2;訊息超過後會被投遞到死信交換機;這是第1個訊息");
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
"佇列my_max_dlx_queue的最大長度為2;訊息超過後會被投遞到死信交換機;這是第2個訊息");
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
"佇列my_max_dlx_queue的最大長度為2;訊息超過後會被投遞到死信交換機;這是第3個訊息");
}

傳送了三條資訊,而設定的訊息佇列長度為2,這樣最先發送的第1個訊息會進入死信佇列:

3.延遲佇列

延遲佇列儲存的物件是對應的延遲訊息;所謂“延遲訊息” 是指當訊息被髮送以後,並不想讓消費者立刻拿到訊息,而是等待特定時間後,消費者才能拿到這個訊息進行消費。在RabbitMQ中延遲佇列可以通過 過期時間 + 死信佇列 來實現;

3.1 生產者

public class Producer {
//交換機名稱
static final String MY_DLX_EXCHANGE = "my_dlx_exchange";
static final String MY_NORMAL_EXCHANGE = "my_normal_exchange";
//佇列名稱
static final String MY_DLX_QUEUE = "my_dlx_queue";
static final String MY_TTL_DLX = "my_ttl_dlx";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//宣告死信交換機
channel.exchangeDeclare(MY_DLX_EXCHANGE, BuiltinExchangeType.DIRECT);
//宣告一個佇列
channel.queueDeclare(MY_DLX_QUEUE, true, false, false, null);
//繫結
channel.queueBind(MY_DLX_QUEUE, MY_DLX_EXCHANGE, "my_ttl_dlx");
//宣告工作交換機
channel.exchangeDeclare(MY_NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//宣告工作佇列
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl", 6000);
map.put("x-dead-letter-exchange", MY_DLX_EXCHANGE);
channel.queueDeclare("my_ttl_dlx", true, false, false, map);
//繫結
channel.queueBind(MY_TTL_DLX, MY_NORMAL_EXCHANGE, "my_ttl_dlx"); String message = LocalDateTime.now() + ",延遲6s的訊息";
channel.basicPublish(MY_NORMAL_EXCHANGE, "my_ttl_dlx", null, message.getBytes());
System.out.println("傳送訊息為::" + message); channel.close();
connection.close();
}
}

3.2 消費者

public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Producer.MY_DLX_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(Producer.MY_DLX_QUEUE,true,false,false,null);
channel.queueBind(Producer.MY_DLX_QUEUE,Producer.MY_DLX_EXCHANGE,"my_ttl_dlx");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者收到訊息:" + new String(body,"utf-8")+",當前時間:"+ LocalDateTime.now());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Producer.MY_DLX_QUEUE, false, consumer);
}
}

3.3 測試

先啟動 消費者 ,開啟監聽,在啟動 生產者 傳送訊息。

這時候可能出現錯誤:

這個問題是因為你建立的交換機已經存在,可以去rabbitmq網站中,刪除對應交換機。

再次執行就成功了:

個人部落格為:

MoYu's HomePage