1. 程式人生 > >RabbitMQ之死信佇列

RabbitMQ之死信佇列

DLX, Dead-Letter-Exchange。利用DLX, 當訊息在一個佇列中變成死信(dead message)之後,它能被重新publish到另一個Exchange,這個Exchange就是DLX。訊息變成死信一向有一下幾種情況:

  • 訊息被拒絕(basic.reject/ basic.nack)並且requeue=false
  • 佇列達到最大長度

DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性,當這個佇列中有死信時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange上去,進而被路由到另一個佇列,可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補RabbitMQ 3.0以前支援的immediate引數(可以參考

RabbitMQ之mandatory和immediate)的功能。

核心程式碼實現:通過在queueDeclare方法中加入“x-dead-letter-exchange”實現。

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue"
, false, false, false, args);

你也可以為這個DLX指定routing key,如果沒有特殊指定,則使用原佇列的routing key

args.put("x-dead-letter-routing-key", "some-routing-key");

還可以使用policy來配置:

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
public static void createQueue(){
    try {
        ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Map<String, Object> argss = new HashMap<String, Object>(); argss.put("vhost", "/"); argss.put("username","root"); argss.put("password", "root"); argss.put("x-message-ttl",6000); argss.put("x-dead-letter-exchange","exchange.dlx.test"); argss.put("x-dead-letter-routing-key","queue.dlx.test"); channel.queueDeclare("queue.dlx.test", durable, exclusive, autoDelete, argss); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }

通過RabbitMQ的管理介面可以看到:
這裡寫圖片描述
queue.dlx.test這個queue中有個“DLX”和“DLK”的標記. DLX關聯的是exchangeName, DLK關聯的是routingKey.

詳細說明:
在RabbitMQ中有兩個exchange: exchange.dlx.self和exchange.dlx.test,兩個queue:queue.dlx.test和%DLX%queue.dlx.test
exchange.dlx.self是正常情況下,生產者傳送訊息到此exchange中,繫結關係如圖:
這裡寫圖片描述
exchang.dlx.test是產生死信之後,原queue[queue.dlx.test]的死信傳送到此exchange中,繫結關係如圖:
這裡寫圖片描述

資料首先發送到 exchange[exchange.dlx.self],根據routingkey[dlx]路由到queue.dlx.test,如果正常情況下,消費者可以消費queue.dlx.test的內容。但是如果queue.dlx.test中有訊息變成了dead message即死信了,那麼這個死信則會通過exchangeName=exchange.dlx.test, routingKey=”queue.dlx.test”路由到死信佇列%DLX%queue.dlx.test中,如果要消費這個dead message, 此時消費者必須消費%DLX%queue.dlx.test中的內容而不是queue.dlx.test中的內容。

如果不指定x-dead-letter-routing-key引數,則使用原來的routingkey

參考資料

歡迎支援《RabbitMQ實戰指南》以及關注微信公眾號:朱小廝的部落格。