1. 程式人生 > >原始碼分析RocketMQ之訊息ACK機制(消費進度)

原始碼分析RocketMQ之訊息ACK機制(消費進度)

首先簡要闡述一下訊息消費進度首先消費者訂閱訊息消費佇列(MessageQueue),當生產者將訊息負載傳送到MessageQueue中時,消費訂閱者開始消費訊息,訊息消費過程中,為了避免重複消費,需要一個地方儲存消費進度(消費偏移量)。訊息模式主要分為叢集模式、廣播模式叢集模式:一條訊息被叢集中任何一個消費者消費廣播模式:每條訊息都被每一個消費者消費。

廣播模式,既然每條訊息要被每一個消費者消費,則消費進度可以與消費者儲存在一起,也就是本地儲存,但由於叢集模式下,一條訊息只能被叢集內的一個消費者消費,進度不能儲存在消費端,只能集中儲存在一個地方,比較合適的是在Broker端。接下來我們先分析一下訊息消費進度介面:OffsetStore

/**
 * Offset store interface
 */
public interface OffsetStore {
    /**
     * Load
     *
     * @throws MQClientException
     */
    void load() throws MQClientException;

    /**
     * Update the offset,store it in memory
     *
     * @param mq
     * @param offset
     * @param increaseOnly
     */
    void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);

    /**
     * Get offset from local storage
     *
     * @param mq
     * @param type
     * @return The fetched offset
     */
    long readOffset(final MessageQueue mq, final ReadOffsetType type);

    /**
     * Persist all offsets,may be in local storage or remote name server
     *
     * @param mqs
     */
    void persistAll(final Set<MessageQueue> mqs);

    /**
     * Persist the offset,may be in local storage or remote name server
     *
     * @param mq
     */
    void persist(final MessageQueue mq);

    /**
     * Remove offset
     *
     * @param mq
     */
    void removeOffset(MessageQueue mq);

    /**
     * @param topic
     * @return The cloned offset table of given topic
     */
    Map<MessageQueue, Long> cloneOffsetTable(String topic);

    /**
     * @param mq
     * @param offset
     * @param isOneway
     */
    void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException;
}
入口程式碼:DefaultMQPushConsumerImpl#start()

根據訊息消費模式(叢集模式、廣播模式)會建立不同的OffsetStore方式。由於上篇文章,談到廣播模式訊息,如果返回CONSUME_LATER,竟然不會重試,而是直接丟棄,為什麼呢?由於這個原因,這次破天荒的從廣播模式的OffsetStore開始學習。1、LocalFileOffsetStore (廣播模式) 訊息進度以本地檔案方式儲存。 原始碼路徑:org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore 1.1 核心屬性與建構函式

LOCAL_OFFSET_STORE_DIR : offset儲存根目錄,預設為使用者主目錄,例如 /home/dingw,可以在消費者啟動的JVM引數中,通過-Drocketmq.client.localOffsetStoreDir=路徑 groupName : 消費組名稱 storePath : 具體的消費進度儲存檔名(全路徑) offsetTable 記憶體中的offfset進度保持,以MessageQueue為鍵,偏移量為值繼續看一下建構函式:
LocalFileOffsetStore 首先在DefaultMQPushConsumerImpl#start方法中創,並 執行load方法載入消費進度。接下來結束一下幾個關鍵的實現方法1.2 load()方法
public void load() throws MQClientException {
        OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
        if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
            offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

            for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
                AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                log.info("load consumer's offset, {} {} {}",
                    this.groupName,
                    mq,
                    offset.get());
            }
        }
    }
該方法,主要就是讀取offsets.json或offsets.json.bak中的內容,然後將json轉換成map

然後更新或獲取訊息佇列的消費進度,就是從記憶體(Map)或store中獲取,接下來看一下初次儲存offsets.json檔案
@Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            if (mqs.contains(entry.getKey())) {
                AtomicLong offset = entry.getValue();
                offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
            }
        }

        String jsonString = offsetSerializeWrapper.toJson(true);
        if (jsonString != null) {
            try {
                MixAll.string2File(jsonString, this.storePath);
            } catch (IOException e) {
                log.error("persistAll consumer offset Exception, " + this.storePath, e);
            }
        }
    }
