1. 程式人生 > >Rabbitmq消息持久化

Rabbitmq消息持久化

配置 timeout 進程 gray 產生 let 還需 建立 simple

消息的可靠性是RabbitMQ的一大特色,那麽RabbitMQ是如何保證消息可靠性的呢——消息持久化。?
為了保證RabbitMQ在退出或者crash等異常情況下數據沒有丟失,需要將queueexchangeMessage都持久化。

queue的持久化

queue的持久化是通過durable=true來實現的。?
一般程序中這麽使用:

/**

* amqp_queue_declare

*

* @param [in] state connection state TCP連接

* @param [in] channel the channel to do the RPC on

信道

* @param [in] queue queue 隊列名稱

* @param [in] passive passive

* @param [in] durable durable 是否持久化

* @param [in] exclusive exclusive 是否排他

* @param [in] auto_delete auto_delete 是否自動刪除

* @param [in] arguments arguments

* @returns amqp_queue_declare_ok_t

*/

AMQP_PUBLIC_FUNCTION

amqp_queue_declare_ok_t *

AMQP_CALL amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable

, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments)

{

amqp_queue_declare_t req;

req.ticket = 0;

req.queue = queue;

req.passive = passive;

req.durable = durable;

req.exclusive = exclusive;

req.auto_delete = auto_delete;

req.nowait = 0;

req.arguments = arguments;

?

return amqp_simple_rpc_decoded(state, channel, AMQP_QUEUE_DECLARE_METHOD, AMQP_QUEUE_DECLARE_OK_METHOD, &req);

}

@param [in] durable 在申明一個隊列時,有一個參數amqp_boolean_t durable用來指定該隊列中的消息是否持久化

@param [in] exclusive 排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。這裏需要註意三點:1. 排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排他隊列;2."首次",如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景;

@param [in] auto_delete 自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列;

消息的持久化

如果將queue的持久化標識durable設置為true,則代表是一個持久的隊列,那麽在服務重啟之後,也會存在,因為服務會把持久化的queue存放在硬盤上,當服務重啟的時候,會重新創建之前被持久化的queue。隊列是可以被持久化,但是裏面的消息是否為持久化那還要看消息的持久化設置。也就是說,重啟之前那個queue裏面還有沒有發出去的消息的話,重啟之後那隊列裏面是不是還存在原來的消息,這個就要取決於發生著在發送消息時對消息的設置了。?
如果要在重啟後保持消息的持久化必須設置消息的持久化的標識;

amqp_basic_properties_t props;

props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;

props.content_type = amqp_cstring_bytes("text/plain");

props.delivery_mode = 2; /* persistent delivery mode */

die_on_error(amqp_basic_publish(conn,

1,

amqp_cstring_bytes(exchange),

amqp_cstring_bytes(routingkey),

0,

0,

&props,

amqp_cstring_bytes("test message")),

"Publishing");

發送消息之前要先設置一個消息屬性結構體amqp_basic_properties_tdelivery_mode指定消息的持久化設置;

delivery_mode=2將消息設置為持久化模式,隊列中這條消息將會被持久化保存;

設置了隊列和消息的持久化之後,當broker服務重啟的之後,消息依舊存在。單只設置隊列持久化,重啟之後消息會丟失;單只設置消息的持久化,

重啟之後隊列消失,既而消息也丟失。單單設置消息持久化而不設置隊列的持久化顯得毫無意義。

exchange的持久化

上面闡述了隊列的持久化和消息的持久化,如果不設置exchange的持久化對消息的可靠性來說沒有什麽影響,但是同樣如果exchange不設置持久化,

那麽當broker服務重啟之後,exchange將不復存在,那麽既而發送方rabbitmq producer就無法正常發送消息。這裏博主建議,同樣設置exchange的持久化。

exchange的持久化設置也特別簡單;

/**

* amqp_exchange_declare

*

* @param [in] state connection state

* @param [in] channel the channel to do the RPC on

* @param [in] exchange exchange

* @param [in] type type

* @param [in] passive passive

* @param [in] durable durable

* @param [in] auto_delete auto_delete

* @param [in] internal internal

* @param [in] arguments arguments

* @returns amqp_exchange_declare_ok_t

*/

AMQP_PUBLIC_FUNCTION

amqp_exchange_declare_ok_t *

AMQP_CALL amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t auto_delete, amqp_boolean_t internal, amqp_table_t arguments)

{

amqp_exchange_declare_t req;

req.ticket = 0;

req.exchange = exchange;

req.type = type;

req.passive = passive;

req.durable = durable;

req.auto_delete = auto_delete;

req.internal = internal;

req.nowait = 0;

req.arguments = arguments;

?

return amqp_simple_rpc_decoded(state, channel, AMQP_EXCHANGE_DECLARE_METHOD, AMQP_EXCHANGE_DECLARE_OK_METHOD, &req);

}

參數和隊列創建的類似不做詳細解釋;

進一步討論

1.queueexchange, message等都設置了持久化之後就能保證100%保證數據不丟失了嚒??
答案是否定的。?
首先,從consumer端來說,如果這時autoAck=true,那麽當consumer接收到相關消息之後,還沒來得及處理就crash掉了,那麽這樣也算數據丟失,這種情況也好處理,只需將autoAck設置為false(方法定義如下),然後在正確處理完消息之後進行手動ackchannel.basicAck);

其次,關鍵的問題是消息在正確存入RabbitMQ之後,還需要有一段時間(這個時間很短,但不可忽視)才能存入磁盤之中,RabbitMQ並不是為每條消息都做fsync的處理,可能僅僅保存到cache中而不是物理磁盤上,在這段時間內RabbitMQ broker發生crash, 消息保存到cache但是還沒來得及落盤,那麽這些消息將會丟失。那麽這個怎麽解決呢?首先可以引入RabbitMQmirrored-queue即鏡像隊列,這個相當於配置了副本,當master在此特殊時間內crash掉,可以自動切換到slave,這樣有效的保障了HA, 除非整個集群都掛掉,這樣也不能完全的100%保障RabbitMQ不丟消息,但比沒有mirrored-queue的要好很多,很多現實生產環境下都是配置了mirrored-queue的。還有要在producer引入事務機制或者Confirm機制來確保消息已經正確的發送至broker端,有關RabbitMQ的事務機制或者Confirm機制可以參考:RabbitMQ之消息確認機制(事務+Confirm;幸虧本文的主題是討論RabbitMQ的持久化而不是可靠性,不然就一發不可收拾了。RabbitMQ的可靠性涉及producer端的確認機制、broker端的鏡像隊列的配置以及consumer端的確認機制,要想確保消息的可靠性越高,那麽性能也會隨之而降,魚和熊掌不可兼得,關鍵在於選擇和取舍。

2.消息什麽時候刷到磁盤??
寫入文件前會有一個Buffer,大小為1M,數據在寫入文件時,首先會寫入到這個Buffer,如果Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤)。?
有個固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每個25msBuffer裏的數據及未刷新到磁盤的文件內容必定會刷到磁盤。?
每次消息寫入後,如果沒有後續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlangreceive x after 0實現,只要進程的信箱裏沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操作。

?

?

Rabbitmq消息持久化