1. 程式人生 > >springboot + rabbitmq 用了訊息確認機制,感覺掉坑裡了

springboot + rabbitmq 用了訊息確認機制,感覺掉坑裡了

>本文收錄在個人部落格:[www.chengxy-nds.top](http://www.chengxy-nds.top),技術資源共享,一起進步 最近部門號召大夥多組織一些技術分享會,說是要活躍公司的技術氛圍,但早就看穿一切的我知道,這 T M 就是為了刷`KPI`。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多做技術交流還是很有助於個人成長的。 於是乎我主動報名參加了分享,咳咳咳~ ,真的不是為了那點`KPI`,就是想和大夥一起學習學習! ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200629153339271.png#pic_center) 這次我分享的是 `springboot` + `rabbitmq` 如何實現訊息確認機制,以及在實際開發中的一點踩坑經驗,其實整體的內容比較簡單,有時候事情就是這麼神奇,越是簡單的東西就越容易出錯。 可以看到使用了 `RabbitMQ` 以後,我們的業務鏈路明顯變長了,雖然做到了系統間的解耦,但可能造成訊息丟失的場景也增加了。例如: - 訊息生產者 - > rabbitmq伺服器(訊息傳送失敗) - rabbitmq伺服器自身故障導致訊息丟失 - 訊息消費者 - > rabbitmq服務(消費訊息失敗) ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/2020062917144650.png#pic_center) 所以說能不使用中介軟體就儘量不要用,如果為了用而用只會徒增煩惱。開啟訊息確認機制以後,儘管很大程度上保證了訊息的準確送達,但由於頻繁的確認互動,`rabbitmq` 整體效率變低,吞吐量下降嚴重,不是非常重要的訊息真心不建議你用訊息確認機制。 --- 下邊我們先來實現`springboot` + `rabbitmq`訊息確認機制,再對遇到的問題做具體分析。 ## 一、準備環境 #### 1、引入 rabbitmq 依賴包 ```javascript org.springframework.boot
spring-boot-starter-amqp
``` #### 2、修改 application.properties 配置 配置中需要開啟 `傳送端`和 `消費端` 的訊息確認。 ```javascript spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 傳送者開啟 confirm 確認機制 spring.rabbitmq.publisher-confirms=true # 傳送者開啟 return 確認機制 spring.rabbitmq.publisher-returns=true #################################################### # 設定消費端手動 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual # 是否支援重試 spring.rabbitmq.listener.simple.retry.enabled=true ``` #### 3、定義 Exchange 和 Queue 定義交換機 `confirmTestExchange` 和佇列 `confirm_test_queue` ,並將佇列繫結在交換機上。 ```javascript @Configuration public class QueueConfig { @Bean(name = "confirmTestQueue") public Queue confirmTestQueue() { return new Queue("confirm_test_queue", true, false, false); } @Bean(name = "confirmTestExchange") public FanoutExchange confirmTestExchange() { return new FanoutExchange("confirmTestExchange"); } @Bean public Binding confirmTestFanoutExchangeAndQueue( @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, @Qualifier("confirmTestQueue") Queue confirmTestQueue) { return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange); } } ``` >
`rabbitmq` 的訊息確認分為兩部分:傳送訊息確認 和 訊息接收確認。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200630105111986.png) ## 二、訊息傳送確認 傳送訊息確認:用來確認生產者 `producer` 將訊息傳送到 `broker` ,`broker` 上的交換機 `exchange` 再投遞給佇列 `queue`的過程中,訊息是否成功投遞。 訊息從 `producer` 到 `rabbitmq broker`有一個 `confirmCallback` 確認模式。 訊息從 `exchange` 到 `queue` 投遞失敗有一個 `returnCallback` 退回模式。 我們可以利用這兩個`Callback`來確保消的100%送達。 #### 1、 ConfirmCallback確認模式 訊息只要被 `rabbitmq broker` 接收到就會觸發 `confirmCallback` 回撥 。 ```javascript @Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("訊息傳送異常!"); } else { log.info("傳送者爸爸已經收到確認,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } } } ``` 實現介面 `ConfirmCallback` ,重寫其`confirm()`方法,方法內有三個引數`correlationData`、`ack`、`cause`。 - `correlationData`:物件內部只有一個 `id` 屬性,用來表示當前訊息的唯一性。 - `ack`:訊息投遞到`broker` 的狀態,`true`表示成功。 - `cause`:表示投遞失敗的原因。 但訊息被 `broker` 接收到只能表示已經到達 MQ伺服器,並不能保證訊息一定會被投遞到目標 `queue` 裡。所以接下來需要用到 `returnCallback` 。 #### 2、 ReturnCallback 退回模式 如果訊息未能投遞到目標 `queue` 裡將觸發回撥 `returnCallback` ,一旦向 `queue` 投遞訊息未成功,這裡一般會記錄下當前訊息的詳細投遞資料,方便後續做重發或者補償等操作。 ```javascript @Slf4j @Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===>
replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); } } ``` 實現介面`ReturnCallback`,重寫 `returnedMessage()` 方法,方法有五個引數`message`(訊息體)、`replyCode`(響應code)、`replyText`(響應內容)、`exchange`(交換機)、`routingKey`(佇列)。 下邊是具體的訊息傳送,在`rabbitTemplate`中設定 `Confirm` 和 `Return` 回撥,我們通過`setDeliveryMode()`對訊息做持久化處理,為了後續測試建立一個 `CorrelationData`物件,新增一個`id` 為`10000000000`。 ```javascript @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallbackService confirmCallbackService; @Autowired private ReturnCallbackService returnCallbackService; public void sendMessage(String exchange, String routingKey, Object msg) { /** * 確保訊息傳送失敗後可以重新返回到佇列中 * 注意:yml需要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消費者確認收到訊息後,手動ack回執回撥處理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 訊息投遞到佇列失敗回撥處理 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 傳送訊息 */ rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(UUID.randomUUID().toString())); } ``` ## 三、訊息接收確認 訊息接收確認要比訊息傳送確認簡單一點,因為只有一個訊息回執(`ack`)的過程。使用`@RabbitHandler`註解標註的方法要增加 `channel`(通道)、`message` 兩個引數。 ```javascript @Slf4j @Component @RabbitListener(queues = "confirm_test_queue") public class ReceiverMessage1 { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到訊息:{}", msg); //TODO 具體業務 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("訊息已重複處理失敗,拒絕再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕訊息 } else { log.error("訊息即將再次返回佇列處理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } } ``` 消費訊息有三種回執方法,我們來分析一下每種方法的含義。 #### 1、basicAck `basicAck`:表示成功確認,使用此回執方法後,訊息會被`rabbitmq broker` 刪除。 ```javascript void basicAck(long deliveryTag, boolean multiple) ``` `deliveryTag`:表示訊息投遞序號,每次消費訊息或者訊息重新投遞後,`deliveryTag`都會增加。手動訊息確認模式下,我們可以對指定`deliveryTag`的訊息進行`ack`、`nack`、`reject`等操作。 `multiple`:是否批量確認,值為 `true` 則會一次性 `ack`所有小於當前訊息 `deliveryTag` 的訊息。 **舉個栗子:** 假設我先發送三條訊息`deliveryTag`分別是5、6、7,可它們都沒有被確認,當我發第四條訊息此時`deliveryTag`為8,`multiple`設定為 true,會將5、6、7、8的訊息全部進行確認。 #### 2、basicNack `basicNack` :表示失敗確認,一般在消費訊息業務異常時用到此方法,可以將訊息重新投遞入佇列。 ```javascript void basicNack(long deliveryTag, boolean multiple, boolean requeue) ``` `deliveryTag`:表示訊息投遞序號。 `multiple`:是否批量確認。 `requeue`:值為 `true` 訊息將重新入佇列。 #### 3、basicReject `basicReject`:拒絕訊息,與`basicNack`區別在於不能進行批量操作,其他用法很相似。 ```javascript void basicReject(long deliveryTag, boolean requeue) ``` `deliveryTag`:表示訊息投遞序號。 `requeue`:值為 `true` 訊息將重新入佇列。 ## 四、測試 傳送訊息測試一下訊息確認機制是否生效,從執行結果上看傳送者發訊息後成功回撥,消費端成功的消費了訊息。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200630165233425.png) 用抓包工具`Wireshark` 觀察一下`rabbitmq` amqp協議互動的變化,也多了 `ack` 的過程。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200630170130977.png) ## 五、踩坑日誌 #### 1、不訊息確認 這是一個非常沒技術含量的坑,但卻是非常容易犯錯的地方。 開啟訊息確認機制,消費訊息別忘了`channel.basicAck`,否則訊息會一直存在,導致重複消費。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200630173925436.png) #### 2、訊息無限投遞 在我最開始接觸訊息確認機制的時候,消費端程式碼就像下邊這樣寫的,思路很簡單:處理完業務邏輯後確認訊息, `int a = 1 / 0` 發生異常後將訊息重新投入佇列。 ```javascript @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("消費者 2 號收到:{}", msg); int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } ``` 但是有個問題是,業務程式碼一旦出現 `bug` 99.9%的情況是不會自動修復,一條訊息會被無限投遞進佇列,消費端無限執行,導致了死迴圈。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200630173009948.gif#pic_center) 本地的`CPU`被瞬間打滿了,大家可以想象一下當時在生產環境導致服務宕機,我是有多慌。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200630172405717.png?) 而且`rabbitmq management` 只有一條未被確認的訊息。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200630173925436.png) 經過測試分析發現,當訊息重新投遞到訊息佇列時,這條訊息不會回到佇列尾部,仍是在佇列頭部。 消費者會立刻消費這條訊息,業務處理再丟擲異常,訊息再重新入隊,如此反覆進行。導致訊息佇列處理出現阻塞,導致正常訊息也無法執行。 而我們當時的解決方案是,先將訊息進行應答,此時訊息佇列會刪除該條訊息,同時我們再次傳送該訊息到訊息佇列,異常訊息就放在了訊息佇列尾部,這樣既保證訊息不會丟失,又保證了正常業務的進行。 ```javascript channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 重新發送訊息到隊尾 channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(msg)); ``` 但這種方法並沒有解決根本問題,錯誤訊息還是會時不時報錯,後面優化設定了訊息重試次數,達到了重試上限以後,手動確認,佇列刪除此訊息,並將訊息持久化入`MySQL`並推送報警,進行人工處理和定時任務做補償。 #### 3、重複消費 如何保證 MQ 的消費是冪等性,這個需要根據具體業務而定,可以藉助`MySQL`、或者`redis` 將訊息持久化,通過再訊息中的唯一性屬性校驗。 >`demo`的 `GitHub` 地址 https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm --- 原創不易,燃燒秀髮輸出內容,如果有一丟丟收穫,點個再看鼓勵一下吧! >整理了幾百本各類技術電子書,送給小夥伴們。關注公號回覆【`666`】自行領取。和一些小夥伴們建了一個技術交流群,一起探討技術、分享技術資料,旨在共同學習進步,如果感興趣就掃碼加入我們吧! ![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly91c2VyLWdvbGQtY2RuLnhpdHUuaW8vMjAyMC8yLzQvMTcwMGU0Mjk1MDQzMjQ0Yg?x-oss-process=image/form