分散式事務:訊息可靠傳送
接上文分散式事務:基於可靠訊息服務 介紹了整體中介軟體的設計思路,有些內容沒有展開。故此,本文詳細講解下如何將訊息可靠傳送到Rabbitmq。
在上文簡單提到了如何將訊息進行可靠傳送,因為shine-mq
是無縫整合spring-boot-starter
的,所以rabbitmq的操作也是基於spring的rabbitTemplate
來完成的。
rabbitTemplate
提供了setConfirmCallback
方法,可以在訊息傳送到RabbitMQ交換器後,進行ack的回撥。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { ... //訊息能投入正確的訊息佇列,並持久化,返回的ack為true if (ack) { log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData); ... } ... }); 複製程式碼
在此之前還需要設定CachingConnectionFactory
//設定生成者確認機制 rabbitConnectionFactory.setPublisherConfirms(true); 複製程式碼
如果還需要設定setReturnCallback
(訊息傳送到RabbitMQ交換器,但無相應Exchange時的回撥),那就還需要設定rabbitTemplate
//使用return-callback時必須設定mandatory為true rabbitTemplate.setMandatory(true); 複製程式碼
熟悉Rabbitmq的同學可能知道,Rabbitmq有兩種機制來實現訊息的可靠釋出。
- 通過事務機制,這個上篇文章分析過,在這個模式下,rabbitmq的效率很低,不適合。
-
Confirm模式,這個模式下會有三種方式,分別是:
rabbitTemplate
所以我們知道了rabbitTemplate
提供的確認機制是一種非同步機制,並不能同步的發現問題,也就是說在極端的網路條件下是會出現訊息丟失的。
所以shine-mq
通過增加一個Coordinator
(協調者)來實現。Coordinator
會儲存2個狀態,一個是prepare(攜帶回查id
),這個狀態在前文說過是用來保證上游服務的任務狀態的。
而另一個狀態ready,就是來保證訊息的可靠投遞。
首先shine-mq
是使用@DistributedTrans
來開啟。在這個註解的切面裡,先持久化ready狀態。
@Around(value = "@annotation(trans)") public void around(ProceedingJoinPoint pjp, DistributedTrans trans) throws Throwable { ... try { EventMessage message = new EventMessage(exchange, routeKey, SendTypeEnum.DISTRIBUTED.toString(), checkBackId, coordinatorName, msgId); //將訊息持久化 coordinator.setReady(msgId, checkBackId.toString(), message); rabbitmqFactory.setCorrelationData(msgId, coordinatorName, message, null); rabbitmqFactory.addDLX(exchange, exchange, routeKey, null, null); if (flag) { rabbitmqFactory.add(MqConstant.DEAD_LETTER_QUEUE, MqConstant.DEAD_LETTER_EXCHANGE, MqConstant.DEAD_LETTER_ROUTEKEY, null, null); flag = false; } rabbitmqFactory.getTemplate().send(message, 0, 0, SendTypeEnum.DISTRIBUTED); } catch (Exception e) { log.error("Message failed to be sent : ", e); throw e; } } 複製程式碼
然後在回撥中刪除該狀態:
//訊息傳送到RabbitMQ交換器後接收ack回撥 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (correlationData != null) { log.info("ConfirmCallback ack: {} correlationData: {} cause: {}", ack, correlationData, cause); String msgId = correlationData.getId(); CorrelationDataExt ext = (CorrelationDataExt) correlationData; Coordinator coordinator = (Coordinator) applicationContext.getBean(ext.getCoordinator()); //可以自定義實現的回撥 coordinator.confirmCallback(correlationData, ack); //訊息能投入正確的訊息佇列,並持久化,返回的ack為true if (ack) { log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData); //刪除ready狀態 coordinator.delStatus(msgId); } else { ... } } }); 複製程式碼
因為儲存ready是在上游服務任務執行之後的,所以只要有超時的ready記錄未被清理掉,daemon(守護執行緒)
只管撈起來進行重發就行,因為Mq的可靠性投遞就已經要求下游服務是需要保證冪等性了。
最後還有個極端的情況,就是ready訊息儲存的時候因為網路抖動該訊息丟失了,這時候也沒有關係,因為有prepare狀態會進行回查,該狀態只有在ready儲存後才會觸發刪除。
如果對你有幫助,那就幫忙點個星星把 ^.^
github地址: github.com/7le/shine-m…
Github 不要吝嗇你的star ^.^更多精彩 戳我