【轉】RabbitMQ入門_11_DLX
佇列中的訊息可能會成為死信訊息(dead lettered)。讓訊息成為死信訊息的事件有:
- 訊息被取消確認(nack 或 reject),且設定為不重入佇列(requeue = false)
- 訊息TTL過期
- 佇列達到長度限制
死信訊息會被死信交換機(Dead Letter Exchange, DLX)重新發布。
gordon.study.rabbitmq.dlx.TestDlx.java
<span style="color:#333333"><code><span style="color:#0000ff">public</span> <span style="color:#0000ff">class</span> <span style="color:#a31515">TestDlx</span> { <span style="color:#0000ff">private</span> <span style="color:#0000ff">static</span> <span style="color:#0000ff">final</span> String DLX_EXCHANGE_NAME = <span style="color:#a31515">"exchangeDLX"</span>; <span style="color:#0000ff">private</span> <span style="color:#0000ff">static</span> <span style="color:#0000ff">final</span> String DLX_QUEUE_NAME = <span style="color:#a31515">"queueDLX"</span>; <span style="color:#0000ff">private</span> <span style="color:#0000ff">static</span> <span style="color:#0000ff">final</span> String QUEUE_NAME = <span style="color:#a31515">"queue"</span>; <span style="color:#0000ff">public</span> <span style="color:#0000ff">static</span> <span style="color:#0000ff">void</span> <span style="color:#a31515">main</span>(String[] argv) <span style="color:#0000ff">throws</span> Exception { ConnectionFactory factory = <span style="color:#0000ff">new</span> ConnectionFactory(); factory.setHost(<span style="color:#a31515">"localhost"</span>); Connection connection = factory.newConnection(); Channel senderChannel = connection.createChannel(); Channel consumerChannel = connection.createChannel(); Map<String, Object> args = <span style="color:#0000ff">new</span> HashMap<String, Object>(); args.put(<span style="color:#a31515">"x-message-ttl"</span>, 3000); <span style="color:green">// 設定佇列中訊息存活時間為3秒</span> args.put(<span style="color:#a31515">"x-max-length"</span>, 5); <span style="color:green">// 設定佇列最大訊息數量為5</span> args.put(<span style="color:#a31515">"x-dead-letter-exchange"</span>, DLX_EXCHANGE_NAME); <span style="color:green">// 設定DLX</span> senderChannel.queueDeclare(QUEUE_NAME, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">true</span>, args); senderChannel.queueDeclare(DLX_QUEUE_NAME, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">true</span>, <span style="color:#0000ff">null</span>); senderChannel.exchangeDeclare(DLX_EXCHANGE_NAME, <span style="color:#a31515">"direct"</span>, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">true</span>, <span style="color:#0000ff">null</span>); <span style="color:green">// 將死信佇列繫結到死信交換機上,繫結鍵為 QUEUE_NAME。訊息傳送時使用的繫結鍵也會是 QUEUE_NAME</span> senderChannel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, QUEUE_NAME); <span style="color:green">// 釋出6個訊息</span> <span style="color:#0000ff">for</span> (<span style="color:#0000ff">int</span> i = 0; i < 6;) { String message = <span style="color:#a31515">"NO. "</span> + ++i; senderChannel.basicPublish(<span style="color:#a31515">""</span>, QUEUE_NAME, <span style="color:#0000ff">null</span>, message.getBytes(<span style="color:#a31515">"UTF-8"</span>)); } <span style="color:green">// 監視死信佇列</span> Consumer dlxConsumer = <span style="color:#0000ff">new</span> DefaultConsumer(consumerChannel) { <span style="color:#2b91af">@Override</span> <span style="color:#0000ff">public</span> <span style="color:#0000ff">void</span> <span style="color:#a31515">handleDelivery</span>(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, <span style="color:#0000ff">byte</span>[] body) <span style="color:#0000ff">throws</span> IOException { String message = <span style="color:#0000ff">new</span> String(body, <span style="color:#a31515">"UTF-8"</span>); System.out.printf(<span style="color:#a31515">"consume: %s, envelop: %s, properties: %s\n"</span>, message, envelope, properties); } }; consumerChannel.basicConsume(DLX_QUEUE_NAME, <span style="color:#0000ff">true</span>, dlxConsumer); Thread.sleep(100); GetResponse resp = consumerChannel.basicGet(QUEUE_NAME, <span style="color:#0000ff">false</span>); consumerChannel.basicReject(resp.getEnvelope().getDeliveryTag(), <span style="color:#0000ff">false</span>); } } </code></span>
程式碼第19行通過 x-dead-letter-exchange 引數定義了當前佇列指定的死信交換機是 DLX_EXCHANGE_NAME,當前佇列中所有的死信訊息都將交由 DLX_EXCHANGE_NAME 再一次分發到新的佇列。
程式碼第23行建立了死信交換機 DLX_EXCHANGE_NAME,可見,死信交換機就是普通的交換機。
從以上兩部分程式碼順序可知,死信交換機可以在被佇列引用後才建立。RabbitMQ 不會去驗證死信交換機設定是否有效,當死信訊息找不到指定的交換機時,死信訊息會被RabbitMQ安靜的丟棄,而不是丟擲異常。
既然死信交換機就是普通的交換機,那麼它就需要根據訊息的繫結鍵來分發訊息。死信訊息的繫結鍵遵守以下規則:當佇列指定了死信路由鍵(x-dead-letter-routing-key)引數時,死信訊息使用該引數指定的路由鍵作為自己的路由鍵;否則使用訊息原來的路由鍵
args.put("x-dead-letter-routing-key", "some-routing-key");
觀察日誌輸出:
consume: NO. 1, envelop: Envelope(deliveryTag=1, redeliver=false, exchange=exchangeDLX, routingKey=queue), properties: #contentHeader<basic>(content-type =null, content-encoding=null, headers={x-death=[{queue=queue, time=Sat Jun 10 11:51:48 CST 2017, count=1, reason=maxlen, routing-keys=[queue], exchange=}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
consume: NO. 2, envelop: Envelope(deliveryTag=3, redeliver=false, exchange=exchangeDLX, routingKey=queue), properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{queue=queue, time=Sat Jun 10 11:51:48 CST 2017, count=1, reason=rejected, routing-keys=[queue], exchange=}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
consume: NO. 3, envelop: Envelope(deliveryTag=4, redeliver=false, exchange=exchangeDLX, routingKey=queue), properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{queue=queue, time=Sat Jun 10 11:51:51 CST 2017, count=1, reason=expired, routing-keys=[queue], exchange=}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
......
死信訊息相比原來的訊息發生了一些改變。除了上面提到的 exchange 變為死信交換機名稱,routingKey 可能變為新的路由鍵(由 x-dead-letter-routing-key 引數決定),死信處理過程還在死信訊息頭中增加了 x-death 陣列資訊。每一次死信事件對應一個數組項,包含以下欄位,
- queue:本次死信事件發生前,訊息所屬佇列
- reason:死信原因,分為 rejected、expired 與 maxlen
- time:死信事件發生時間
- exchange:死信事件前,訊息釋出時指定的交換機
- routing-keys:死信事件前,訊息釋出時指定的路由鍵
- count:當前 queue 與 當前 reason 表述的死信事件發生的次數
- original-expiration:對於訊息 TTL,因為超時導致死信時,會移除 TTL(否則永遠觸發超時),該欄位記錄原來設定的訊息 TTL 值
當死信訊息再次觸發死信事件時,一般會產生一個新的陣列項,插到陣列的最前頭。但是,如果 x-death 陣列已經包含一個相同 queue 與 reason 的陣列項,則直接將該陣列項移到陣列最前頭,並將其 count 值加一。
未確認問題: 1. 當訊息被 reject 回佇列頭,同時又超過佇列長度限制時,怎麼處理? 試驗結果好像是直接變為 maxlen reason 的死信訊息
2. DLX很可能形成環(最簡單的場景就是DLX與原交換機相同),這時訊息有可能無限觸發死信事件嗎(例如超過佇列長度限制)? 官方說法是處在DLX環中的訊息,如果經歷了整個環都沒有觸發過 rejected reason 的死信事件,則拋棄該訊息。
這些問題有點偏,目前就不花時間研究了。