RabbitMQ生產端訊息可靠性投遞方案分析
之前在上面2篇文章中,講到過RabbitMQ的安裝,基本概念和用法。我們來回顧一下RabbitMQ核心基礎概念。
-
Server:又稱之為Broker,接受客戶端的連線,實現AMQP實體服務。
-
Connection:連線,應用程式與Broker的網路連線。
-
Channel:網路通道,幾乎所有的操作都在Channel中進行,Channel是進行訊息讀寫的通道。客戶端可以建立多個Channel,每個Channel代表一個會話任務。
如果每一次訪問RabbitMQ都建立一個Connection,在訊息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部建立的邏輯連線,如果應用程式支援多執行緒,通常每個thread建立單獨的channel進行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了作業系統建立TCP connection的開銷。
-
Message:訊息,伺服器和應用程式之間傳送的資料,由Message Properties和Body組成。Properties可以對訊息進行修飾,比如訊息的優先順序,延遲等高階特性,Body就是訊息體內容。
-
Virtual Host:虛擬地址,用於進行邏輯隔離,最上層的訊息路由。一個Virtual Host裡面可以有若干個Exchange和Queue,同一個Virtual Host裡面不能有相同名稱的Exchange或者Queue。
-
Exchange:交換機,接收訊息,根據路由鍵轉發訊息到繫結的佇列。
-
Binding:Exchange和Queue之間的虛擬連線,binding中可以包含routing key。
-
Routing key:一個路由規則,虛擬機器可以用它來確定如何路由一個特定訊息。
-
Queue:也可以稱之為Message Queue(訊息佇列),儲存訊息並將它們轉發到消費者。
通過下面2張圖,我們能大概能明白AMQP協議模型和訊息流轉過程。在Exchange和Message Queue上面還有Virtual host。記住同一個Virtual Host裡面不能有相同名稱的ExChange和Message Queue。

image.png

image.png
接著我們看下面的圖,這是RabbitMQ訊息可靠性投遞的解決方案之一。

