1. 程式人生 > >RabbitMQ之訊息持久化(佇列持久化、訊息持久化)

RabbitMQ之訊息持久化(佇列持久化、訊息持久化)

訊息的可靠性是RabbitMQ的一大特色,那麼RabbitMQ是如何保證訊息可靠性的呢——訊息持久化。 
為了保證RabbitMQ在退出或者crash等異常情況下資料沒有丟失,需要將queue,exchange和Message都持久化。

queue的持久化

queue的持久化是通過durable=true來實現的。 
一般程式中這麼使用:

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.persistent.name"
, true, false, false, null);
  • 1
  • 2
  • 3

關鍵的是第二個引數設定為true,即durable=true.

Channel類中queueDeclare的完整定義如下:

    /**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue
     * @param durable true if
we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for
the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

引數說明:

queue:queue的名稱

exclusive:排他佇列,如果一個佇列被宣告為排他佇列,該佇列僅對首次申明它的連線可見,並在連線斷開時自動刪除。這裡需要注意三點:1. 排他佇列是基於連線可見的,同一連線的不同通道是可以同時訪問同一連線建立的排他佇列;2.“首次”,如果一個連線已經聲明瞭一個排他佇列,其他連線是不允許建立同名的排他佇列的,這個與普通佇列不同;3.即使該佇列是持久化的,一旦連線關閉或者客戶端退出,該排他佇列都會被自動刪除的,這種佇列適用於一個客戶端傳送讀取訊息的應用場景。

autoDelete:自動刪除,如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動刪除。這種佇列適用於臨時佇列。

queueDeclare相關的有4種方法,分別是:

Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

其中需要說明的是queueDeclarePassive(String queue)可以用來檢測一個queue是否已經存在。如果該佇列存在,則會返回true;如果不存在,就會返回異常,但是不會建立新的佇列。

訊息的持久化

如過將queue的持久化標識durable設定為true,則代表是一個持久的佇列,那麼在服務重啟之後,也會存在,因為服務會把持久化的queue存放在硬碟上,當服務重啟的時候,會重新什麼之前被持久化的queue。佇列是可以被持久化,但是裡面的訊息是否為持久化那還要看訊息的持久化設定。也就是說,重啟之前那個queue裡面還沒有發出去的訊息的話,重啟之後那佇列裡面是不是還存在原來的訊息,這個就要取決於發生著在傳送訊息時對訊息的設定了。 
如果要在重啟後保持訊息的持久化必須設定訊息是持久化的標識。

設定訊息的持久化:

channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
  • 1

這裡的關鍵是:MessageProperties.PERSISTENT_TEXT_PLAIN 
首先看一下basicPublish的方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;
  • 1
  • 2
  • 3
  • 4
  • 5

exchange表示exchange的名稱 
routingKey表示routingKey的名稱 
body代表傳送的訊息體 
有關mandatory和immediate的詳細解釋可以參考:RabbitMQ之mandatory和immediate.

這裡關鍵的是BasicProperties props這個引數了,這裡看下BasicProperties的定義:

public BasicProperties(
            String contentType,//訊息型別如:text/plain
            String contentEncoding,//編碼
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent 2:persistent
            Integer priority,//優先順序
            String correlationId,
            String replyTo,//反饋佇列
            String expiration,//expiration到期時間
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

這裡的deliveryMode=1代表不持久化,deliveryMode=2代表持久化。

上面的實現程式碼使用的是MessageProperties.PERSISTENT_TEXT_PLAIN,那麼這個又是什麼呢?

public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties("text/plain",
                        null,
                        null,
                        2,
                        0, null, null, null,
                        null, null, null, null,
                        null, null);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

可以看到這其實就是講deliveryMode設定為2的BasicProperties的物件,為了方便程式設計而出現的一個東東。 
換一種實現方式:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());
  • 1
  • 2
  • 3
  • 4

設定了佇列和訊息的持久化之後,當broker服務重啟的之後,訊息依舊存在。單隻設定佇列持久化,重啟之後訊息會丟失;單隻設定訊息的持久化,重啟之後佇列消失,既而訊息也丟失。單單設定訊息持久化而不設定佇列的持久化顯得毫無意義。

exchange的持久化

上面闡述了佇列的持久化和訊息的持久化,如果不設定exchange的持久化對訊息的可靠性來說沒有什麼影響,但是同樣如果exchange不設定持久化,那麼當broker服務重啟之後,exchange將不復存在,那麼既而傳送方rabbitmq producer就無法正常傳送訊息。這裡博主建議,同樣設定exchange的持久化。exchange的持久化設定也特別簡單,方法如下:

Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
                                          String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
                           String type,
                           boolean durable,
                           boolean autoDelete,
                           boolean internal,
                           Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

一般只需要:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);即在宣告的時候講durable欄位設定為true即可。

進一步討論

1.將queue,exchange, message等都設定了持久化之後就能保證100%保證資料不丟失了嚒? 
答案是否定的。 
首先,從consumer端來說,如果這時autoAck=true,那麼當consumer接收到相關訊息之後,還沒來得及處理就crash掉了,那麼這樣也算資料丟失,這種情況也好處理,只需將autoAck設定為true(方法定義如下),然後在正確處理完訊息之後進行手動ack(channel.basicAck).

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  • 1

其次,關鍵的問題是訊息在正確存入RabbitMQ之後,還需要有一段時間(這個時間很短,但不可忽視)才能存入磁碟之中,RabbitMQ並不是為每條訊息都做fsync的處理,可能僅僅儲存到cache中而不是物理磁碟上,在這段時間內RabbitMQ broker發生crash, 訊息儲存到cache但是還沒來得及落盤,那麼這些訊息將會丟失。那麼這個怎麼解決呢?首先可以引入RabbitMQ的mirrored-queue即映象佇列,這個相當於配置了副本,當master在此特殊時間內crash掉,可以自動切換到slave,這樣有效的保障了HA, 除非整個叢集都掛掉,這樣也不能完全的100%保障RabbitMQ不丟訊息,但比沒有mirrored-queue的要好很多,很多現實生產環境下都是配置了mirrored-queue的。還有要在producer引入事務機制或者Confirm機制來確保訊息已經正確的傳送至broker端,有關RabbitMQ的事務機制或者Confirm機制可以參考:RabbitMQ之訊息確認機制(事務+Confirm). 幸虧本文的主題是討論RabbitMQ的持久化而不是可靠性,不然就一發不可收拾了。RabbitMQ的可靠性涉及producer端的確認機制、broker端的映象佇列的配置以及consumer端的確認機制,要想確保訊息的可靠性越高,那麼效能也會隨之而降,魚和熊掌不可兼得,關鍵在於選擇和取捨。

2.訊息什麼時候刷到磁碟? 
寫入檔案前會有一個Buffer,大小為1M,資料在寫入檔案時,首先會寫入到這個Buffer,如果Buffer已滿,則會將Buffer寫入到檔案(未必刷到磁碟)。 
有個固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每個25ms,Buffer裡的資料及未重新整理到磁碟的檔案內容必定會刷到磁碟。 
每次訊息寫入後,如果沒有後續寫入請求,則會直接將已寫入的訊息刷到磁碟:使用Erlang的receive x after 0實現,只要程序的信箱裡沒有訊息,則產生一個timeout訊息,而timeout會觸發刷盤操作。

欲瞭解更多訊息中介軟體的內容,可以關注:訊息中介軟體收錄集

參考資料