1. 程式人生 > >rabbitMq持久化操作

rabbitMq持久化操作

簡介

這裡寫圖片描述

queueDeclare(String queue, 
            boolean durable, 
            boolean exclusive, 
            Map<String, Object> arguments);
  • 1
  • 2
  • 3
  • 4
  • queue: 佇列名稱

  • durable: 是否持久化, 佇列的宣告預設是存放到記憶體中的,如果rabbitmq重啟會丟失,如果想重啟之後還存在就要使佇列持久化,儲存到Erlang自帶的Mnesia資料庫中,當rabbitmq重啟之後會讀取該資料庫

  • exclusive:是否排外的,有兩個作用,一:當連線關閉時connection.close()該佇列是否會自動刪除;二:該佇列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個佇列,沒有任何問題,如果是排外的,會對當前佇列加鎖,其他通道channel是不能訪問的,如果強制訪問會報異常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)

    一般等於true的話用於一個佇列只能有一個消費者來消費的場景

  • autoDelete:是否自動刪除,當最後一個消費者斷開連線之後佇列是否自動被刪除,可以通過RabbitMQ Management,檢視某個佇列的消費者數量,當consumers = 0時佇列就會自動刪除

  • arguments:
    佇列中的訊息什麼時候會自動被刪除?

    • Message TTL(x-message-ttl):設定佇列中的所有訊息的生存週期(統一為整個佇列的所有訊息設定生命週期), 也可以在釋出訊息的時候單獨為某個訊息指定剩餘生存時間,單位毫秒, 類似於redis中的ttl,生存時間到了,訊息會被從隊裡中刪除,注意是訊息被刪除,而不是佇列被刪除, 特性Features=TTL, 單獨為某條訊息設定過期時間AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
      channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

    • Auto Expire(x-expires): 當佇列在指定的時間沒有被訪問(consume, basicGet, queueDeclare…)就會被刪除,Features=Exp

    • Max Length(x-max-length): 限定佇列的訊息的最大值長度,超過指定長度將會把最早的幾條刪除掉, 類似於mongodb中的固定集合,例如儲存最新的100條訊息, Feature=Lim

    • Max Length Bytes(x-max-length-bytes): 限定佇列最大佔用的空間大小, 一般受限於記憶體、磁碟的大小, Features=Lim B

    • Dead letter exchange(x-dead-letter-exchange): 當佇列訊息長度大於最大長度、或者過期的等,將從佇列中刪除的訊息推送到指定的交換機中去而不是丟棄掉,Features=DLX

    • Dead letter routing key(x-dead-letter-routing-key):將刪除的訊息推送到指定交換機的指定路由鍵的佇列中去, Feature=DLK

    • Maximum priority(x-max-priority):優先順序佇列,宣告佇列時先定義最大優先順序值(定義最大值一般不要太大),在釋出訊息的時候指定該訊息的優先順序, 優先順序更高(數值更大的)的訊息先被消費,

    • Lazy mode(x-queue-mode=lazy): Lazy Queues: 先將訊息儲存到磁碟上,不放在記憶體中,當消費者開始消費的時候才載入到記憶體中
    • Master locator(x-queue-master-locator)

注意

關於佇列的宣告,如果使用同一套引數進行聲明瞭,就不能再使用其他引數來宣告,要麼刪除該佇列重新刪除,可以使用命令列刪除也可以在RabbitMQ Management上刪除,要麼給佇列重新起一個名字。

佇列持久化

重啟RabbitMQ伺服器(可以通過rabbitmqctl stop_app關閉伺服器,rabbitmqctl start_app重啟伺服器),可以登入RabbitMQ Management—> Queues中可以看到之前宣告的佇列還存在

boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, arguments);
  • 1
  • 2

訊息持久化

設定訊息持久化必須先設定佇列持久化,要不然佇列不持久化,訊息持久化,佇列都不存在了,訊息存在還有什麼意義。訊息持久化需要將交換機持久化、佇列持久化、訊息持久化,才能最終達到持久化的目的

方式一:設定deliveryMode=2

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

String message = "Hello RabbitMQ: ";
// 設定訊息持久化
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);  // 設定訊息是否持久化,1: 非持久化 2:持久化

channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