image.png
1.將訊息落地到業務db和Message db。
2.採用Confirm方式傳送訊息至MQ Broker,返回結果的過程是非同步的。Confirm訊息,是指生產者投遞訊息後,如果Broker收到訊息後,會給生產者一個ACK。生產者通過ACK,可以確認這條訊息是否正常傳送到Broker,這種方式是訊息可靠性投遞的核心。
3、4:在這裡將訊息分成3種狀態。status=0表示訊息正在投遞中,status=1表示訊息投遞成功,status=2表示訊息投遞了3次還是失敗。生產者接收Broker返回的Confirm確認訊息結果,然後根據結果更新訊息的狀態。將status的狀態從投遞中改成投遞成功即可。
5.在訊息Confirm過程中,可能由於網路閃斷問題或者是Broker端出現異常,導致回送訊息失敗或者出現異常。這時候,就需要生產者對訊息進行可靠性投遞,保證投遞到Broker的訊息可靠不丟失。還有一種極端情況值得我們考慮,那就是網路閃斷。我們的訊息成功投遞到Broker,但是在回送ACK確認訊息時,由於網路閃斷,生產者沒有收到。此時我們再重新投遞此訊息可能會造成消費端重複消費訊息了。這時候需要消費端去做冪等處理(生成全域性訊息ID,判斷此訊息是否消費過)。對於沒有投遞成功的訊息,我們可以設定一個重新投遞時間。比如一個訊息在5分鐘內,status狀態還是0,也就是這個訊息還沒有成功投遞到Broker端。這時候我們需要一個定時任務,每隔幾分鐘從Message db中拉取status為0的訊息。
6.將拉取的訊息執行重新投遞操作。
7.設定最大訊息投遞次數。當一個訊息被投遞了3次,還是不成功,那麼將status置為2。最後交給人工解決處理此類問題或者將訊息轉存到失敗表。
下面講解一下涉及到訊息可靠性的知識點和一些配置了。
application-dev.properties
#rabbtisMQ配置 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=root spring.rabbitmq.password=root spring.rabbitmq.virtual-host=/ #消費者數量 spring.rabbitmq.listener.simple.concurrency=10 #最大消費者數量 spring.rabbitmq.listener.simple.max-concurrency=10 #消費者每次從佇列獲取的訊息數量 spring.rabbitmq.listener.simple.prefetch=1 #消費者自動啟動 spring.rabbitmq.listener.simple.auto-startup=true #消費失敗,自動重新入隊 #重試次數超過最大限制之後是否丟棄(true不丟棄時需要寫相應程式碼將該訊息加入死信佇列) #true,自動重新入隊,要寫相應程式碼將該訊息加入死信佇列 #false,丟棄 spring.rabbitmq.listener.simple.default-requeue-rejected=false #是否開啟消費者重試(為false時關閉消費者重試,這時消費端程式碼異常會一直重複收到訊息) spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.retry.initial-interval=1000 spring.rabbitmq.listener.simple.retry.max-attempts=3 spring.rabbitmq.listener.simple.retry.multiplier=1.0 spring.rabbitmq.listener.simple.retry.max-interval=10000 #啟動傳送重試策略 spring.rabbitmq.template.retry.enabled=true #初始重試間隔為1s spring.rabbitmq.template.retry.initial-interval=1000 #重試的最大次數 spring.rabbitmq.template.retry.max-attempts=3 #重試間隔最多10s spring.rabbitmq.template.retry.max-interval=10000 #每次重試的因子是1.0 等差 spring.rabbitmq.template.retry.multiplier=1.0 # #RabbitMQ的訊息確認有兩種。 #一種是訊息傳送確認。這種是用來確認生產者將訊息傳送給交換器,交換器傳遞給佇列的過程中, # 訊息是否成功投遞。 #傳送確認分為兩步,一是確認是否到達交換器,二是確認是否到達佇列。 #第二種是消費接收確認。這種是確認消費者是否成功消費了佇列中的訊息。 # 確認訊息傳送成功,通過實現ConfirmCallBack介面,訊息傳送到交換器Exchange後觸發回撥 spring.rabbitmq.publisher-confirms=true # 實現ReturnCallback介面,如果訊息從交換器傳送到對應佇列失敗時觸發 # (比如根據傳送訊息時指定的routingKey找不到佇列時會觸發) spring.rabbitmq.publisher-returns=true # 訊息消費確認,可以手動確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual #在訊息沒有被路由到合適佇列情況下會將訊息返還給訊息釋出者 #當mandatory標誌位設定為true時,如果exchange根據自身型別和訊息routingKey無法找到一個合適的queue儲存訊息, # 那麼broker會呼叫basic.return方法將訊息返還給生產者;當mandatory設定為false時, # 出現上述情況broker會直接將訊息丟棄;通俗的講,mandatory標誌告訴broker代理伺服器至少將訊息route到一個佇列中, # 否則就將訊息return給傳送者; spring.rabbitmq.template.mandatory=true
要確保RabbitMQ訊息的可靠要保證以下3點:
1.publisher Confirms:要確保生產者的訊息到broker的可靠性。可能會發生訊息投遞到broker過程中,broker掛了的情況。
2.Exchange,Queue,Message持久化:RabbitMQ是典型的記憶體式訊息堆積。我們需要把message儲存到磁碟中。如果是未持久化的訊息儲存在記憶體中,broker掛了那麼訊息會丟失。
3.consumer acknowledgement:消費者確認模式有3種:none(沒有訊息會發送應答),auto(自動應答),manual(手動應答)。為了保證訊息可靠性,我們設定手動應答,這是為什麼呢?採用自動應答的方式,每次消費端收到訊息後,不管是否處理完成,Broker都會把這條訊息置為完成,然後從Queue中刪除。如果消費端消費時,丟擲異常。也就是說消費端沒有成功消費該訊息,從而造成訊息丟失。為了確保訊息被消費者正確處理,我們採用手動應答(呼叫basicAck、basicNack、basicReject方法),只有在訊息得到正確處理下,再發送ACK。
RabbitMQ訊息確認有2種:訊息傳送確認,消費接收確認。訊息傳送確認是確認生產者將訊息傳送到Exchange,Exchange分發訊息至Queue的過程中,訊息是否可靠投遞。第一步是否到達Exchange,第二步確認是否到達Queue。
實現ConfirmCallBack介面,訊息傳送到Exchange後觸發回撥。
// 訊息傳送到交換器Exchange後觸發回撥 private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("生產端confirm..."); log.info("correlationData=" + correlationData); String messageId = correlationData.getId(); if (ack) { //confirm返回成功,更新訊息投遞狀態 brokerMessageLogMapper.updateMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date()); } else { // 失敗則進行具體的後續操作,重試或者補償等手段。 log.info("異常處理..."); } } };
實現ReturnCallBack介面,訊息從Exchange傳送到指定的Queue失敗觸發回撥
// 如果訊息從交換器傳送到對應佇列失敗時觸發 private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("message=" + message.toString()); log.info("replyCode=" + replyCode); log.info("replyText=" + replyText); log.info("exchange=" + exchange); log.info("routingKey=" + routingKey); } };
訊息確認機制開啟,需要配置以下資訊
spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.listener.simple.acknowledge-mode=manual
之前說過手動應答可以呼叫basicAck,basicNack,basicReject方法,下面來講講。
手動確認訊息,當multiple為false,只確認當前的訊息。當multiple為true,批量確認所有比當前deliveryTag小的訊息。deliveryTag是用來標識Channel中投遞的訊息。RabbitMQ保證在每個Channel中,訊息的deliveryTag是從1遞增。

