1. 程式人生 > >Rabbitmq可靠訊息投遞,訊息確認機制

Rabbitmq可靠訊息投遞,訊息確認機制

### 前言 我們知道,訊息從傳送到簽收的整個過程是 `Producer-->Broker/Exchange-->Broker/Queue-->Consumer`,因此如果只是要保證訊息的可靠投遞,我們需要考慮的僅是前兩個階段,因為訊息只要成功到達佇列,就算投遞成功。 - 比如投遞訊息時指定的Exchange不存在,那麼階段一就會失敗 - 如果投遞到Exchange成功,但是指定的路由件錯誤或者別的原因,訊息沒有從Exchange到達Queue,那就是第二階段出錯。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20210118113550559.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3d3YW5nX2Rldg==,size_16,color_FFFFFF,t_70#pic_center) 而從生產者和消費者角度來看,訊息成功投遞到佇列才算成功投遞,因此階段一和階段而都屬於生產者一方需要關注,階段三屬於消費者一方,這裡只考慮訊息的成功投遞,因此不考慮消費者的簽收部分。而Rabbitmq和springboot整合時,預設是**沒有開啟**訊息確認的。 ### 開啟訊息確認機制 #### 一、Producer --> Broker/Exchange ConfirmCallback ##### 1. 配置 ```shell # springboot2.2.0以前, # spring.rabbitmq.publisher-confirms=true # springboot2.2.0以後 spring.rabbitmq.publisher-confirm-type=correlated ``` 關於這個Type有三種取值: - `none`:預設值,不開啟confirmcallback機制 - `correlated`:開啟confirmcallback,釋出訊息時,可以指定一個CorrelationData,會被儲存到訊息頭中,訊息投遞到Broekr時觸發生產者指定的ConfirmCallback,這個值也會被返回,以進行對照處理,CorrelationData可以包含比較豐富的元資訊進行回撥邏輯的處理。無特殊需求,就設定為這個值。 - `simple`:這個比較複雜,[spring官方](https://docs.spring.io/spring-amqp/docs/current/reference/html/#scoped-operations)指出: > Normally, when using the template, a Channel is checked out of the cache (or created), used for the operation, and returned to the cache for reuse. In a multi-threaded environment, there is no guarantee that the next operation uses the same channel. There may be times, however, where you want to have more control over the use of a channel and ensure that a number of operations are all performed on the same channel. 2529 / 5000 翻譯結果 通常,使用模板時,會從快取中檢出(或建立)通道,以進行操作,然後將其返回到快取中以進行重用。在多執行緒環境中,不能保證下一個操作使用相同的通道。但是,有時您可能希望更好地控制通道的使用,並確保在同一通道上執行全部操作。 也就說這,這個`simple`模式:其一效果和`correlated`值一樣能觸發回撥方法,其二用於釋出訊息成功後使用rabbitTemplate呼叫`waitForConfirms`或`waitForConfirmsOrDie`方法等待`broker`節點返回傳送結果,需求根據返回結果來判定下一步的邏輯,執行更復雜的業務。要注意的點是`waitForConfirmsOrDie`方法如果返回false則會關閉`channel`,則接下來無法傳送訊息。 ##### 2. 如何使用 `SpringBoot`自動配置幫我們往容器中註冊了一個`RabbitTemplate`,但因為預設沒有開啟訊息確認機制,因此它在建立時並未配置`confirmCallback`屬性,我們需要手動為其建立一個 `RabbitTemplate.ConfirmCallback`。 因此我們需要拿到這個建立好的`RabbitTemplate`,再手動執行其`setConfirmCallback`方法。 拿到這個物件簡單,只要一個`@Autowired`,問題是我們要保證之後通過`@Autowired`拿到的`RabbitTemplate`是已經註冊了`ConfirmCallback`的。 我們手寫一個配置類/或者隨便什麼類,加上註解`@Component/@Service/@Controller/@Configuration`,無論哪個,只要能被自動建立並加入容器,然後我們寫一個方法,加上`@PostConstructor`表示建立這個物件完成時需要回調這個方法,我們在這個類中拿到`RabbitTemplate`,在這個方法中執行它的`setConfirmCallback`,這樣spring容器在建立我們這個配置類的時候將創好的`RabbitTemplate`進行了完善,而整個過程都是在spring'boot啟動過程中自動完成,就能保證我們之後使用`@Autowired`拿到的`RabbitTemplate`就是註冊號`confirmCallback`的。 ```java @Configuration @Slf4j public class RabbitConfig { @Autowired RabbitTemplate rabbitTemplate; // 方法名無所謂,主要是 @PostConstruct 指定它一定會被回撥 @PostConstruct public void setCallback() { /** * 為容器建立好的rabbitTemplate註冊confirmCallback * 訊息由生產者投遞到Broker/Exchange回撥 */ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 傳送訊息時指定的唯一關聯資料(訊息id) * @param ack 這個訊息是否成功投遞到Exchange * @param cause 失敗的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("訊息投遞到交換機成功:[correlationData={}]",correlationData); } else { log.error("訊息投遞到交換機失敗:[correlationData={},原因:{}]", correlationData, cause); } } }); } } ``` #### 二、Exchange-->Queue ConfirmCallback ##### 1. 配置 - 注意下面兩項**必須**同時配置,可以嘗試不配置第二項,通過測試能夠發現當訊息路由到Queue失敗(比如路由件錯誤)時,returnCallback並未被回撥。 ```shell # 開啟階段二(訊息從E->Q)的確認回撥 Exchange --> Queue returnCallback spring.rabbitmq.publisher-returns=true # 官方文件說此時這一項必須設定為true # 實際上這一項的作用是:訊息【未成功到達】佇列時,能監聽到到路由不可達的訊息,以非同步方式優先呼叫我們自己設定的returnCallback,預設情況下,這個訊息會被直接丟棄,無法監聽到 spring.rabbitmq.template.mandatory=true ``` ##### 2. 如何使用 和註冊confirmCallback的原理一樣,就不多贅述,直接看配置,需要注意的是 **這個回撥只會在訊息在從Exchange投遞到Queue【失敗】時被執行**。 ```java @Configuration @Slf4j public class RabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void setCallback() { rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 能夠看出來,引數中並沒有像confirmCallback那樣提供的boolean型別 的ack,因此這個回撥只是在【失敗】情況下觸發的 * @param message 傳送的訊息 * @param replyCode 回覆錯誤碼 * @param replyText 回覆錯誤內容 * @param exchange 傳送訊息時指定的交換機 * @param routingKey 傳送訊息時使用的路由件 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("路由到佇列失敗,訊息內容:{},交換機:{},路由件:{},回覆碼:{},回覆文字:{}", message, exchange, routingKey, replyCode, replyText); } }); }