RocketMQ原始碼分析之ConsumeQueue
一:前言
上週末寫了兩篇文章講到服務端Broker在收到訊息後是如何儲存訊息的:
但是除了負責儲存訊息之外,Broker還要負責建立消費佇列。關於消費佇列,其實在講訊息傳送的時候《 RocketMQ原始碼分析之訊息傳送 》,我就畫過一張簡單的圖。

每個 Topic 在Broker 端都會有多個消費佇列,Producer每次都會選擇一個ComsumeQueue傳送訊息,Consumer同樣也會每次都選擇一個ComsumeQueue拉取訊息進行消費。
那麼這些ConsumeQueue究竟是什麼?有什麼作用?又是何時建立的呢?本篇文章我們就一起來分析下吧。
二:ConsumeQueue介紹
每個ConsumeQueue都有一個id,id 的值為0到TopicConfig配置的佇列數量。比如某個Topic的消費佇列數量為4,那麼四個ConsumeQueue的id就分別為0、1、2、3。
ConsumeQueue是不負責儲存訊息的,只是負責記錄它所屬Topic的訊息在CommitLog中的偏移量,這樣當消費者從Broker拉取訊息的時候,就可以快速根據偏移量定位到訊息。
ConsumeQueue本身同樣是利用MappedFileQueue進行記錄偏移量資訊的,可見MappedFileQueue的設計多麼美妙,它沒有與訊息進行耦合,而是設計成一個通用的儲存功能。
ConsumeQueue更新訊息偏移量的整體過程大概如下圖所示,其中涉及了幾個概念。
- ReputMessageService
- CommitLogDispatcherBuildConsumeQueue

上面這個圖比較粗糙,畫的很隨意,具體的原始碼分析還請大家接著往下看。
三:ReputMessageService服務
之前在講到訊息儲存的時候,提到每個Broker在初始化的時候都會初始化一個MessageStore負責儲存訊息,而MessageStore在初始化的時候,同樣會啟動一個ReputMessageService, ReputMessageService就是用來更新ConsumeQueue中訊息偏移的。
ReputMessageService本身是一個執行緒,它啟動後便會在迴圈中不斷呼叫doReput()方法,用來通知ConsumeQueue進行更新。
@Override public void run() { while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } }
doReput()中主要分為以下幾步:
1:獲取CommitLog中儲存的新訊息。
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
reputFromOffset記錄了本次需要拉取的訊息在CommitLog中的偏移。這裡將reputFromOffset傳遞給CommitLog,獲取CommitLog在reputFromOffset處儲存的訊息。
2:如果第一步獲取的訊息不為空,則表明有新訊息被儲存到CommitLog中,此時便會通知ConsumeQueue更新訊息偏移。
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); ...... DefaultMessageStore.this.doDispatch(dispatchRequest);
3:更新reputFromOffset,設定為下次需要拉取的訊息在CommitLog中的偏移。
this.reputFromOffset = result.getStartOffset(); ...... int size = dispatchRequest.getMsgSize(); ...... this.reputFromOffset += size;
上面的重點在第二步中,這裡呼叫 DefaultMessageStore.this.doDispatch(dispatchRequest) 來通知ConsumeQueue。
DefaultMessageStore中儲存了一個dispatcherList,其中存放了幾個CommitLogDispatcher物件,它們都是用來監聽CommitLog中新訊息儲存的。
this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
doDispatch()會遍歷CommitLogDispatcher,呼叫它們的dispatch()方法。其中專門用來通知ConsumeQueue的Dispatcher是 CommitLogDispatcherBuildConsumeQueue 。
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: DefaultMessageStore.this.putMessagePositionInfo(request); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } }
當ReputMessageService呼叫了CommitLogDispatcherBuildConsumeQueue的dispatch()後,CommitLogDispatcherBuildConsumeQueue便會呼叫 DefaultMessageStore.this.putMessagePositionInfo(request):
public void putMessagePositionInfo(DispatchRequest dispatchRequest) { ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); cq.putMessagePositionInfoWrapper(dispatchRequest); }
putMessagePositionInfo()邏輯有兩步:
1:呼叫findConsumeQueue(),根據訊息的topic以及訊息所屬的ConsumeQueueId,找到對應的ConsumeQueue。
findConsumeQueue()會先從consumeQueueTable中查詢topic的ConsumeQueueMap,如果未找到,便會為Topic建立一個新的ConcurrentMap<Integer/* queueId */, ConsumeQueue>,存放到表中。
接著在從Topic的ConcurrentMap中,根據QueueId,查詢ConsumeQueue,如果未找到,便也會建立一個新的ConsumeQueue,存放到Map中 。ConsumeQueue便是此時被建立的。
2:當找到訊息對應的ConsumeQueue後,便呼叫ConsumeQueue的putMessagePositionInfoWrapper()方法,更新ConsumeQueue。
四:ConsumeQueue的更新
上面主要講了ReputMessageService是如何通知ConsumeQueue的,現在我們就要看看ConsumeQueue收到通知後是如何更新的,更新邏輯就在putMessagePositionInfoWrapper()中。
putMessagePositionInfoWrapper()中呼叫了putMessagePositionInfo(),並引入了重試機制。
我們來看看putMessagePositionInfo()中的主要邏輯:
1:判斷訊息是否已經被處理過
if (offset <= this.maxPhysicOffset) { return true; }
maxPhysicOffset記錄了上一次ConsumeQueue更新的訊息在CommitLog中的偏移量,如果本次訊息偏移量小於maxPhysicOffset,則表明訊息已經被更新過,直接返回。
2:初始儲存偏移量所用的記憶體
this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode);
這裡將訊息的偏移量、大小、tagsCode等資訊,都暫存到了一塊ByteBuffer中。
3:獲取此次儲存訊息偏移量所用的MappedFile
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
前面我們講到,ConsumeQueue本身也是採用MappedFileQueue用來儲存偏移量的,這裡便是獲取MappedFileQueue中的最後一個MappedFile,用來儲存訊息。
ConsumeQueue 中用來儲存訊息偏移量的結構大小為 CQ_STORE_UNIT_SIZE,為20個位元組,cqOffse t為 ConsumeQueue 中已經記錄了多少條訊息的偏移量,所以 expectLogicOffset 即為當前需要儲存的訊息偏移量結構,在ConsumeQueue的MappedFileQueue中的位置。
4:更新maxPhysicOffset,並將暫存在ByteBuffer中的訊息偏移資訊,追加到MappedFile中。
this.maxPhysicOffset = offset; return mappedFile.appendMessage(this.byteBufferIndex.array());
此時,訊息偏移量就被成功儲存到ConsumeQueue中了。
五:總結
通過上面的分析,我們知道了ConsumeQueue的作用及更新流程,總結來說,它就是用來記錄訊息在CommitLog中偏移量的,便於Consumer快速定位訊息。
訊息偏移量的更新分為下面幾步:
1:ReputMessageService不斷從CommitLog中查詢是否有新儲存的訊息;
2:如果有新訊息,便通過Dispatcher通知ConsumeQueue;
3:ConsumeQueue收到通知後會將訊息偏移量儲存到自身的MappedFile中。
後面文章我還會繼續剖析 RocketMQ。
歡迎大家點個贊,關注下!