1. 程式人生 > >RocketMQ原理(4)——訊息ACK機制及消費進度管理

RocketMQ原理(4)——訊息ACK機制及消費進度管理

https://zhuanlan.zhihu.com/p/25265380

consumer的每個例項是靠佇列分配來決定如何消費訊息的。那麼消費進度具體是如何管理的,又是如何保證訊息成功消費的(RocketMQ有保證訊息肯定消費成功的特性(失敗則重試)?

本文將詳細解析訊息具體是如何ack的,又是如何保證消費肯定成功的。

由於以上工作所有的機制都實現在PushConsumer中,所以本文的原理均只適用於RocketMQ中的PushConsumer即Java客戶端中的DefaultPushConsumer。 若使用了PullConsumer模式,類似的工作如何ack,如何保證消費等均需要使用方自己實現。

注:廣播消費和叢集消費的處理有部分區別,以下均特指叢集消費(CLSUTER),廣播(BROADCASTING)下部分可能不適用。

保證消費成功

PushConsumer為了保證訊息肯定消費成功,只有使用方明確表示消費成功,RocketMQ才會認為訊息消費成功。中途斷電,丟擲異常等都不會認為成功——即都會重新投遞。

消費的時候,我們需要注入一個消費回撥,具體sample程式碼如下:

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            doMyJob();//執行真正消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

業務實現消費回撥的時候,當且僅當此回撥函式返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才會認為這批訊息(預設是1條)是消費完成的。(具體如何ACK見後面章節)

如果這時候訊息消費失敗,例如資料庫異常,餘額不足扣款失敗等一切業務認為訊息需要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為這批訊息消費失敗了。

為了保證訊息是肯定被至少消費成功一次,RocketMQ會把這批訊息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(預設是10秒,業務可設定)後,再次投遞到這個ConsumerGroup。而如果一直這樣重複消費都持續失敗到一定次數(預設16次),就會投遞到DLQ死信佇列。應用可以監控死信佇列來做人工干預。

注:

  1. 如果業務的回撥沒有處理好而丟擲異常,會認為是消費失敗當ConsumeConcurrentlyStatus.RECONSUME_LATER處理。
  2. 當使用順序消費的回撥MessageListenerOrderly時,由於順序消費是要前者消費成功才能繼續消費,所以沒有RECONSUME_LATER的這個狀態,只有SUSPEND_CURRENT_QUEUE_A_MOMENT來暫停佇列的其餘消費,直到原訊息不斷重試成功為止才能繼續消費。

啟動的時候從哪裡消費

當新例項啟動的時候,PushConsumer會拿到本消費組broker已經記錄好的消費進度(consumer offset),按照這個進度發起自己的第一次Pull請求。

如果這個消費進度在Broker並沒有儲存起來,證明這個是一個全新的消費組,這時候客戶端有幾個策略可以選擇:

CONSUME_FROM_LAST_OFFSET //預設策略,從該佇列最尾開始消費,即跳過歷史訊息
CONSUME_FROM_FIRST_OFFSET //從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前

所以,社群中經常有人問:“為什麼我設了CONSUME_FROM_LAST_OFFSET,歷史的訊息還是被消費了”? 原因就在於只有全新的消費組才會使用到這些策略,老的消費組都是按已經儲存過的消費進度繼續消費。

對於老消費組想跳過歷史訊息可以採用以下兩種方法:

  1. 程式碼按照日期判斷,太老的訊息直接return CONSUME_SUCCESS過濾。
  2. 程式碼判斷訊息的offset和MAX_OFFSET相差很遠,認為是積壓了很多,直接return CONSUME_SUCCESS過濾。
  3. 消費者啟動前,先調整該消費組的消費進度,再開始消費。可以人工使用命令resetOffsetByTime,或呼叫內部的運維介面,祥見ResetOffsetByTimeCommand.java

訊息ACK機制

RocketMQ是以consumer group+queue為單位是管理消費進度的,以一個consumer offset標記這個這個消費組在這條queue上的消費進度。

如果某已存在的消費組出現了新消費例項的時候,依靠這個組的消費進度,就可以判斷第一次是從哪裡開始拉取的。

每次訊息成功後,本地的消費進度會被更新,然後由定時器定時同步到broker,以此持久化消費進度。

但是每次記錄消費進度的時候,只會把一批訊息中最小的offset值為消費進度值,如下圖:



這鐘方式和傳統的一條message單獨ack的方式有本質的區別。效能上提升的同時,會帶來一個潛在的重複問題——由於消費進度只是記錄了一個下標,就可能出現拉取了100條訊息如 2101-2200的訊息,後面99條都消費結束了,只有2101消費一直沒有結束的情況。

在這種情況下,RocketMQ為了保證訊息肯定被消費成功,消費進度職能維持在2101,直到2101也消費結束了,本地的消費進度才會一下子更新到2200。

在這種設計下,就有消費大量重複的風險。如2101在還沒有消費完成的時候消費例項突然退出(機器斷電,或者被kill)。這條queue的消費進度還是維持在2101,當queue重新分配給新的例項的時候,新的例項從broker上拿到的消費進度還是維持在2101,這時候就會又從2101開始消費,2102-2200這批訊息實際上已經被消費過還是會投遞一次。

對於這個場景,3.2.6之前的RocketMQ無能為力,所以業務必須要保證訊息消費的冪等性,這也是RocketMQ官方多次強調的態度。

實際上,從原始碼的角度上看,RocketMQ可能是考慮過這個問題的,截止到3.2.6的版本的原始碼中,可以看到為了緩解這個問題的影響面,DefaultMQPushConsumer中有個配置consumeConcurrentlyMaxSpan

/**
 * Concurrently max span offset.it has no effect on sequential consumption
 */
private int consumeConcurrentlyMaxSpan = 2000;

這個值預設是2000,當RocketMQ發現本地快取的訊息的最大值-最小值差距大於這個值(2000)的時候,會觸發流控——也就是說如果頭尾都卡住了部分訊息,達到了這個閾值就不再拉取訊息。

但作用實際很有限,像剛剛這個例子,2101的消費是死迴圈,其他消費非常正常的話,是無能為力的。一旦退出,在不人工干預的情況下,2101後所有訊息全部重複。

Ack卡進度解決方案

對於這個卡消費進度的問題,最顯而易見的解法是設定一個超時時間,達到超時時間的那個消費當作消費失敗處理。

後來RocketMQ顯然也發現了這個問題,而RocketMQ在3.5.8之後也就是採用這樣的方案去解決這個問題。

  1. 在pushConsumer中 有一個consumeTimeout欄位(預設15分鐘),用於設定最大的消費超時時間。消費前會記錄一個消費的開始時間,後面用於比對。
  2. 消費者啟動的時候,會定期掃描所有消費的訊息,達到這個timeout的那些訊息,就會觸發sendBack並ack的操作。這裡掃描的間隔也是consumeTimeout(單位分鐘)的間隔。

核心原始碼如下:

//ConsumeMessageConcurrentlyService.java
public void start() {
    this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            cleanExpireMsg();
        }

    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
//ConsumeMessageConcurrentlyService.java
private void cleanExpireMsg() {
    Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
            this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, ProcessQueue> next = it.next();
        ProcessQueue pq = next.getValue();
        pq.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}

//ProcessQueue.java
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
    if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
        return;
    }

    int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
    for (int i = 0; i < loop; i++) {
        MessageExt msg = null;
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                    msg = msgTreeMap.firstEntry().getValue();
                } else {

                    break;
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getExpiredMsg exception", e);
        }

        try {

            pushConsumer.sendMessageBack(msg, 3);
            log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                        try {
                            msgTreeMap.remove(msgTreeMap.firstKey());
                        } catch (Exception e) {
                            log.error("send expired msg exception", e);
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }
        } catch (Exception e) {
            log.error("send expired msg exception", e);
        }
    }
}

通過原始碼看這個方案,其實可以看出有幾個不太完善的問題:

  1. 消費timeout的時間非常不精確。由於掃描的間隔是15分鐘,所以實際上觸發的時候,訊息是有可能卡住了接近30分鐘(15*2)才被清理。
  2. 由於定時器一啟動就開始排程了,中途這個consumeTimeout再更新也不會生效。