儲存邏輯很簡單,就沒必要一一分析,重點看一下,該方法在什麼時候呼叫:【MQClientInstance#startScheduledTask
} }儲存邏輯很簡單,就沒必要一一分析,重點看一下,該方法在什麼時候呼叫:【MQClientInstance#startScheduledTask順藤摸瓜,原因是一個定時任務,預設消費端啟動10秒後,每隔5s的頻率持久化一次廣播模式消費進度儲存容易,但其實還是不明白為什麼RocketMQ廣播模式,如果消費失敗,則丟棄,因為廣播模式有時候也必須確保每個消費者都成功消費,,通常的場景為,通過MQ重新整理本地快取等。2、叢集模式消費進度儲存(RemoteBrokerOffsetStore)在閱讀RemoteBrokerOffsetStore之前,我們先思考一下如下幾個問題:在叢集模式下,多個消費者會負載到不同的消費佇列上,因為訊息消費進度是基於訊息佇列進行儲存的,也就是不同的消費者之間的消費進度儲存是不會存在併發的,但是在同一個消費者,非順序訊息消費時,一個消費者(多個執行緒)併發消費訊息,比如m1 < m2,,但m2先消費完,此時是如何儲存的消費進度呢?舉個例子,如果m2的offset為5,而m1的offset為4,如果m2先消費完,儲存進度為5,那m1訊息消費完,儲存進度為4,這樣豈不亂來了。2.1 RemoteBrokerOffsetStore 核心屬性
private final static Logger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;      // MQ客戶端例項,該例項被同一個客戶端的消費者、生產者共用
    private final String groupName;                                 // MQ消費組
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();    // 消費進度儲存(記憶體中)

    public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) {   // 構造方法
        this.mQClientFactory = mQClientFactory;
        this.groupName = groupName;
    }
有了廣播模式,本地訊息消費進度儲存等基本認識之後,我們重點來關注RemoteBrokerOffsetStore 核心方法2.2 updateOffset 更新offset
@Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {     // @1
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));    // @2
            }

            if (null != offsetOld) {   // @3
                if (increaseOnly) {          
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);   // @4
                } else {
                    offsetOld.set(offset);    // @5
                }
            }
        }
    }
程式碼@1:如果當前並沒有儲存該mq的offset,則把傳入的offset放入記憶體中(map)程式碼@3:如果offsetOld不為空,這裡如果不為空,說明同時對一個MQ消費佇列進行消費,併發執行程式碼@4,@5,根據increaseOnly更新原先的offsetOld的值,這個值是個區域性變數,但這裡到底有什麼用呢?2.3 readOffset 根據讀取來源,讀取消費佇列的消費進度
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
        if (mq != null) {
            switch (type) {
                case MEMORY_FIRST_THEN_STORE:                        // 先從記憶體中讀取,如果記憶體中不存在,再嘗試從磁碟中讀取                     
                case READ_FROM_MEMORY: {                                    // 從記憶體中讀取
                    AtomicLong offset = this.offsetTable.get(mq);
                    if (offset != null) {
                        return offset.get();
                    } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                        return -1;
                    }
                }
                case READ_FROM_STORE: {                                         // 從磁碟中讀取
                    try {
                        long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);     
                        AtomicLong offset = new AtomicLong(brokerOffset);
                        this.updateOffset(mq, offset.get(), false);
                        return brokerOffset;
                    }
                    // No offset in broker
                    catch (MQBrokerException e) {
                        return -1;
                    }
                    //Other exceptions
                    catch (Exception e) {
                        log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                        return -2;
                    }
                }
                default:
                    break;
            }
        }

        return -1;
    }
這裡主要關注從磁碟中讀取消費進度,核心入口方法:fetchConsumeOffsetFromBroker
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {

            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }

        if (findBrokerResult != null) {
            QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());

            return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }
這裡,主要是首先根據mq的broker名稱獲取broker地址,然後傳送請求,我們重點關注一下消費進度是儲存在broker哪個地方:Broker端的offset管理參照 ConsumerOffsetManager,,儲存邏輯其實與廣播模式差不多,就不深入研究了,重點說一下offset儲存的路徑:/rocketmq_home/store/config/consumerOffset.json綜上所述,我們瞭解到的情況是,廣播模式,存放在消費者本地,叢集模式,儲存在Broker,儲存檔案,存放的是JSON。也就是OffsetStore提供儲存消費進度方法,也就是 {“consumeGroup" : [ {”ConsumeQueue1“:offset} ] }現在我們思考如下問題:下面討論還是基於非順序訊息: 1、叢集模式,一個消費組是多個執行緒消費該佇列中的訊息,併發執行,例如在q1中存在 m1,m2,m3,m4,m5 最後消費成功的順序有可能是 m1,m3,m2,m5,m4,如果消費訊息,就將該訊息的offset存入offset中,豈不是會亂,如果一批拉取了多條訊息,消費進度是如何儲存的。要解決上述問題,我們移步到到呼叫offsetStore.updateStore方法,重點看一下那塊邏輯:ConsumeMessageConcurrentlyService#processConsumeResult
也就是訊息處理後,然後移除該批處理訊息,然後返回要更新的offset。那我們重點看一下removeMessage方法:
public long removeMessage(final List<MessageExt> msgs) {
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    for (MessageExt msg : msgs) {
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {
                            removedCnt--;
                        }
                    }
                    msgCount.addAndGet(removedCnt);

                    if (!msgTreeMap.isEmpty()) {
                        result = msgTreeMap.firstKey();
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (Throwable t) {
            log.error("removeMessage exception", t);
        }

        return result;
    }
主要一下,msgTreeMap的型別,TreeMap,按訊息的offset升序排序,返回的result,如果treemap中不存在任何訊息,那就返回該處理佇列最大的偏移量+1,如果移除自己本批訊息後,處理佇列中,還存在訊息,則返回該處理佇列中最小的偏移量,也就是此時返回的偏移量有可能不是訊息本身的偏移量,而是處理佇列中最小的偏移量有點:防止訊息丟失(也就是沒有消費到)缺點:會造成訊息重複消費訊息消費進度暫時就到這裡。

未完待續:

相關推薦

原始碼分析RocketMQ訊息ACK機制消費進度

首先簡要闡述一下訊息消費進度首先消費者訂閱訊息消費佇列(MessageQueue),當生產者將訊息負載傳送到MessageQueue中時,消費訂閱者開始消費訊息,訊息消費過程中,為了避免重複消費,需要一

RocketMQ原理4——訊息ACK機制消費進度管理

https://zhuanlan.zhihu.com/p/25265380 consumer的每個例項是靠佇列分配來決定如何消費訊息的。那麼消費進度具體是如何管理的,又是如何保證訊息成功消費的(RocketMQ有保證訊息肯定消費成功的特性(失敗則重試)? 本文將詳細解

原始碼分析RocketMQ訊息消費

1、訊息消費關注點 1)訊息消費方式:拉取、推送 2)消費者組與消費模式 consumerGroup;       MessageModel messageModel;    多個消費者組成一個消費組,兩種模式:叢集(訊息被其中任何一個訊息者消費)、廣播模式(全部消費者消費

RabbitMQ 訊息確認機制事務+Confirm

概述 在 Rabbitmq 中我們可以通過持久化來解決因為伺服器異常而導致丟失的問題,除此之外我們還會遇到一個問題:生產者將訊息傳送出去之後,訊息到底有沒有正確到達 Rabbit 伺服器呢?如果不錯得數處理,我們是不知道的,(即 Rabbit 伺服器不會反饋任何訊息給生產者),也就是預設的情況下是不知道訊息

RabbitMQ訊息確認機制事務+Confirm

概述 在使用RabbitMQ的時候,我們可以通過訊息持久化操作來解決因為伺服器的異常奔潰導致的訊息丟失,除此之外我們還會遇到一個問題,當訊息的釋出者在將訊息傳送出去之後,訊息到底有沒有正確到達broker代理伺服器呢?如果不進行特殊配置的話,預設情況下發布操作是不會返回任何

Android訊息處理機制Handler的本質-Message和Looper到底是什麼?

目錄 Android之訊息處理機制(二) 以下皆為乾貨,比較幹,需要讀者細細理解。  前面(一)已經解釋了Handler的基本機制了,下面來概括一下本質。 一、MessageQueue        MessageQueue其實就

Android softap連線斷開訊息通知機制Android O

版權宣告:本文為博主原創文章,部落格地址:https://blog.csdn.net/h784707460/article/details/79788344,未經博主允許不得轉載。 基於使用者需求、功耗優化等,Softap功能常常會在原生Android基礎上做一些定製,比如:STA的接

java多執行緒等待喚醒機制wait-notify

wait()、notify()、notifyAll()方法 Object類裡面提供了這幾個方法: wait():讓當前執行緒處於等待(阻塞狀態),直到其他執行緒呼叫此物件的notify()或noti

spark原始碼解讀2水塘抽樣演算法Reservoir Sampling

spark原始碼解讀系列環境:spark-1.5.2、hadoop-2.6.0、scala-2.10.4 1.理解   問題定義可以簡化如下:在不知道檔案總行數的情況下,如何從檔案中隨機的抽取一行?   首先想到的是我們做過類似的題目嗎?當然,在知

《深入理解SPARK:核心思想與原始碼分析》——SparkContext的初始化仲篇——SparkUI、環境變數及排程

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。 本文展現第3章第二部分的內容:

《深入理解Spark:核心思想與原始碼分析》——SparkContext的初始化伯篇——執行環境與元資料清理器

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。本文展現第3章第一部分的內容: 第3章

QT 訊息處理機制執行流程

程式主體由  QApplication app(argc,argv)開始按順序初始化程式;在 return app.exec();處進入訊息迴圈app.exec()函式監聽事件,有event()函式分發事件

原始碼學習系列SpringBoot自動配置篇一

原始碼學習系列之SpringBoot自動配置原始碼學習(篇一) ok,本部落格嘗試跟一下Springboot的自動配置原始碼,做一下筆記記錄,自動配置是Springboot的一個很關鍵的特性,也容易被忽略的屬性,因為這個屬性被包括在@SpringBootApplication註解裡,所以不去跟一下原始碼都不知

原始碼學習系列SpringBoot自動配置篇二

原始碼學習系列之SpringBoot自動配置(篇二)之HttpEncodingAutoConfiguration 原始碼分析 繼上一篇部落格原始碼學習系列之SpringBoot自動配置(篇一)之後,本部落格繼續跟一下SpringBoot的自動配置原始碼 ok,先複習一下上一篇的內容,從前面的學習,我們知道了S

rocketmq原始碼分析broker核心MessageStore訊息接受十八

這章我們從broker接受到訊息後的處理,從原始碼加註解的角度解析整體處理及技術,整體的處理步驟如下: sendMessage

rocketmq原始碼分析broker核心MessageStore訊息拉取十九

根據訊息的拉取程式碼,broker端的大體操作步驟如下,主要進行pullMessage     1,構建net

Android架構分析Android訊息處理機制

作者:劉昊昱  Android版本:4.4.2 在上一篇文章中我們看了一個使用Handler處理Message訊息的例子,本文我們來分析一下其背後隱藏的Android訊息處理機制。 我們可能比較熟悉Windows作業系統的訊息處理模型: while(GetMessage

Android架構分析Android訊息處理機制

作者:劉昊昱  Android版本:4.4.2 本文我們來分析AndroidUI執行緒即主執行緒是怎樣實現對訊息的處理的。 UI執行緒的實現類定義在frameworks/base/core/java/android/app/ActivityThread.java檔案中。

RocketMQ原始碼分析】深入訊息儲存1

![](https://antzyun.oss-cn-beijing.aliyuncs.com/img204d5b68da5e7e26d371c966fbf81d8.jpg) 最近在學習RocketMQ相關的東西,在學習之餘沉澱幾篇筆記。 RocketMQ有很多值得關注的設計點,訊息傳送、訊息消費、路由中

Java程式設計師從笨鳥到菜鳥八十一細談Spring深入原始碼分析SpringHibernateTemplate

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!