rabbitMq持久化操作
簡介
queueDeclare(String queue,
boolean durable,
boolean exclusive,
Map<String, Object> arguments);
- 1
- 2
- 3
- 4
queue: 佇列名稱
durable: 是否持久化, 佇列的宣告預設是存放到記憶體中的,如果rabbitmq重啟會丟失,如果想重啟之後還存在就要使佇列持久化,儲存到Erlang自帶的Mnesia資料庫中,當rabbitmq重啟之後會讀取該資料庫
exclusive:是否排外的,有兩個作用,一:當連線關閉時connection.close()該佇列是否會自動刪除;二:該佇列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個佇列,沒有任何問題,如果是排外的,會對當前佇列加鎖,其他通道channel是不能訪問的,如果強制訪問會報異常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method:
#method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)
autoDelete:是否自動刪除,當最後一個消費者斷開連線之後佇列是否自動被刪除,可以通過RabbitMQ Management,檢視某個佇列的消費者數量,當consumers = 0時佇列就會自動刪除
arguments:
佇列中的訊息什麼時候會自動被刪除?Message TTL(x-message-ttl):設定佇列中的所有訊息的生存週期(統一為整個佇列的所有訊息設定生命週期), 也可以在釋出訊息的時候單獨為某個訊息指定剩餘生存時間,單位毫秒, 類似於redis中的ttl,生存時間到了,訊息會被從隊裡中刪除,注意是訊息被刪除,而不是佇列被刪除, 特性Features=TTL, 單獨為某條訊息設定過期時間AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));Auto Expire(x-expires): 當佇列在指定的時間沒有被訪問(consume, basicGet, queueDeclare…)就會被刪除,Features=Exp
Max Length(x-max-length): 限定佇列的訊息的最大值長度,超過指定長度將會把最早的幾條刪除掉, 類似於mongodb中的固定集合,例如儲存最新的100條訊息, Feature=Lim
Max Length Bytes(x-max-length-bytes): 限定佇列最大佔用的空間大小, 一般受限於記憶體、磁碟的大小, Features=Lim B
Dead letter exchange(x-dead-letter-exchange): 當佇列訊息長度大於最大長度、或者過期的等,將從佇列中刪除的訊息推送到指定的交換機中去而不是丟棄掉,Features=DLX
Dead letter routing key(x-dead-letter-routing-key):將刪除的訊息推送到指定交換機的指定路由鍵的佇列中去, Feature=DLK
Maximum priority(x-max-priority):優先順序佇列,宣告佇列時先定義最大優先順序值(定義最大值一般不要太大),在釋出訊息的時候指定該訊息的優先順序, 優先順序更高(數值更大的)的訊息先被消費,
- Lazy mode(x-queue-mode=lazy): Lazy Queues: 先將訊息儲存到磁碟上,不放在記憶體中,當消費者開始消費的時候才載入到記憶體中
- Master locator(x-queue-master-locator)
注意
關於佇列的宣告,如果使用同一套引數進行聲明瞭,就不能再使用其他引數來宣告,要麼刪除該佇列重新刪除,可以使用命令列刪除也可以在RabbitMQ Management上刪除,要麼給佇列重新起一個名字。
佇列持久化
重啟RabbitMQ伺服器(可以通過rabbitmqctl stop_app關閉伺服器,rabbitmqctl start_app重啟伺服器),可以登入RabbitMQ Management—> Queues中可以看到之前宣告的佇列還存在
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, arguments);
- 1
- 2
訊息持久化
設定訊息持久化必須先設定佇列持久化,要不然佇列不持久化,訊息持久化,佇列都不存在了,訊息存在還有什麼意義。訊息持久化需要將交換機持久化、佇列持久化、訊息持久化,才能最終達到持久化的目的
方式一:設定deliveryMode=2
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
String message = "Hello RabbitMQ: ";
// 設定訊息持久化
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2); // 設定訊息是否持久化,1: 非持久化 2:持久化
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
方式二:設定BasicProperties為MessageProperties.PERSISTENT_TEXT_PLAIN
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
String message = "Hello RabbitMQ: ";
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
- 1
- 2
- 3
- 4
- 5
Message TTL訊息剩餘生存時間
統一設定佇列中的所有訊息的過期時間,例如設定10秒,10秒後這個佇列的訊息清零
方式一:為該佇列的所有訊息統一設定相同的宣告週期
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
// 宣告佇列時指定佇列中的訊息過期時間
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish(EXCHANGE_NAME, "", , message.getBytes("UTF-8"));
- 1
- 2
- 3
- 4
- 5
- 6
方式二:單獨為某條訊息單獨設定時間
// expiration: 設定單條訊息的過期時間
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
for(int i = 1; i <= 5; i++) {
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties()
.builder().expiration( i * 1000 + "");
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
Auto Expire自動過期
x-expires用於當多長時間沒有消費者訪問該佇列的時候,該佇列會自動刪除,可以設定一個延遲時間,如僅啟動一個生產者,10秒之後該佇列會刪除,或者啟動一個生產者,再啟動一個消費者,消費者執行結束後10秒,佇列也會被刪除
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-expires", 10000);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish(EXCHANGE_NAME, "", , message.getBytes("UTF-8"));
- 1
- 2
- 3
- 4
- 5
Max Length最大長度
x-max-length:用於指定佇列的長度,如果不指定,可以認為是無限長,例如指定佇列的長度是4,當超過4條訊息,前面的訊息將被刪除,給後面的訊息騰位
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-max-length", 4);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for(int i = 1; i <= 5; i++) {
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes("UTF-8"));
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
Max Length Bytes程式碼片段
x-max-length-bytes: 用於指定佇列儲存訊息的佔用空間大小,當達到最大值是會刪除之前的資料騰出空間
Map<String, Object> arguments = new HashMap<String, Object>();
rguments.put("x-max-length-bytes", 1024);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish(EXCHANGE_NAME, "", , message.getBytes("UTF-8"));
- 1
- 2
- 3
- 4
- 5
Maximum priority最大優先順序
x-max-priority: 設定訊息的優先順序,優先順序值越大,越被提前消費。
正常情況下不適用優先順序
Hello RabbitMQ: 1
Hello RabbitMQ: 2
Hello RabbitMQ: 3
Hello RabbitMQ: 4
Hello RabbitMQ: 5
使用優先順序順序正好相反
Hello RabbitMQ: 5
Hello RabbitMQ: 4
Hello RabbitMQ: 3
Hello RabbitMQ: 2
Hello RabbitMQ: 1
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for(int i = 1; i <= 5; i++) {
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties()
.builder().priority(i);
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
Dead letter exchange(死亡交換機) 和 Dead letter routing key(死亡路由鍵)
當佇列中的訊息過期,或者達到最大長度而被刪除,或者達到最大空間時而被刪除時,可以將這些被刪除的資訊推送到其他交換機中,讓其他消費者訂閱這些被刪除的訊息,處理這些訊息
public void testBasicPublish() throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setUsername("mengday");
factory.setPassword("mengday");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告一個接收被刪除的訊息的交換機和佇列
String EXCHANGE_DEAD_NAME = "exchange.dead";
String QUEUE_DEAD_NAME = "queue_dead";
channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, );
channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead");
String EXCHANGE_NAME = "exchange.fanout";
String QUEUE_NAME = "queue_name";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 15000);
arguments.put("x-max-length", 4);
arguments.put("x-max-length-bytes", 1024);
arguments.put("x-expires", 30000);
arguments.put("x-dead-letter-exchange", "exchange.dead");
arguments.put("x-dead-letter-routing-key", "routingkey.dead");
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
String message = "Hello RabbitMQ: ";
for(int i = 1; i <= 5; i++) {
channel.basicPublish(EXCHANGE_NAME, "", , (message + i).getBytes("UTF-8"));
}
channel.close();
connection.close();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
剛開始由於佇列長度是4,總共傳送5條訊息,所以最早進入佇列的訊息1將被刪除掉,被推送到“死亡佇列”中,所以看到普通佇列的訊息為4條,死亡佇列的訊息為1條,是訊息1
隨著時間的流逝,普通佇列中的訊息都該過期了,所以訊息2、3、4、5都被推送到死亡佇列,所以死亡佇列訊息是5條,普通佇列的訊息條數為0
再隨著時間的流逝,普通佇列過了指定時間沒有被消費者訪問,這個佇列自動被刪除了,所以看不到普通隊列了,只有死亡佇列
檢視死亡佇列的訊息可以得知,訊息一死亡的原因是maxlen達到了最大長度,訊息2、3、4、5都是因為生存時間到了導致死亡的
一個比較雜的綜合示例
關於消費者就不用程式碼來獲取訊息了,直接在RabbitMQ Management點選某個佇列的名字,然後Get Message(s) 即可獲取
該示例使用很多引數配置,可能實際使用不會像這樣用,因為這樣好像不太配套。
public class Producer {
@Test
public void testBasicPublish() throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setUsername("mengday");
factory.setPassword("mengday");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告一個接收被刪除的訊息的交換機和佇列
String EXCHANGE_DEAD_NAME = "exchange.dead";
String QUEUE_DEAD_NAME = "queue_dead";
channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, );
channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead");
String EXCHANGE_NAME = "exchange.fanout";
String QUEUE_NAME = "queue_name";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Map<String, Object> arguments = new HashMap<String, Object>();
// 統一設定佇列中的所有訊息的過期時間
arguments.put("x-message-ttl", 30000);
// 設定超過多少毫秒沒有消費者來訪問佇列,就刪除佇列的時間
arguments.put("x-expires", 20000);
// 設定佇列的最新的N條訊息,如果超過N條,前面的訊息將從佇列中移除掉
arguments.put("x-max-length", 4);
// 設定佇列的內容的最大空間,超過該閾值就刪除之前的訊息
arguments.put("x-max-length-bytes", 1024);
// 將刪除的訊息推送到指定的交換機,一般x-dead-letter-exchange和x-dead-letter-routing-key需要同時設定
arguments.put("x-dead-letter-exchange", "exchange.dead");
// 將刪除的訊息推送到指定的交換機對應的路由鍵
arguments.put("x-dead-letter-routing-key", "routingkey.dead");
// 設定訊息的優先順序,優先順序大的優先被消費
arguments.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
String message = "Hello RabbitMQ: ";
for(int i = 1; i <= 5; i++) {
// expiration: 設定單條訊息的過期時間
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().priority(i).expiration( i * 1000 + "");
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
}
channel.close();
connection.close();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
執行效果
疑惑: 單獨使用arguments.put(“x-max-length”, 4); arguments.put(“x-dead-letter-exchange”, “exchange.dead”);arguments.put(“x-dead-letter-routing-key”, “routingkey.dead”);發現訊息1先會觸發maxlen條件,而被推送到queue_dead佇列中,由此可以得出,當達到最大長度時,先刪除的是先被新增到佇列的訊息。但是如果很多條件一塊同時使用可能現象不太好解釋,如上例如,實際結果是訊息5因為maxlen而被推送到死亡佇列中,訊息1、2、3、4都是由於expired過期導致的,難道不是訊息1由於maxlen被推送到希望佇列,而2、3、4、5是由於過期導致的嗎?還有上面程式碼如果將x-max-len該為3,在死亡佇列中獲取訊息的先後順序是4、5、3、2、1不是優先順序高的先被消費嗎,為啥不是5、4、3、2、1 難道是條件用的太多了,都亂了??? 明白的同學請留言,謝謝!
經過實際測試,引數單獨用或者和其他引數合理搭配使用都沒問題,如果是像上例一塊都用,大雜燴,搞不懂結果。
// 檢查佇列是否存在,不存在拋異常
channel.queueDeclarePassive(“queue_name”);