image.png
當消費端處理訊息異常時,我們可以選擇處理失敗訊息的方式。如果requeue為true,失敗訊息會重新進入Queue,試想一下,如果消費者在消費時發生異常,那麼就不會對這一次訊息進行ACK,進而發生回滾訊息的操作,使訊息始終放在Queue的頭部,然後不斷的被處理和回滾,導致佇列陷入死迴圈,為了解決這種問題,我們可以引入重試機制(當重試次數超過最大值,丟棄該訊息)或者是死信佇列+重試佇列。
requeue為false,丟棄該訊息。

image.png
和basicNack用法一樣。

image.png
為了配合Return機制,我們要配置 spring.rabbitmq.template.mandatory=true
。它的作用是在訊息沒有被路由到合適的佇列情況下,Broker會將訊息返回給生產者。當mandatory為true時,如果Exchange根據型別和訊息Routing Key無法路由到一個合適的Queue儲存訊息,那麼Broker會呼叫Basic.Return回撥給handleReturn(),再回調給ReturnCallback,將訊息返回給生產者。當mandatory為false時,丟棄該訊息。
@Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, BasicProperties properties, byte[] body) throws IOException { ReturnCallback returnCallback = this.returnCallback; if (returnCallback == null) { Object messageTagHeader = properties.getHeaders().remove(RETURN_CORRELATION_KEY); if (messageTagHeader != null) { String messageTag = messageTagHeader.toString(); final PendingReply pendingReply = this.replyHolder.get(messageTag); if (pendingReply != null) { returnCallback = new ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { pendingReply.returned(new AmqpMessageReturnedException("Message returned", message, replyCode, replyText, exchange, routingKey)); } }; } else if (logger.isWarnEnabled()) { logger.warn("Returned request message but caller has timed out"); } } else if (logger.isWarnEnabled()) { logger.warn("Returned message but no callback available"); } } if (returnCallback != null) { properties.getHeaders().remove(PublisherCallbackChannel.RETURN_CORRELATION_KEY); MessageProperties messageProperties = this.messagePropertiesConverter.toMessageProperties( properties, null, this.encoding); Message returnedMessage = new Message(body, messageProperties); returnCallback.returnedMessage(returnedMessage, replyCode, replyText, exchange, routingKey); } }
當訊息路由不到合適的Queue,會在回撥給ReturnCallck這些資訊。

image.png
如果消費端忘記了ACK,這些訊息會一直處於Unacked 狀態。由於RabbitMQ訊息消費沒有超時機制,也就是程式不重啟,訊息會一直處於Unacked狀態。當消費端程式關閉時,這些處於Unack狀態的訊息會重新恢復成Ready狀態。這時候會出現一種情況:當消費端程式開啟時,由於Broker端積壓了大量的訊息,又可能會讓消費端崩潰。所以我們要對消費端進行限流處理。RabbitMQ提供了一種qos(Quality of Service,服務質量保證)功能,即在非自動ACK前提下,如果一定數量的訊息未被ACK前,不進行新訊息的訊息。

image.png
spring.rabbitmq.listener.simple.prefetch=1

