1. 程式人生 > >RabbitMQ的持久化機制

RabbitMQ的持久化機制

一.問題的引出 
RabbitMQ的一大特色是訊息的可靠性,那麼它是如何保證訊息可靠性的呢?——訊息持久化。為了保證RabbitMQ在退出,服務重啟或者crash等異常情況下,也不會丟失訊息,我們可以將Queue,Exchange,Message都設定為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ訊息不會丟失。當然還是會有一些小概率事件會導致訊息丟失。

二.Queue的持久化 
1.檢視存在的佇列和訊息數量 
在windows環境下,在rabbitmq的安裝目錄/sbin下,通過rabbitmqctl.bat list_queues檢視 
這裡寫圖片描述

 
這邊啟動了兩個producer,分別生成兩個佇列hello 和 hello1,並且他們都有一個訊息存在 
重啟RabbitMQ Server,模擬故障 
這裡寫圖片描述 
可以看到重啟後兩個佇列都消失了. 
2.持久化佇列 
Queue的持久化是通過durable=true來實現的。 
一般程式中這麼使用:

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//關鍵的是第二個引數設定為true,即durable=true. channel.queueDeclare("queue.persistent.name", true, false, false, null);
  • 1
  • 2
  • 3
  • 4

Channel類中queueDeclare的完整定義如下:

 /**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

引數說明:

queue:queue的名稱

exclusive:排他佇列,如果一個佇列被宣告為排他佇列,該佇列僅對首次申明它的連線可見,並在連線斷開時自動刪除。這裡需要注意三點:1. 排他佇列是基於連線可見的,同一連線的不同通道是可以同時訪問同一連線建立的排他佇列;2.“首次”,如果一個連線已經聲明瞭一個排他佇列,其他連線是不允許建立同名的排他佇列的,這個與普通佇列不同;3.即使該佇列是持久化的,一旦連線關閉或者客戶端退出,該排他佇列都會被自動刪除的,這種佇列適用於一個客戶端傳送讀取訊息的應用場景。

autoDelete:自動刪除,如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動刪除。這種佇列適用於臨時佇列。

queueDeclare相關的有4種方法,分別是:

Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

其中需要說明的是queueDeclarePassive(String queue)可以用來檢測一個queue是否已經存在。如果該佇列存在,則會返回true;如果不存在,就會返回異常,但是不會建立新的佇列。

我們就hello佇列持久化,在宣告佇列名稱時,持久化佇列,生產端和消費端都要. 
我們重複上面的操作,但是給hello佇列做持久化,而hello1不做,並重啟rabbitmq. 
這裡寫圖片描述 
可以看到重啟後,hello佇列還在,hello1佇列消失了,但是原本hello中的一條訊息也沒有儲存下來。所以在這邊我們僅僅做到了訊息佇列的持久化,還沒有做訊息持久化。

三.Message的持久化 
如果將Queue的持久化標識durable設定為true,則代表是一個持久的佇列,那麼在服務重啟之後,也會存在,因為服務會把持久化的queue存放在硬碟上,當服務重啟的時候,會重新載入之前被持久化的queue。佇列是可以被持久化,但是裡面的訊息是否為持久化那還要看訊息的持久化設定。也就是說,重啟之前那個Queue裡面還有沒發出去的訊息的話,重啟之後那佇列裡面是不是還存在原來的訊息,這個就要取決於傳送者在傳送訊息時對訊息的設定了。 
如果要在重啟後保持訊息的持久化必須設定訊息是持久化的標識。

channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
  • 1

這裡的關鍵是:MessageProperties.PERSISTENT_TEXT_PLAIN 
首先看一下basicPublish的方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
  • 1
  • 2
  • 3
  • 4
  • 5

exchange表示exchange的名稱 
routingKey表示routingKey的名稱 
body代表傳送的訊息體 
有關mandatory和immediate的詳細解釋可以參考:RabbitMQ之mandatory和immediate 
這裡關鍵的是BasicProperties props這個引數了,這裡看下BasicProperties的定義:

public BasicProperties(
            String contentType,//訊息型別如:text/plain
            String contentEncoding,//編碼 Map<String,Object> headers, //這裡的deliveryMode=1代表不持久化,deliveryMode=2代表持久化。 Integer deliveryMode,//1:nonpersistent 2:persistent Integer priority,//優先順序 String correlationId, String replyTo,//反饋佇列 String expiration,//expiration到期時間 String messageId, Date timestamp, String type, String userId, String appId, String clusterId)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

上面的實現程式碼使用的是MessageProperties.PERSISTENT_TEXT_PLAIN,那麼這個又是什麼呢?

public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties("text/plain", null, null, 2, 0, null, null, null, null, null, null, null, null, null);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

可以看到這其實就是講deliveryMode設定為2的BasicProperties的物件,為了方便程式設計而出現的一個東東。 換一種實現方式:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());
  • 1
  • 2
  • 3
  • 4

設定了佇列和訊息的持久化之後,當broker服務重啟的之後,訊息依舊存在。單隻設定佇列持久化,重啟之後訊息會丟失;單隻設定訊息的持久化,重啟之後佇列消失,既而訊息也丟失。單單設定訊息持久化而不設定佇列的持久化顯得毫無意義。 
再以上面例子,生產端生成一個訊息,並重啟rabbitmq. 
這裡寫圖片描述 
可以看到,經過佇列和訊息持久化後的hello, 在重啟的情況下,佇列和訊息都存在,沒有消失,消費端再重啟後也是能正常接收的.

四.Exchange的持久化 
上面闡述了佇列的持久化和訊息的持久化,如果不設定exchange的持久化對訊息的可靠性來說沒有什麼影響,但是同樣如果exchange不設定持久化,那麼當broker服務重啟之後,exchange將不復存在,那麼既而傳送方rabbitmq producer就無法正常傳送訊息。這裡博主建議,同樣設定exchange的持久化。exchange的持久化設定也特別簡單,方法如下:

Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

一般只需要:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);即在宣告的時候講durable欄位設定為true即可。

五.關於Message的持久化的更進一步探討 
1.訊息什麼時候需要持久化? 
根據 官方博文(http://www.rabbitmq.com/blog/2011/01/20/rabbitmq-backing-stores-databases-and-disks/) 的介紹,RabbitMQ在兩種情況下會將訊息寫入磁碟: 
- 訊息本身在publish的時候就要求訊息寫入磁碟 
- 記憶體緊張,需要將部分記憶體中的訊息轉移到磁碟

2.訊息什麼時候會刷到磁碟? 
- 寫入檔案前會有一個Buffer,大小為1M(1048576),資料在寫入檔案時,首先會寫入到這個Buffer,如果Buffer已滿,則會將Buffer寫入到檔案(未必刷到磁碟) 
- 有個固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每隔25ms,Buffer裡的資料及未重新整理到磁碟的檔案內容必定會刷到磁碟 
- 每次訊息寫入後,如果沒有後續寫入請求,則會直接將已寫入的訊息刷到磁碟:使用Erlang的receive x after 0來實現,只要程序的信箱裡沒有訊息,則產生一個timeout訊息,而timeout會觸發刷盤操作

3.訊息在磁碟檔案中的格式 
訊息保存於$MNESIA/msg_store_persistent/x.rdq檔案中,其中x為數字編號,從0開始,每個檔案最大為16M(16777216),超過這個大小會生成新的檔案,檔案編號加1。訊息以以下格式存在於檔案中:

<<Size:64, MsgId:16/binary, MsgBody>>

MsgId為RabbitMQ通過rabbit_guid:gen()每一個訊息生成的GUID,MsgBody會包含訊息對應的exchange,routing_keys,訊息的內容,訊息對應的協議版本,訊息內容格式(二進位制還是其它)等等。

4.檔案何時刪除? 
當所有檔案中的垃圾訊息(已經被刪除的訊息)比例大於閾值(GARBAGE_FRACTION = 0.5)時,會觸發檔案合併操作(至少有三個檔案存在的情況下),以提高磁碟利用率。 
publish訊息時寫入內容,ack訊息時刪除內容(更新該檔案的有用資料大小),當一個檔案的有用資料等於0時,刪除該檔案。

 

5.將queue,exchange, message等都設定了持久化之後就能保證100%保證資料不丟失了嚒? 
答案是否定的。 
首先,從consumer端來說,如果這時autoAck=true,那麼當consumer接收到相關訊息之後,還沒來得及處理就crash掉了,那麼這樣也算資料丟失,這種情況也好處理,只需將autoAck設定為false(方法定義如下),然後在正確處理完訊息之後進行手動ack(channel.basicAck).

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  • 1

其次,關鍵的問題是訊息在正確存入RabbitMQ之後,還需要有一段時間(這個時間很短,但不可忽視)才能存入磁碟之中,RabbitMQ並不是為每條訊息都做fsync的處理,可能僅僅儲存到cache中而不是物理磁碟上,在這段時間內RabbitMQ broker發生crash, 訊息儲存到cache但是還沒來得及落盤,那麼這些訊息將會丟失。那麼這個怎麼解決呢?首先可以引入RabbitMQ的mirrored-queue即映象佇列,這個相當於配置了副本,當master在此特殊時間內crash掉,可以自動切換到slave,這樣有效的保障了HA, 除非整個叢集都掛掉,這樣也不能完全的100%保障RabbitMQ不丟訊息,但比沒有mirrored-queue的要好很多,很多現實生產環境下都是配置了mirrored-queue的。還有要在producer引入事務機制或者Confirm機制來確保訊息已經正確的傳送至broker端,有關RabbitMQ的事務機制或者Confirm機制可以參考:RabbitMQ之訊息確認機制(事務+Confirm). 幸虧本文的主題是討論RabbitMQ的持久化而不是可靠性,不然就一發不可收拾了。RabbitMQ的可靠性涉及producer端的確認機制、broker端的映象佇列的配置以及consumer端的確認機制,要想確保訊息的可靠性越高,那麼效能也會隨之而降,魚和熊掌不可兼得,關鍵在於選擇和取捨。

相關參考連結:http://jzhihui.iteye.com/blog/1642324 
訊息中介軟體收錄集:https://blog.csdn.net/u013256816/article/details/54743481

    1. RabbitMQ訊息佇列(三):任務分發機制
    2. RabbitMQ之mandatory和immediate
    3. RabbitMQ之訊息確認機制(事務+Confirm)
    4. RabbitMQ持久化機制