方式二:設定BasicProperties為MessageProperties.PERSISTENT_TEXT_PLAIN

channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

String message = "Hello RabbitMQ: ";
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
  • 1
  • 2
  • 3
  • 4
  • 5

Message TTL訊息剩餘生存時間

統一設定佇列中的所有訊息的過期時間,例如設定10秒,10秒後這個佇列的訊息清零

方式一:為該佇列的所有訊息統一設定相同的宣告週期

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);

// 宣告佇列時指定佇列中的訊息過期時間
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish(EXCHANGE_NAME, "", , message.getBytes("UTF-8"));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

方式二:單獨為某條訊息單獨設定時間

// expiration: 設定單條訊息的過期時間

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

for(int i = 1; i <= 5; i++) {
   AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties()
       .builder().expiration( i * 1000 + "");

   channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Auto Expire自動過期

x-expires用於當多長時間沒有消費者訪問該佇列的時候,該佇列會自動刪除,可以設定一個延遲時間,如僅啟動一個生產者,10秒之後該佇列會刪除,或者啟動一個生產者,再啟動一個消費者,消費者執行結束後10秒,佇列也會被刪除

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-expires", 10000);

channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish(EXCHANGE_NAME, "", , message.getBytes("UTF-8"));
  • 1
  • 2
  • 3
  • 4
  • 5

Max Length最大長度

x-max-length:用於指定佇列的長度,如果不指定,可以認為是無限長,例如指定佇列的長度是4,當超過4條訊息,前面的訊息將被刪除,給後面的訊息騰位

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-max-length", 4);

channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for(int i = 1; i <= 5; i++) {
     channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes("UTF-8"));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Max Length Bytes程式碼片段

x-max-length-bytes: 用於指定佇列儲存訊息的佔用空間大小,當達到最大值是會刪除之前的資料騰出空間

Map<String, Object> arguments = new HashMap<String, Object>();
rguments.put("x-max-length-bytes", 1024);

channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish(EXCHANGE_NAME, "", , message.getBytes("UTF-8"));
  • 1
  • 2
  • 3
  • 4
  • 5

Maximum priority最大優先順序

x-max-priority: 設定訊息的優先順序,優先順序值越大,越被提前消費。

正常情況下不適用優先順序
Hello RabbitMQ: 1
Hello RabbitMQ: 2
Hello RabbitMQ: 3
Hello RabbitMQ: 4
Hello RabbitMQ: 5

使用優先順序順序正好相反
Hello RabbitMQ: 5
Hello RabbitMQ: 4
Hello RabbitMQ: 3
Hello RabbitMQ: 2
Hello RabbitMQ: 1

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-max-priority", 10);

channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for(int i = 1; i <= 5; i++) {
    AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties()
        .builder().priority(i);
     channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

這裡寫圖片描述

Dead letter exchange(死亡交換機) 和 Dead letter routing key(死亡路由鍵)

當佇列中的訊息過期,或者達到最大長度而被刪除,或者達到最大空間時而被刪除時,可以將這些被刪除的資訊推送到其他交換機中,讓其他消費者訂閱這些被刪除的訊息,處理這些訊息

public void testBasicPublish() throws IOException, TimeoutException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    factory.setPort(AMQP.PROTOCOL.PORT);
    factory.setUsername("mengday");
    factory.setPassword("mengday");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 宣告一個接收被刪除的訊息的交換機和佇列
    String EXCHANGE_DEAD_NAME = "exchange.dead";
    String QUEUE_DEAD_NAME = "queue_dead";
    channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
    channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, );
    channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead");

    String EXCHANGE_NAME = "exchange.fanout";
    String QUEUE_NAME = "queue_name";
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

    Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-message-ttl", 15000);
    arguments.put("x-max-length", 4);
    arguments.put("x-max-length-bytes", 1024);
    arguments.put("x-expires", 30000);

    arguments.put("x-dead-letter-exchange", "exchange.dead");
    arguments.put("x-dead-letter-routing-key", "routingkey.dead");
    channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

    String message = "Hello RabbitMQ: ";
    for(int i = 1; i <= 5; i++) {
        channel.basicPublish(EXCHANGE_NAME, "", , (message + i).getBytes("UTF-8"));
    }


    channel.close();
    connection.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

剛開始由於佇列長度是4,總共傳送5條訊息,所以最早進入佇列的訊息1將被刪除掉,被推送到“死亡佇列”中,所以看到普通佇列的訊息為4條,死亡佇列的訊息為1條,是訊息1
這裡寫圖片描述

隨著時間的流逝,普通佇列中的訊息都該過期了,所以訊息2、3、4、5都被推送到死亡佇列,所以死亡佇列訊息是5條,普通佇列的訊息條數為0
這裡寫圖片描述

再隨著時間的流逝,普通佇列過了指定時間沒有被消費者訪問,這個佇列自動被刪除了,所以看不到普通隊列了,只有死亡佇列
這裡寫圖片描述

檢視死亡佇列的訊息可以得知,訊息一死亡的原因是maxlen達到了最大長度,訊息2、3、4、5都是因為生存時間到了導致死亡的
這裡寫圖片描述
這裡寫圖片描述

一個比較雜的綜合示例

關於消費者就不用程式碼來獲取訊息了,直接在RabbitMQ Management點選某個佇列的名字,然後Get Message(s) 即可獲取

該示例使用很多引數配置,可能實際使用不會像這樣用,因為這樣好像不太配套。

public class Producer {
    @Test
    public void testBasicPublish() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 宣告一個接收被刪除的訊息的交換機和佇列
        String EXCHANGE_DEAD_NAME = "exchange.dead";
        String QUEUE_DEAD_NAME = "queue_dead";
        channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, );
        channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead");

        String EXCHANGE_NAME = "exchange.fanout";
        String QUEUE_NAME = "queue_name";
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);


        Map<String, Object> arguments = new HashMap<String, Object>();
        // 統一設定佇列中的所有訊息的過期時間
        arguments.put("x-message-ttl", 30000);
        // 設定超過多少毫秒沒有消費者來訪問佇列,就刪除佇列的時間
        arguments.put("x-expires", 20000);
        // 設定佇列的最新的N條訊息,如果超過N條,前面的訊息將從佇列中移除掉
        arguments.put("x-max-length", 4);
        // 設定佇列的內容的最大空間,超過該閾值就刪除之前的訊息
        arguments.put("x-max-length-bytes", 1024);
        // 將刪除的訊息推送到指定的交換機,一般x-dead-letter-exchange和x-dead-letter-routing-key需要同時設定
        arguments.put("x-dead-letter-exchange", "exchange.dead");
        // 將刪除的訊息推送到指定的交換機對應的路由鍵
        arguments.put("x-dead-letter-routing-key", "routingkey.dead");
        // 設定訊息的優先順序,優先順序大的優先被消費
        arguments.put("x-max-priority", 10);
        channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");


        String message = "Hello RabbitMQ: ";
        for(int i = 1; i <= 5; i++) {
            // expiration: 設定單條訊息的過期時間
            AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().priority(i).expiration( i * 1000 + "");
            channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
        }


        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

執行效果

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

疑惑: 單獨使用arguments.put(“x-max-length”, 4); arguments.put(“x-dead-letter-exchange”, “exchange.dead”);arguments.put(“x-dead-letter-routing-key”, “routingkey.dead”);發現訊息1先會觸發maxlen條件,而被推送到queue_dead佇列中,由此可以得出,當達到最大長度時,先刪除的是先被新增到佇列的訊息。但是如果很多條件一塊同時使用可能現象不太好解釋,如上例如,實際結果是訊息5因為maxlen而被推送到死亡佇列中,訊息1、2、3、4都是由於expired過期導致的,難道不是訊息1由於maxlen被推送到希望佇列,而2、3、4、5是由於過期導致的嗎?還有上面程式碼如果將x-max-len該為3,在死亡佇列中獲取訊息的先後順序是4、5、3、2、1不是優先順序高的先被消費嗎,為啥不是5、4、3、2、1 難道是條件用的太多了,都亂了??? 明白的同學請留言,謝謝!

經過實際測試,引數單獨用或者和其他引數合理搭配使用都沒問題,如果是像上例一塊都用,大雜燴,搞不懂結果。

// 檢查佇列是否存在,不存在拋異常
channel.queueDeclarePassive(“queue_name”);