1. 程式人生 > >【轉】RabbitMQ入門_11_DLX

【轉】RabbitMQ入門_11_DLX

佇列中的訊息可能會成為死信訊息(dead lettered)。讓訊息成為死信訊息的事件有:

  • 訊息被取消確認(nack 或 reject),且設定為不重入佇列(requeue = false)
  • 訊息TTL過期
  • 佇列達到長度限制

死信訊息會被死信交換機(Dead Letter Exchange, DLX)重新發布。

gordon.study.rabbitmq.dlx.TestDlx.java

<span style="color:#333333"><code><span style="color:#0000ff">public</span> <span style="color:#0000ff">class</span> <span style="color:#a31515">TestDlx</span> {
 
    <span style="color:#0000ff">private</span> <span style="color:#0000ff">static</span> <span style="color:#0000ff">final</span> String DLX_EXCHANGE_NAME = <span style="color:#a31515">"exchangeDLX"</span>;
 
    <span style="color:#0000ff">private</span> <span style="color:#0000ff">static</span> <span style="color:#0000ff">final</span> String DLX_QUEUE_NAME = <span style="color:#a31515">"queueDLX"</span>;
 
    <span style="color:#0000ff">private</span> <span style="color:#0000ff">static</span> <span style="color:#0000ff">final</span> String QUEUE_NAME = <span style="color:#a31515">"queue"</span>;
 
    <span style="color:#0000ff">public</span> <span style="color:#0000ff">static</span> <span style="color:#0000ff">void</span> <span style="color:#a31515">main</span>(String[] argv) <span style="color:#0000ff">throws</span> Exception {
        ConnectionFactory factory = <span style="color:#0000ff">new</span> ConnectionFactory();
        factory.setHost(<span style="color:#a31515">"localhost"</span>);
        Connection connection = factory.newConnection();
        Channel senderChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
 
        Map<String, Object> args = <span style="color:#0000ff">new</span> HashMap<String, Object>();
        args.put(<span style="color:#a31515">"x-message-ttl"</span>, 3000); <span style="color:green">// 設定佇列中訊息存活時間為3秒</span>
        args.put(<span style="color:#a31515">"x-max-length"</span>, 5); <span style="color:green">// 設定佇列最大訊息數量為5</span>
        args.put(<span style="color:#a31515">"x-dead-letter-exchange"</span>, DLX_EXCHANGE_NAME); <span style="color:green">// 設定DLX</span>
        senderChannel.queueDeclare(QUEUE_NAME, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">true</span>, args);
 
        senderChannel.queueDeclare(DLX_QUEUE_NAME, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">true</span>, <span style="color:#0000ff">null</span>);
        senderChannel.exchangeDeclare(DLX_EXCHANGE_NAME, <span style="color:#a31515">"direct"</span>, <span style="color:#0000ff">false</span>, <span style="color:#0000ff">true</span>, <span style="color:#0000ff">null</span>);
        <span style="color:green">// 將死信佇列繫結到死信交換機上,繫結鍵為 QUEUE_NAME。訊息傳送時使用的繫結鍵也會是 QUEUE_NAME</span>
        senderChannel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, QUEUE_NAME);
 
        <span style="color:green">// 釋出6個訊息</span>
        <span style="color:#0000ff">for</span> (<span style="color:#0000ff">int</span> i = 0; i < 6;) {
            String message = <span style="color:#a31515">"NO. "</span> + ++i;
            senderChannel.basicPublish(<span style="color:#a31515">""</span>, QUEUE_NAME, <span style="color:#0000ff">null</span>, message.getBytes(<span style="color:#a31515">"UTF-8"</span>));
        }
 
        <span style="color:green">// 監視死信佇列</span>
        Consumer dlxConsumer = <span style="color:#0000ff">new</span> DefaultConsumer(consumerChannel) {
            <span style="color:#2b91af">@Override</span>
            <span style="color:#0000ff">public</span> <span style="color:#0000ff">void</span> <span style="color:#a31515">handleDelivery</span>(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, <span style="color:#0000ff">byte</span>[] body)
                    <span style="color:#0000ff">throws</span> IOException {
                String message = <span style="color:#0000ff">new</span> String(body, <span style="color:#a31515">"UTF-8"</span>);
                System.out.printf(<span style="color:#a31515">"consume: %s, envelop: %s, properties: %s\n"</span>, message, envelope, properties);
            }
        };
        consumerChannel.basicConsume(DLX_QUEUE_NAME, <span style="color:#0000ff">true</span>, dlxConsumer);
 
        Thread.sleep(100);
        GetResponse resp = consumerChannel.basicGet(QUEUE_NAME, <span style="color:#0000ff">false</span>);
        consumerChannel.basicReject(resp.getEnvelope().getDeliveryTag(), <span style="color:#0000ff">false</span>);
    }
}
</code></span>

程式碼第19行通過 x-dead-letter-exchange 引數定義了當前佇列指定的死信交換機是 DLX_EXCHANGE_NAME,當前佇列中所有的死信訊息都將交由 DLX_EXCHANGE_NAME 再一次分發到新的佇列。

程式碼第23行建立了死信交換機 DLX_EXCHANGE_NAME,可見,死信交換機就是普通的交換機

從以上兩部分程式碼順序可知,死信交換機可以在被佇列引用後才建立。RabbitMQ 不會去驗證死信交換機設定是否有效,當死信訊息找不到指定的交換機時,死信訊息會被RabbitMQ安靜的丟棄,而不是丟擲異常

既然死信交換機就是普通的交換機,那麼它就需要根據訊息的繫結鍵來分發訊息。死信訊息的繫結鍵遵守以下規則:當佇列指定了死信路由鍵(x-dead-letter-routing-key)引數時,死信訊息使用該引數指定的路由鍵作為自己的路由鍵;否則使用訊息原來的路由鍵

。所以,示例程式碼中,死信訊息的路由鍵是程式碼第30行釋出訊息時指定的路由鍵 QUEUE_NAME。如果想修改死信訊息路由鍵,可以在第19行下面增加

        args.put("x-dead-letter-routing-key", "some-routing-key");

觀察日誌輸出:

consume: NO. 1, envelop: Envelope(deliveryTag=1, redeliver=false, exchange=exchangeDLX, routingKey=queue), properties: #contentHeader<basic>(content-type
=null, content-encoding=null, headers={x-death=[{queue=queue, time=Sat Jun 10 11:51:48 CST 2017, count=1, reason=maxlen, routing-keys=[queue], exchange=}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) consume: NO. 2, envelop: Envelope(deliveryTag=3, redeliver=false, exchange=exchangeDLX, routingKey=queue), properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{queue=queue, time=Sat Jun 10 11:51:48 CST 2017, count=1, reason=rejected, routing-keys=[queue], exchange=}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) consume: NO. 3, envelop: Envelope(deliveryTag=4, redeliver=false, exchange=exchangeDLX, routingKey=queue), properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{queue=queue, time=Sat Jun 10 11:51:51 CST 2017, count=1, reason=expired, routing-keys=[queue], exchange=}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) ......