image.png
下面貼訊息可靠性解決方案程式碼了。
配置任務排程中心
@Configuration @EnableScheduling public class TaskSchedulerConfig implements SchedulingConfigurer { protected ThreadPoolExecutor threadPoolExecutor; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.setScheduler(taskExecutor()); } @Bean(destroyMethod = "shutdown") public ThreadPoolExecutor taskExecutor() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("task-executor-pool-%d").build(); this.threadPoolExecutor = new ScheduledThreadPoolExecutor(10, namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); return threadPoolExecutor; } }
執行重新投遞status為0的訊息。這裡也可以使用corn表示式設定觸發任務排程的時間。關於fixedRate和fixedDelay概念總有人搞混。fixedRate任務兩次執行時間間隔是任務的開始點,而fixedDelay的間隔是前次任務的結束和下一次任務開始的間隔。
@Component @Slf4j public class RetryMessageTask { @Autowired private RabbitmqOrderSender rabbitmqOrderSender; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; @Scheduled(initialDelay = 5000, fixedDelay = 30000) public void trySendMessage() { log.info("定時投遞status為0的訊息..."); List<BrokerMessageLog> brokerMessageLogList = brokerMessageLogMapper.listStatusAndTimeoutMessage(); brokerMessageLogList.forEach(brokerMessageLog -> { if (brokerMessageLog.getTryCount() >= 3) { log.info("投遞3次還是失敗..."); brokerMessageLogMapper.updateMessageLogStatus(brokerMessageLog.getMessageId(), Constants.ORDER_SEND_FAIL, new Date()); } else { log.info("投遞失敗..."); brokerMessageLogMapper.updateReSendMessage(brokerMessageLog.getMessageId(), new Date()); Order order = JSON.parseObject(brokerMessageLog.getMessage(), Order.class); try { rabbitmqOrderSender.sendOrder(order); } catch (Exception e) { log.error("重新投遞訊息傳送異常...:" + e.getMessage()); } } }); } }
訊息生產端
@Component @Slf4j public class RabbitmqOrderSender { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; // 訊息傳送到交換器Exchange後觸發回撥 private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("生產端confirm..."); log.info("correlationData=" + correlationData); String messageId = correlationData.getId(); if (ack) { //confirm返回成功,更新訊息投遞狀態 brokerMessageLogMapper.updateMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date()); } else { // 失敗則進行具體的後續操作,重試或者補償等手段。 log.info("異常處理..."); } } }; // 如果訊息從交換器傳送到對應佇列失敗時觸發 private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("message=" + message.toString()); log.info("replyCode=" + replyCode); log.info("replyText=" + replyText); log.info("exchange=" + exchange); log.info("routingKey=" + routingKey); } }; public void sendOrder(Order order) { log.info("生產端傳送訊息..."); rabbitTemplate.setConfirmCallback(this.confirmCallback); rabbitTemplate.setReturnCallback(this.returnCallback); CorrelationData correlationData = new CorrelationData(order.getMessageId()); rabbitTemplate.convertAndSend(MQConfig.ORDER_DIRECT_EXCAHNGE, MQConfig.ORDER_QUEUE,order, correlationData); } }
訊息消費端
@Component @Slf4j public class RabbitmqOrderReceiver { @RabbitListener(queues = MQConfig.ORDER_QUEUE) public void receive(@Payload Order order, Channel channel, @Headers Map<String, Object> headers, Message message) throws IOException, InterruptedException { log.info("消費端接收訊息..."); log.info("message=" + message.toString()); log.info("order=" + order); Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); log.info("deliveryTag=" + deliveryTag); // 手工ack channel.basicAck(deliveryTag, false); } }
當我們傳送訊息時,故意將Exchange設定成一個不存在的值。訊息路由不到合適的Exchange,Confirm機制回送的ACK會返回false,走異常處理。這個訊息的狀態不會更新成1。然後定時任務會拉取status為0的訊息,進行重新投遞,投遞了3次訊息還未成功,將status置為2。

image.png
接下來,我們測試一波。
@Test public void test() { Order order = new Order(); order.setId("36"); order.setName("cmazxiaoma測試訂單-36"); order.setMessageId(UUIDUtil.uuid()); rabbitmqOrderService.createOrder(order); }
訊息投遞失敗。

image.png
定時任務重新投遞訊息失敗。

image.png
將失敗的訊息重新投遞3次還是失敗。

image.png
更新Message db資訊,將重新投遞3次還是失敗的訊息狀態置為2。

image.png
接著我們把消費端手動ACK的程式碼註釋掉,再讓生產端傳送訊息。看看會出現什麼情況。

image.png
我們會發現Queue堆積了該訊息。

image.png
我們關掉RabbitMQ Server,看看此訊息是否會持久化。
[root@VM_0_11_centos log]# ps -ef|grep rabbitmq root13283 102910 13:42 pts/100:00:00 grep --color=auto rabbitmq root2305111 Nov06 ?00:09:29 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib64/erlang -progname erl -- -home /root -- -pa /usr/local/rabbitmq/ebin -noshell -noinput -s rabbit boot -sname rabbit@VM_0_11_centos -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos.log"} -rabbit sasl_error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos-sasl.log"} -rabbit enabled_plugins_file "/usr/local/rabbitmq/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/local/rabbitmq/plugins" -rabbit plugins_expand_dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 -noshell -noinput [root@VM_0_11_centos log]# kill -9 23051
[root@VM_0_11_centos sbin]# rabbitmq-server -detached Warning: PID file not written; -detached was passed. [root@VM_0_11_centos sbin]# ps -ef|grep rabbitmq root135001 31 13:44 ?00:00:02 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib64/erlang -progname erl -- -home /root -- -pa /usr/local/rabbitmq/ebin -noshell -noinput -s rabbit boot -sname rabbit@VM_0_11_centos -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos.log"} -rabbit sasl_error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos-sasl.log"} -rabbit enabled_plugins_file "/usr/local/rabbitmq/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/local/rabbitmq/plugins" -rabbit plugins_expand_dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 -noshell -noinput root13597 102910 13:44 pts/100:00:00 grep --color=auto rabbitmq
執行 rabbitmqctl list_queues name messages_ready messages_unacknowledged
命令,查詢Queue情況,發現Message持久化了。

image.png

image.png
斷開消費者程式,我們可以看到訊息從Unacked狀態轉換成Ready了。

image.png
尾言
不管是神還是惡魔都不會對不抗爭的人伸出援手