1. 程式人生 > >rabbit--消息持久化

rabbit--消息持久化

clust 也不能 產生 tro lose consumer false type sum

消息的可靠性是RabbitMQ的一大特色,那麽RabbitMQ是如何保證消息可靠性的呢——消息持久化。
為了保證RabbitMQ在退出或者crash等異常情況下數據沒有丟失,需要將queue,exchange和Message都持久化。

queue的持久化

queue的持久化是通過durable=true來實現的。

1 using (var connection = factory.CreateConnection())
2             {
3                 using (var channel = connection.CreateModel())
4                 {
5 channel.QueueDeclare(queue:"hello", durable: false, exclusive: false, autoDelete: false, arguments: null); 6 7 } 8 }

參數說明:

queue:queue名稱

exclusive:排他隊列;如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。

        這裏需要註意三點:1. 排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排他隊列;

                 2.“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;

               3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。

autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

quereDeclear相關的4種方法,如下:

技術分享圖片
1 Queue.DeclareOk queueDeclare() throws IOException;
2 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
3 Map<String, Object> arguments) throws IOException; 4 void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, 5 Map<String, Object> arguments) throws IOException; 6 Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
View Code

其中需要說明的是queueDeclarePassive(String queue)可以用來檢測一個queue是否已經存在

如果該隊列存在,則會返回true;如果不存在,就會返回異常,但是不會創建新的隊列。

消息的持久化

  如果將queue的持久化標識為durable設置true,則代表是一個持久化的隊列,那麽在服務重啟後,也會存在;因為服務會把持久化的queue存放在硬盤上。當服務重啟的時候,會重新設置之前持久化的queue。

  隊列是可以被持久化,但是裏面的消息是否為持久化那還要看消息的持久化設置

  也就是說重啟之前那個queue裏面還沒有發出去的消息的話,重啟之後那隊列裏面是不是還存在原來的消息,這個就要取決於發生著在發送消息時對消息的設置了。
如果要在重啟後保持消息的持久化必須設置消息是持久化的標識

  設置消息的持久化:

channel.BasicPublish(exchange: "test", routingKey: "task_queue", basicProperties: MessageProperties.PERSISTENT_TEXT_PLAIN, body: body);

參數解析:

  exchange:exchange名字

  routingKey:routingKey名字

  body:表示發送的消息體

  basicProperties:首先看下BasicProperties的定義

技術分享圖片
 1 public BasicProperties(
 2             String contentType,//消息類型如:text/plain
 3             String contentEncoding,//編碼
 4             Map<String,Object> headers,
 5             Integer deliveryMode,//1:nonpersistent 2:persistent
 6             Integer priority,//優先級
 7             String correlationId,
 8             String replyTo,//反饋隊列
 9             String expiration,//expiration到期時間
10             String messageId,
11             Date timestamp,
12             String type,
13             String userId,
14             String appId,
15             String clusterId)
View Code

deliveryMode=1表示不持久化;deliveryMode=2則表示持久化。

那麽MessageProperties.PERSISTENT_TEXT_PLAIN又是什麽鬼?

技術分享圖片
1 public static final BasicProperties PERSISTENT_TEXT_PLAIN =
2     new BasicProperties("text/plain",
3                         null,
4                         null,
5                         2,
6                         0, null, null, null,
7                         null, null, null, null,
8                         null, null);
View Code

可以看到這其實就是講deliveryMode設置為2的BasicProperties的對象,為了方便編程而出現的一個東東

另一種實現:

技術分享圖片
1 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
2 builder.deliveryMode(2);
3 AMQP.BasicProperties properties = builder.build();
4 channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());
View Code

設置了隊列和消息的持久化之後,當broker服務重啟的之後,消息依舊存在。單只設置隊列持久化,重啟之後消息會丟失;單只設置消息的持久化,重啟之後隊列消失,既而消息也丟失。單單設置消息持久化而不設置隊列的持久化顯得毫無意義。

exchange的持久化

上面闡述了隊列的持久化和消息的持久化,如果不設置exchange的持久化對消息的可靠性來說沒有什麽影響,但是同樣如果exchange不設置持久化,那麽當broker服務重啟之後,exchange將不復存在,那麽既而發送方rabbitmq producer就無法正常發送消息。同樣設置exchange的持久化。exchange的持久化設置也特別簡單,方法如下:

channel.ExchangeDeclare(exchange: "test", type: "direct/topic/header/fanout", durable: true);即在聲明的時候講durable字段設置為true即可。

技術分享圖片
 1 Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
 2 Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
 3                                    Map<String, Object> arguments) throws IOException;
 4 Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
 5 Exchange.DeclareOk exchangeDeclare(String exchange,
 6                                           String type,
 7                                           boolean durable,
 8                                           boolean autoDelete,
 9                                           boolean internal,
10                                           Map<String, Object> arguments) throws IOException;
11 void exchangeDeclareNoWait(String exchange,
12                            String type,
13                            boolean durable,
14                            boolean autoDelete,
15                            boolean internal,
16                            Map<String, Object> arguments) throws IOException;
17 Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
View Code

引申的問題

1.將queue,exchange, message等都設置了持久化之後就能保證100%保證數據不丟失了嚒?

答案是否定的。
  首先,從consumer端來說,如果這時autoAck=true,那麽當consumer接收到相關消息之後,還沒來得及處理就crash掉了,那麽這樣也算數據丟失,這種情況也好處理,只需將autoAck設置為true(方法定義如下),然後在正確處理完消息之後進行手動ack(channel.basicAck).

  其次,關鍵的問題是消息在正確存入RabbitMQ之後,還需要有一段時間(這個時間很短,但不可忽視)才能存入磁盤之中,RabbitMQ並不是為每條消息都做fsync的處理,可能僅僅保存到cache中而不是物理磁盤上,在這段時間內RabbitMQ broker發生crash, 消息保存到cache但是還沒來得及落盤,那麽這些消息將會丟失。

  那麽這個怎麽解決呢?首先可以引入RabbitMQ的mirrored-queue即鏡像隊列,這個相當於配置了副本,當master在此特殊時間內crash掉,可以自動切換到slave,這樣有效的保障了HA, 除非整個集群都掛掉,這樣也不能完全的100%保障RabbitMQ不丟消息,但比沒有mirrored-queue的要好很多,很多現實生產環境下都是配置了mirrored-queue的。還有要在producer引入事務機制或者Confirm機制來確保消息已經正確的發送至broker端。RabbitMQ的可靠性涉及producer端的確認機制、broker端的鏡像隊列的配置以及consumer端的確認機制,要想確保消息的可靠性越高,那麽性能也會隨之而降,魚和熊掌不可兼得,關鍵在於選擇和取舍。

2.消息什麽時候刷到磁盤?

  寫入文件前會有一個Buffer,大小為1M,數據在寫入文件時,首先會寫入到這個Buffer,如果Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤)。
  有個固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每隔25ms,Buffer裏的數據及未刷新到磁盤的文件內容必定會刷到磁盤。
每次消息寫入後,如果沒有後續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlang的receive x after 0實現,只要進程的信箱裏沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操作。

rabbit--消息持久化