1. 程式人生 > >RabbitMQ之TTL(Time-To-Live 過期時間)

RabbitMQ之TTL(Time-To-Live 過期時間)

1. 概述

RabbitMQ可以對訊息和佇列設定TTL. 目前有兩種方法可以設定。第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。第二種方法是對訊息進行單獨設定,每條訊息TTL可以不同。如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就稱為dead message, 消費者將無法再收到該訊息。

2. 設定佇列屬性

通過佇列屬性設定訊息TTL的方法是在queue.declare方法中加入x-message-ttl引數,單位為ms.
例如:

package com.vms.test.zzh.rabbitmq.self;

import
com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.apache.commons.collections.map.HashedMap; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** * Created by hidden on 2017/2/7. */
public class RBttlTest { public static final String ip = "xx.xx.xx.73"; public static final int port = 5672; public static final String username = "root"; public static final String password = "root"; public static final String queueName = "queue.ttl.test"; public static final
String exchangeName = "exchange.ttl.test"; public static final String routingKey = "ttl"; public static final Boolean durable = true; public static final Boolean exclusive = false; public static final Boolean autoDelete = false; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Map<String, Object> argss = new HashMap<String, Object>(); argss.put("vhost", "/"); argss.put("username","root"); argss.put("password", "root"); argss.put("x-message-ttl",6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }

通過RabbitMQ的管理頁面可以看到有新的queue生成,並標記為TTL(上面的程式碼同時會將此queue設定為durable=true,以及包含相關引數,比如vhost=/),如下圖所示:
這裡寫圖片描述
另外也可以同rabbitmq的命令列模式來設定:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

還可以通過HTTP介面呼叫:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}' 
http://localhost:15672/api/queues/{vhost}/{queuename}

如果不設定TTL,則表示此訊息不會過期。如果將TTL設定為0,則表示除非此時可以直接將訊息投遞到消費者,否則該訊息會被立即丟棄,這個特性可以部分替代RabbitMQ3.0以前支援的immediate引數,之所以所部分代替,是應為immediate引數在投遞失敗會有basic.return方法將訊息體返回(這個功能可以利用死信佇列來實現)。

3. 設定訊息屬性

針對每條訊息設定TTL的方法是在basic.publish方法中加入expiration的屬性引數,單位為ms.
關鍵程式碼:

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.deliveryMode(2);
        builder.expiration("6000");
        AMQP.BasicProperties  properties = builder.build();

        channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());

也可以寫成:

AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());

具體程式碼如下所示:

public static void main(String[] args) throws InterruptedException {
    sendTTLMessage();
    TimeUnit.SECONDS.sleep(5);
    consumeTTLMessage();
}

public static void sendTTLMessage(){
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(ip);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.deliveryMode(2);
        builder.expiration("6000");
        AMQP.BasicProperties  properties = builder.build();

        channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());

        channel.close();
        connection.close();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}

public static void consumeTTLMessage(){
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(ip);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [X] Received '" + message + "'");

        channel.close();
        connection.close();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

當TimeUnit.SECONDS.sleep(5);設定為5s時可以消費到訊息,當設定為7s時,則消費不到訊息,因為此時已經超時了。

還可以通過HTTP介面呼叫如下:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}'  http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

4. 對比

對於第一種設定佇列TTL屬性的方法,一旦訊息過期,就會從佇列中抹去,而第二種方法裡,即使訊息過期,也不會馬上從佇列中抹去,因為每條訊息是否過期時在即將投遞到消費者之前判定的,為什麼兩者得處理方法不一致?因為第一種方法裡,佇列中已過期的訊息肯定在佇列頭部,RabbitMQ只要定期從隊頭開始掃描是否有過期訊息即可,而第二種方法裡,每條訊息的過期時間不同,如果要刪除所有過期訊息,勢必要掃描整個佇列,所以不如等到此訊息即將被消費時再判定是否過期,如果過期,再進行刪除。

5. Queue TTL

queue.declare 命令中的 x-expires 引數控制 queue 被自動刪除前可以處於未使用狀態的時間。未使用的意思是 queue 上沒有任何 consumer ,queue 沒有被重新宣告,並且在過期時間段內未呼叫過 basic.get 命令。該方式可用於,例如,RPC-style 的回覆 queue, 其中許多 queue 會被創建出來,但是卻從未被使用。

伺服器會確保在過期時間到達後 queue 被刪除,但是不保證刪除的動作有多麼的及時。在伺服器重啟後,持久化的 queue 的超時時間將重新計算。

用於表示超期時間的 x-expires 引數值以毫秒為單位,並且服從和 x-message-ttl 一樣的約束條件,且不能設定為 0 。所以,如果該引數設定為 1000 ,則表示該 queue 如果在 1s之內未被使用則會被刪除。

下面的 Java 示例建立了一個 queue ,其會在 30 分鐘不使用的情況下判定為超時。

Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 1800000);  
channel.queueDeclare("myqueue", false, false, false, args);  

參考資料

歡迎支援《RabbitMQ實戰指南》以及關注微信公眾號:朱小廝的部落格。