1. 程式人生 > >RocketMQ-訊息重試,訊息冪等去重,訊息模式

RocketMQ-訊息重試,訊息冪等去重,訊息模式

訊息重試

Rocketmq提供了訊息重試機制,這是一些其他訊息佇列沒有的功能。我們可以依靠這個優秀的機制,而不用在開發中增加更多的業務程式碼去實現

Consumer 消費訊息失敗後,要提供一種重試機制,令訊息再消費一次。Consumer 消費訊息失敗通常可以認為有以下幾種情況

    由於訊息本身的原因,例如反序列化失敗,訊息資料本身無法處理(例如話費充值,當前訊息的手機號被登出,無法充值)等。
    這種錯誤通常需要跳過這條訊息,再消費其他訊息,而這條失敗的訊息即使立刻重試消費,99%也不成功,所以最好提供一種定時重試機制,即過 10s 秒後再重試。

    由於依賴的下游應用服務不可用,例如 db 連線不可用,外系統網路不可達等。遇到這種錯誤,即使跳過當前失敗的訊息,消費其他訊息同樣也會報錯。
    這種情況建議應用 sleep 30s,再消費下一條訊息,這樣可以減輕 Broker 重試訊息的壓力。

訊息重試可以分為生產者和消費者兩端

生產者:訊息重投重試(保證資料的高可靠性)

    Producer 的 send 方法本身支援內部重試,重試邏輯如下

    1

這裡寫圖片描述
這裡寫圖片描述
以上是官方3.2.6版本使用者指南給出的方案

消費者:訊息處理異常(broker端到consumer端各種問題,比如網路原因閃斷,消費處理失敗,ACK返回失敗等等問題)

我們更多的關注點在於消費者這邊的訊息重試。消費處理失敗的情況需要進行訊息重試,如果是網路原因閃斷或者ACK返回失敗等原因,涉及到rocketmq前面說的叢集模式,涉及到了訊息去重。

如果有關注消費者中註冊訊息監聽器MessageListenerConcurrently中重寫的consumeMessage就會發現返回值是ConsumeConcurrentlyStatus(順序消費的話,返回值不同)。就會發現該類有兩個常量屬性ConsumeConcurrentlyStatus.CONSUME_SUCCESS和ConsumeConcurrentlyStatus.RECONSUME_LATER

你可以try{}catch(){}訊息消費的過程,如果業務上出現任何消費失敗的情況下,catch到後,返回ConsumeConcurrentlyStatus.RECONSUME_LATER。consumer會將該ACK狀態碼返回給broker,broker便會稍後在進行訊息傳送。

訊息重試是有指定時間了,預設第一次1s 第二次5s,以次類推給出的訊息重試的時間間隔為:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    1

我們可以有兩種方式得到訊息已經重試多少次,從而對其進行業務判斷,是否終止訊息的重試

    從在consumeMessage中獲取的Message物件中呼叫getReconsumeTimes()方法,即msg.getReconsumeTimes()方法

    MessageConst類有定義很多與訊息相關的常量,比如訊息原始id(PROPERTY_ORIGIN_MESSAGE_ID),比如訊息重試的次數(PROPERTY_RECONSUME_TIME)
    所以我們可以呼叫msg.getProperties().get()方法,傳入常量鍵名,從眾多的properties中獲取咱們需要的屬性

訊息冪等,去重

broker不可避免會發送重複的訊息給consumer。比如網路原因閃斷,ACK返回失敗等情況出現,將會導致訊息重複。consumer必須保證處理的訊息時唯一性

訊息重複消費的原因

在於回饋機制。正常情況下,消費者在消費訊息時候,消費完畢後,會發送一個ACK確認資訊給訊息佇列(broker),訊息佇列(broker)就知道該訊息被消費了,就會將該訊息從訊息佇列中刪除。

不同的訊息佇列傳送的確認資訊形式不同,例如RabbitMQ是傳送一個ACK確認訊息,RocketMQ是返回一個CONSUME_SUCCESS成功標誌,kafka實際上有個offset的概念。

造成重複消費的原因?,就是因為網路原因閃斷,ACK返回失敗等等故障,確認資訊沒有傳送到訊息佇列,導致訊息佇列不知道自己已經消費過該訊息了,再次將該訊息分發給其他的消費者。(因為訊息重試等機制的原因,如果一個consumer斷了,rocketmq有consumer叢集,會將該訊息重新發給其他consumer)

訊息去重

去重原則:1.冪等性 2.業務去重

冪等性:(處理必須唯一) 無論這個業務請求被(consumer)執行多少次,我們的資料庫的結果都是唯一的,不可變的。

去重策略:去重表機制,業務拼接去重策略(比如唯一流水號)

1.建立一個訊息表,拿到這個訊息做資料庫的insert操作。給這個訊息做一個唯一主鍵(primary key)或者唯一約束,那麼就算出現重複消費的情況,就會導致主鍵衝突。

高併發下去重:採用Redis去重(key天然支援原子性並要求不可重複),但是由於不在一個事務,要求有適當的補償策略

2.利用redis事務,主鍵(我們必須把全量的操作資料都存放在redis裡,然後定時去和資料庫做資料同步)—-即消費處理後,該處理本來應該儲存在資料庫的,先儲存在redis
3.利用redis和關係型資料庫一起做去重機制

4.拿到這個訊息做redis的set的操作.redis就是天然冪等性
5.準備一個第三方介質,來做消費記錄。以redis為例,給訊息分配一個全域性id,只要消費過該訊息,將 < id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。
訊息模式

RocketMQ不遵循JMS規範,可以理解為沒有類似於ActiveMQ的createQueue和createTopic語法,也就是沒有點對點和釋出訂閱模型。但是支援叢集和廣播兩種消費模式

前面有講過關於rocketmq的叢集模式和廣播模式,他們的設定都是在consumer端設定其messageModel屬性

叢集模式:設定消費端物件屬性:MessageModel.CLUSTERING,這種方式可以達到類似於ActiveMQ水平擴充套件負責均衡消費訊息的實現,但是不一樣的是它是天然負載均衡的。該模式可以先啟動生產端,再啟動消費端,消費端仍然可以消費到生產端的訊息,不過時間不一定.預設該模式

廣播模式:設定消費端物件屬性:MessageModel.BROADCASTING,這種模式就是相當於生產端傳送資料到MQ,多個消費端都可以獲得到資料。這個模式消費端必須先開啟

GroupName,無論是生產端還是消費端,都必須指定一個GroupName,這個組名稱,應用於維護應用系統級別上的。比如生產端一定同一個ProducerGroupName,應用系統會保證唯一性,這個組下的Producer通常傳送一類訊息,且傳送邏輯一致。同理消費端也如此

Topic主題,每個主題代表一個邏輯上儲存的概念。在MQ上,會有多個與之對應的Queue佇列,這個是物理儲存的概念

String group_name = "message_consumer";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr(你的namesrvAddr);
consumer.subscribe("TopicTest","*");
consumer.setMessageModel(MessageModel.BROADCASTING);
---------------------
作者:沒有不憂傷的故事
來源:CSDN
原文:https://blog.csdn.net/qq_32020035/article/details/82113751
版權宣告:本文為博主原創文章,轉載請附上博文連結!