RabbitMQ之TTL(Time-To-Live 過期時間)
1. 概述
RabbitMQ可以對訊息和佇列設定TTL. 目前有兩種方法可以設定。第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。第二種方法是對訊息進行單獨設定,每條訊息TTL可以不同。如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就稱為dead message, 消費者將無法再收到該訊息。
2. 設定佇列屬性
通過佇列屬性設定訊息TTL的方法是在queue.declare方法中加入x-message-ttl引數,單位為ms.
例如:
package com.vms.test.zzh.rabbitmq.self;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.collections.map.HashedMap;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* Created by hidden on 2017/2/7.
*/
public class RBttlTest {
public static final String ip = "xx.xx.xx.73";
public static final int port = 5672;
public static final String username = "root";
public static final String password = "root";
public static final String queueName = "queue.ttl.test";
public static final String exchangeName = "exchange.ttl.test";
public static final String routingKey = "ttl";
public static final Boolean durable = true;
public static final Boolean exclusive = false;
public static final Boolean autoDelete = false;
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("vhost", "/");
argss.put("username","root");
argss.put("password", "root");
argss.put("x-message-ttl",6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
通過RabbitMQ的管理頁面可以看到有新的queue生成,並標記為TTL(上面的程式碼同時會將此queue設定為durable=true,以及包含相關引數,比如vhost=/),如下圖所示:
另外也可以同rabbitmq的命令列模式來設定:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
還可以通過HTTP介面呼叫:
$ curl -i -u guest:guest -H "content-type:application/json" -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}'
http://localhost:15672/api/queues/{vhost}/{queuename}
如果不設定TTL,則表示此訊息不會過期。如果將TTL設定為0,則表示除非此時可以直接將訊息投遞到消費者,否則該訊息會被立即丟棄,這個特性可以部分替代RabbitMQ3.0以前支援的immediate引數,之所以所部分代替,是應為immediate引數在投遞失敗會有basic.return方法將訊息體返回(這個功能可以利用死信佇列來實現)。
3. 設定訊息屬性
針對每條訊息設定TTL的方法是在basic.publish方法中加入expiration的屬性引數,單位為ms.
關鍵程式碼:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());
也可以寫成:
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());
具體程式碼如下所示:
public static void main(String[] args) throws InterruptedException {
sendTTLMessage();
TimeUnit.SECONDS.sleep(5);
consumeTTLMessage();
}
public static void sendTTLMessage(){
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void consumeTTLMessage(){
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [X] Received '" + message + "'");
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
當TimeUnit.SECONDS.sleep(5);設定為5s時可以消費到訊息,當設定為7s時,則消費不到訊息,因為此時已經超時了。
還可以通過HTTP介面呼叫如下:
$ curl -i -u guest:guest -H "content-type:application/json" -XPOST -d'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}' http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish
4. 對比
對於第一種設定佇列TTL屬性的方法,一旦訊息過期,就會從佇列中抹去,而第二種方法裡,即使訊息過期,也不會馬上從佇列中抹去,因為每條訊息是否過期時在即將投遞到消費者之前判定的,為什麼兩者得處理方法不一致?因為第一種方法裡,佇列中已過期的訊息肯定在佇列頭部,RabbitMQ只要定期從隊頭開始掃描是否有過期訊息即可,而第二種方法裡,每條訊息的過期時間不同,如果要刪除所有過期訊息,勢必要掃描整個佇列,所以不如等到此訊息即將被消費時再判定是否過期,如果過期,再進行刪除。
5. Queue TTL
queue.declare 命令中的 x-expires 引數控制 queue 被自動刪除前可以處於未使用狀態的時間。未使用的意思是 queue 上沒有任何 consumer ,queue 沒有被重新宣告,並且在過期時間段內未呼叫過 basic.get 命令。該方式可用於,例如,RPC-style 的回覆 queue, 其中許多 queue 會被創建出來,但是卻從未被使用。
伺服器會確保在過期時間到達後 queue 被刪除,但是不保證刪除的動作有多麼的及時。在伺服器重啟後,持久化的 queue 的超時時間將重新計算。
用於表示超期時間的 x-expires 引數值以毫秒為單位,並且服從和 x-message-ttl 一樣的約束條件,且不能設定為 0 。所以,如果該引數設定為 1000 ,則表示該 queue 如果在 1s之內未被使用則會被刪除。
下面的 Java 示例建立了一個 queue ,其會在 30 分鐘不使用的情況下判定為超時。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
參考資料
歡迎支援《RabbitMQ實戰指南》以及關注微信公眾號:朱小廝的部落格。