死信訊息相比原來的訊息發生了一些改變。除了上面提到的 exchange 變為死信交換機名稱,routingKey 可能變為新的路由鍵(由 x-dead-letter-routing-key 引數決定),死信處理過程還在死信訊息頭中增加了 x-death 陣列資訊。每一次死信事件對應一個數組項,包含以下欄位,

  • queue:本次死信事件發生前,訊息所屬佇列
  • reason:死信原因,分為 rejected、expired 與 maxlen
  • time:死信事件發生時間
  • exchange:死信事件前,訊息釋出時指定的交換機
  • routing-keys:死信事件前,訊息釋出時指定的路由鍵
  • count:當前 queue 與 當前 reason 表述的死信事件發生的次數
  • original-expiration:對於訊息 TTL,因為超時導致死信時,會移除 TTL(否則永遠觸發超時),該欄位記錄原來設定的訊息 TTL 值

當死信訊息再次觸發死信事件時,一般會產生一個新的陣列項,插到陣列的最前頭。但是,如果 x-death 陣列已經包含一個相同 queue 與 reason 的陣列項,則直接將該陣列項移到陣列最前頭,並將其 count 值加一。  

未確認問題: 1. 當訊息被 reject 回佇列頭,同時又超過佇列長度限制時,怎麼處理? 試驗結果好像是直接變為 maxlen reason 的死信訊息

2. DLX很可能形成環(最簡單的場景就是DLX與原交換機相同),這時訊息有可能無限觸發死信事件嗎(例如超過佇列長度限制)? 官方說法是處在DLX環中的訊息,如果經歷了整個環都沒有觸發過 rejected reason 的死信事件,則拋棄該訊息。

這些問題有點偏,目前就不花時間研究了。