四種途徑提升RabbitMQ傳輸訊息資料的可靠性(一)
前言
RabbitMQ雖然有對佇列及訊息等的一些持久化設定,但其實光光只是這一個是不能夠保障資料的可靠性的,下面我們提出這樣的質疑:
(1)RabbitMQ生產者是不知道自己釋出的訊息是否已經正確達到伺服器呢,如果中間發生網路異常等情況呢?訊息必然會丟失!
(2)RabbitMQ如果沒有設定佇列持久化,RabbitMQ伺服器重後佇列的元資料會丟失,訊息自然也會丟失!
(3)RabbitMQ如果消費者設定自動確認,即autoAck為true,那麼不管消費者發生什麼情況,該訊息會自動從佇列中移除,實際上消費者有可能掛掉,訊息必然會丟失!
(4)RabbitMQ中的訊息如果沒有匹配到佇列時,那麼訊息也會丟失!
本文其實也就是結合以上四個方面進行講解的,主要參考《RabbitMQ實戰指南》(有需要PDF電子書的可以評論或者私信我),本文截圖也來自其中,另外可以對一些RabbitMQ的概念的認識可以參考我的上兩篇博文 認識RabbitMQ交換機模型 、 RabbitMQ是如何運轉的?
一、設定mandotory引數、AE備份交換器
針對前言中的第(4)個問題,我們可以通過設定mandotory引數與AE備份交換器來解決
1、mandotory引數
1)當為true時, 交換器無法根據自身的型別和路由鍵找到一個符合條件的佇列,此時RabbitMQ會呼叫Basic.Return命令將訊息返回給生產者,訊息將不會丟失
2)當為false時,訊息將會被直接丟棄。
3) RabbitMQ通過addReturnListener新增ReturnLisener監聽器監聽獲取沒有被正確路由到合適佇列的訊息 。
channel.basicPublish(EXCHANGE NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes()); channel.addReturnListener(new ReturnListener(){ public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { String message = new String(body); System.out.println("Basic.Return 返回的結果是: " + message); } });
2、AE備份交換器
Alternate Exchange,簡稱AE,不設定mandatory引數,那麼訊息將會被丟失,設定mandatory引數的話,需要新增ReturnListner監聽器,增加複雜程式碼,如果 既不想增加程式碼又不想訊息丟失,則使用AE,將沒有被路由的訊息儲存於RabbitMQ中。當mandatory引數用AE一起使用時,mandatory將失效。 在介紹AE之前,也認識RabbitMQ對於訊息的過期時間TTL設定以及佇列的過期時間TTL設定
2.1 TTL過期時間設定
可以對佇列設定TTL與訊息設定TTL,其中訊息設定TTL經常用於死信佇列、延遲佇列等高階應用中。
1)設定訊息TTL
設定TTL過期時間一般有兩種當時:一是通過佇列屬性,對佇列中所有訊息設定相同的TTL。二就是對訊息本身單獨設定,每條訊息TTL不同。如果一起使用時候,TTL小的為準,當一旦超過設定的TTL時間時,就會變成“死信”。
方式一: 針對每條訊息設定TTL是通過增加expiration的屬性引數實現的,不可能像方式二一樣掃描整個佇列再判斷是否過期, 只有當該訊息即將被消費時再判定是否過期即可刪除,也就是訊息即使已經過期,但不一定立馬被刪除!
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 持久化訊息 builder deliveryMode(2); // 設定 TTL=60000ms builder expiration( 60000 ); AMQP.BasicProperties properties = builder. build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());
方式二: 通過佇列屬性設定訊息TTL是增加x-message-ttl引數實現的, 只需要掃描整個佇列頭部即可立即刪除,也就是訊息一旦過期就會被刪除!
Map<String, Object> argss = new HashMap<String , Object>(); argss.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss) ;
2)設定佇列TTL
通過在佇列中 新增引數x-message-ttl引數實現 , 設定佇列被自動刪除前處於未被使用狀態的時間 ,注意是佇列的使用狀態,並不是訊息是否被消費的狀態
設定ttl=30min的佇列,時間一到RabbitMQ會保證佇列被刪除,但是不會保證刪除的速度有多快。
Map<String, Object> args = new HashMap<String, Object>{); args.put("x-expires", 1800000); channel.queueDeclare("myqueue", false, false, false, args);
2.2 AE備份交換器的使用
宣告交換器的時候,新增alternate-exchange引數實現,或通過策略實現。前者優先順序高。從程式碼角度需要以下三個步驟,具體程式碼如下:
Map<String, Object> args = new HashMap<String, Object>(); args.put("a1ternate-exchange", "myAe"); channe1.exchangeDec1are("norma1Exchange", "direct", true, fa1se, args); channe1.exchangeDec1are("myAe", "fanout", true, fa1se, nu11) ; channe1.queueDec1are( "norma1Queue", true, fa1se, fa1se, nu11); channe1.queueB nd("norma1Queue", "norma1Exchange", "norma1Key"); channe1.queueDec1are("unroutedQueue", true, fa1se, fa1se, nu11);
1)宣告normalExchange型別為direct的交換器、型別為fanout的myAe備份交換器;並且normalExchange的備份交換器為myAe(備份交換器建議使用fanout型別交換器)
2)宣告normalQueue佇列,宣告unrouteQueue佇列;
3)通過路由鍵normalKey繫結normalExchange與normalQueue,不適用路由鍵繫結unrouteQueue與myAe
二、消費者手動確認
針對前言中第(3)個問題,我們需要在消費者消費完訊息後手動進行確認,保證訊息資料不丟失!
1、autoAck引數設定
1) 當autoAck引數為false時,手動確認:
RabbitMQ會等待消費者顯式地回覆確認訊號後從記憶體中移去訊息 (實際上是先標示刪除標記,之後再刪除),這是一般推薦使用的方式, 因為使用手動確認有足夠的時間處理訊息 ,不需要擔心消費者程序掛掉之後訊息丟失問題。此時的訊息就會分為兩個部分:一是等待投遞給消費者的訊息;二是已經投遞給消費者但還沒有收到消費者確認訊號的訊息。
2) 當autoAck為true時,自動確認:
RabbitMQ會自動隱式地回覆確認訊號後從記憶體中移去訊息, RabbitMQ不需要管消費者是否真正消費了這些訊息,RabbitMQ會自動把傳送出去的訊息置為確認,然後直接從記憶體中刪除。
2、重新投遞
問:如果選擇手動確認,即autoAck為false時,消費者由於某些原因斷開了,那麼訊息的確認會受到影響,那麼此時的訊息會丟失嗎?
這也就是一開始提出來的問題,其實是不必擔心訊息會被丟失,因為RabbitMQ如果一直沒收到消費者的確認訊號,並且消費此訊息的消費者已經斷開, 則RabbitMQ會重新安排訊息進入佇列等待給下一個消費者 。也就是RabbitMQ不會設定訊息的過期時間(當然也可以設定過期時間,但與之有關係方式訊息丟失的特性是死信佇列), 它只判斷是否需要重新安排入佇列重新投遞,而判斷的唯一標準是消費此訊息的消費者連線是否已經斷開 ,即RabbitMQ會允許消費一條訊息的時間很久很久。
3、消費者拒絕訊息
1)使用channel.basicReject方法,但只能拒絕一條。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:訊息的唯一標識
requeue:表示是否可以拒絕的訊息重新存入佇列
2)使用channel.basicNack。不同於前者,此方法可以批量拒絕。
void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
multiple:設定為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的訊息。
3)問:關鍵在於,消費者拒絕消費訊息後怎麼處理?是丟棄,還是重新回到佇列呢?
當引數requeue設定為true時候,可以重新進入佇列,投遞給下一個消費者。如果為false,訊息就會把佇列中訊息立馬移除,再結合啟用“死信佇列”,防止訊息丟失並且可以分析異常情況的發生。
三、生產者確認機制
針對前言的第(1)個問題,我們可以通過生產者的確認訊息機制來解決,主要分為兩種:第一是事務機制、第二是傳送方確認機制
1、事務機制
與事務機制相關的有三種方法, 分別是channel.txSelect設定當前通道為事務模式、channel.txCommit提交事務和channel.txRollback事務回滾。如果事務提交成功,則訊息一定是到達了RabbitMQ中,如果事務提交之前由於傳送異常或者其他原因,捕獲後可以進行channel.txRollback回滾。
// 將通道設定為事務模式,開啟事務 channel.txSelect(); // 傳送持久化訊息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, "transaction messages".getBytes()); // 事務提交 channel.txCommit();
發生異常之後事務回滾
try { channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, "transaction messages".getBytes()); channel.txCommit(); } catch (Exception e){ e.printStackTrace(); channel.txRollback(); }
2、確認機制
確認機制相對來說,相比較程式碼來說比較複雜。主要有單條確認、批量確認、非同步批量確認
-----------------------------------------未完待續,寫不動了,休息會再補充下一篇-------------------------------------------