RocketMQ一個新的消費組初次啟動時從何處開始消費呢?
目錄
- 1、丟擲問題
- 1.1 環境準備
- 1.2 訊息傳送者程式碼
- 1.3 消費端驗證程式碼
- 2、探究CONSUME_FROM_MAX_OFFSET實現原理
- 2.1 CONSUME_FROM_LAST_OFFSET計算邏輯
- 2.2 CONSUME_FROM_FIRST_OFFSET
- 2.4 CONSUME_FROM_TIMESTAMP
- 3、猜想與驗證
- 4、解決方案
- ---
@(本文目錄)
1、丟擲問題
一個新的消費組訂閱一個已存在的Topic主題時,消費組是從該Topic的哪條訊息開始消費呢?
首先翻閱DefaultMQPushConsumer的API時,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼簾,從字面意思來看是設定消費者從哪裡開始消費,正是解開該問題的”鑰匙“。ConsumeFromWhere列舉類圖如下:
- CONSUME_FROM_MAX_OFFSET
從消費佇列最大的偏移量開始消費。 - CONSUME_FROM_FIRST_OFFSET
從消費佇列最小偏移量開始消費。 - CONSUME_FROM_TIMESTAMP
從指定的時間戳開始消費,預設為消費者啟動之前的30分鐘處開始消費。可以通過DefaultMQPushConsumer#setConsumeTimestamp。
是不是點小激動,還不快試試。
需求:新的消費組啟動時,從佇列最後開始消費,即只消費啟動後傳送到訊息伺服器後的最新訊息。
1.1 環境準備
本示例所用到的Topic路由資訊如下:
Broker的配置如下(broker.conf)
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store/commitlog namesrvAddr=127.0.0.1:9876 autoCreateTopicEnable=false mapedFileSizeCommitLog=10240 mapedFileSizeConsumeQueue=2000
其中重點修改瞭如下兩個引數:
- mapedFileSizeCommitLog
單個commitlog檔案的大小,這裡使用10M,方便測試用。 - mapedFileSizeConsumeQueue
單個consumequeue佇列長度,這裡使用1000,表示一個consumequeue檔案中包含1000個條目。
1.2 訊息傳送者程式碼
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 300; i++) {
try {
Message msg = new Message("TopicTest" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
通過上述,往TopicTest傳送300條訊息,傳送完畢後,RocketMQ Broker儲存結構如下:
1.3 消費端驗證程式碼
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_01");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
執行上述程式碼後,按照期望,應該是不會消費任何訊息,只有等生產者再發送訊息後,才會對訊息進行消費,事實是這樣嗎?執行效果如圖所示:
令人意外的是,竟然從佇列的最小偏移量開始消費了,這就“尷尬”了。難不成是RocketMQ的Bug。帶著這個疑問,從原始碼的角度嘗試來解讀該問題,並指導我們實踐。
2、探究CONSUME_FROM_MAX_OFFSET實現原理
對於一個新的消費組,無論是叢集模式還是廣播模式都不會儲存該消費組的消費進度,可以理解為-1,此時就需要根據DefaultMQPushConsumer#consumeFromWhere屬性來決定其從何處開始消費,首先我們需要找到其對應的處理入口。我們知道,訊息消費者從Broker伺服器拉取訊息時,需要進行消費佇列的負載,即RebalanceImpl。
溫馨提示:本文不會詳細介紹RocketMQ訊息佇列負載、訊息拉取、訊息消費邏輯,只會展示出通往該問題的簡短流程,如想詳細瞭解訊息消費具體細節,建議購買筆者出版的《RocketMQ技術內幕》書籍。
RebalancePushImpl#computePullFromWhere
public long computePullFromWhere(MessageQueue mq) {
long result = -1; // @1
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
case CONSUME_FROM_LAST_OFFSET: { // @2
// 省略部分程式碼
break;
}
case CONSUME_FROM_FIRST_OFFSET: { // @3
// 省略部分程式碼
break;
}
case CONSUME_FROM_TIMESTAMP: { //@4
// 省略部分程式碼
break;
}
default:
break;
}
return result; // @5
}
程式碼@1:先解釋幾個區域性變數。
- result
最終的返回結果,預設為-1。 - consumeFromWhere
訊息消費者開始消費的策略,即CONSUME_FROM_LAST_OFFSET等。 - offsetStore
offset儲存器,消費組訊息偏移量儲存實現器。
程式碼@2:CONSUME_FROM_LAST_OFFSET(從佇列的最大偏移量開始消費)的處理邏輯,下文會詳細介紹。
程式碼@3:CONSUME_FROM_FIRST_OFFSET(從佇列最小偏移量開始消費)的處理邏輯,下文會詳細介紹。
程式碼@4:CONSUME_FROM_TIMESTAMP(從指定時間戳開始消費)的處理邏輯,下文會詳細介紹。
程式碼@5:返回最後計算的偏移量,從該偏移量出開始消費。
2.1 CONSUME_FROM_LAST_OFFSET計算邏輯
case CONSUME_FROM_LAST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); // @1
if (lastOffset >= 0) { // @2
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) { // @3
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) { // @4
result = -1;
}
}
} else {
result = -1;
}
break;
}
程式碼@1:使用offsetStore從訊息消費進度檔案中讀取消費消費進度,本文將以叢集模式為例展開。稍後詳細分析。
程式碼@2:如果返回的偏移量大於等於0,則直接使用該offset,這個也能理解,大於等於0,表示查詢到有效的訊息消費進度,從該有效進度開始消費,但我們要特別留意lastOffset為0是什麼場景,因為返回0,並不會執行CONSUME_FROM_LAST_OFFSET(語義)。
程式碼@3:如果lastOffset為-1,表示當前並未儲存其有效偏移量,可以理解為第一次消費,如果是消費組重試主題,從重試佇列偏移量為0開始消費;如果是普通主題,則從隊列當前的最大的有效偏移量開始消費,即CONSUME_FROM_LAST_OFFSET語義的實現。
程式碼@4:如果從遠端服務拉取最大偏移量拉取異常或其他情況,則使用-1作為第一次拉取偏移量。
分析,上述執行的現象,雖然設定的是CONSUME_FROM_LAST_OFFSET,但現象是從佇列的第一條訊息開始消費,根據上述原始碼的分析,只有從消費組消費進度儲存檔案中取到的訊息偏移量為0時,才會從第一條訊息開始消費,故接下來重點分析訊息消費進度儲存器(OffsetStore)在什麼情況下會返回0。
接下來我們將以叢集模式來檢視一下訊息消費進度的查詢邏輯,叢集模式的訊息進度儲存管理器實現為:
RemoteBrokerOffsetStore,最終Broker端的命令處理類為:ConsumerManageProcessor。
ConsumerManageProcessor#queryConsumerOffset
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); // @1
if (offset >= 0) { // @2
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else { // @3
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId()); // @4
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( // @5
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else { // @6
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
}
}
return response;
}
程式碼@1:從消費訊息進度檔案中查詢訊息消費進度。
程式碼@2:如果訊息消費進度檔案中儲存該佇列的訊息進度,其返回的offset必然會大於等於0,則直接返回該偏移量該客戶端,客戶端從該偏移量開始消費。
程式碼@3:如果未從訊息消費進度檔案中查詢到其進度,offset為-1。則首先獲取該主題、訊息隊列當前在Broker伺服器中的最小偏移量(@4)。如果小於等於0(返回0則表示該佇列的檔案還未曾刪除過)並且其最小偏移量對應的訊息儲存在記憶體中而不是存在磁碟中,則返回偏移量0,這就意味著ConsumeFromWhere中定義的三種列舉型別都不會生效,直接從0開始消費,到這裡就能解開其謎團了(@5)。
程式碼@6:如果偏移量小於等於0,但其訊息已經儲存在磁碟中,此時返回未找到,最終RebalancePushImpl#computePullFromWhere中得到的偏移量為-1。
看到這裡,大家應該能回答文章開頭處提到的問題了吧?
看到這裡,大家應該明白了,為什麼設定的CONSUME_FROM_LAST_OFFSET,但消費組是從訊息佇列的開始處消費了吧,原因就是訊息消費進度檔案中並沒有找到其訊息消費進度,並且該佇列在Broker端的最小偏移量為0,說的更直白點,consumequeue/topicName/queueNum的第一個訊息消費佇列檔案為00000000000000000000,並且訊息其對應的訊息快取在Broker端的記憶體中(pageCache),其返回給消費端的偏移量為0,故會從0開始消費,而不是從佇列的最大偏移量處開始消費。
為了知識體系的完備性,我們順便來看一下其他兩種策略的計算邏輯。
2.2 CONSUME_FROM_FIRST_OFFSET
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); // @1
if (lastOffset >= 0) { // @2
result = lastOffset;
} else if (-1 == lastOffset) { // @3
result = 0L;
} else {
result = -1; // @4
}
break;
}
從佇列的開始偏移量開始消費,其計算邏輯如下:
程式碼@1:首先通過偏移量儲存器查詢消費佇列的消費進度。
程式碼@2:如果大於等於0,則從當前該偏移量開始消費。
程式碼@3:如果遠端返回-1,表示並沒有儲存該佇列的訊息消費進度,從0開始。
程式碼@4:否則從-1開始消費。
2.4 CONSUME_FROM_TIMESTAMP
從指定時戳後的訊息開始消費。
case CONSUME_FROM_TIMESTAMP: {
ong lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); // @1
if (lastOffset >= 0) { // @2
result = lastOffset;
} else if (-1 == lastOffset) { // @3
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
} else {
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
其基本套路與CONSUME_FROM_LAST_OFFSET一樣:
程式碼@1:首先通過偏移量儲存器查詢消費佇列的消費進度。
程式碼@2:如果大於等於0,則從當前該偏移量開始消費。
程式碼@3:如果遠端返回-1,表示並沒有儲存該佇列的訊息消費進度,如果是重試主題,則從當前佇列的最大偏移量開始消費,如果是普通主題,則根據時間戳去Broker端查詢,根據查詢到的偏移量開始消費。
原理就介紹到這裡,下面根據上述理論對其進行驗證。
3、猜想與驗證
根據上述理論分析我們得知設定CONSUME_FROM_LAST_OFFSET但並不是從訊息佇列的最大偏移量開始消費的“罪魁禍首”是因為訊息消費佇列的最小偏移量為0,如果不為0,則就會符合預期,我們來驗證一下這個猜想。
首先我們刪除commitlog目錄下的檔案,如圖所示:
其消費佇列截圖如下:
消費端的驗證程式碼如下:
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_02");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
執行結果如下:
並沒有訊息存在的訊息,符合預期。
4、解決方案
如果在生產環境下,一個新的消費組訂閱一個已經存在比較久的topic,設定CONSUME_FROM_MAX_OFFSET是符合預期的,即該主題的consumequeue/{queueNum}/fileName,fileName通常不會是00000000000000000000,如是是上面檔名,想要實現從佇列的最後開始消費,該如何做呢?那就走自動建立消費組的路子,執行如下命令:
./mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g my_consumer_05
//克隆一個訂閱了該topic的消費組消費進度
./mqadmin cloneGroupOffset -n 127.0.0.1:9876 -s my_consumer_01 -d my_consumer_05 -t TopicTest
//重置消費進度到當前佇列的最大值
./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g my_consumer_05 -t TopicTest -s -1
按照上上述命令後,即可實現其目的。
您都看到這裡了,麻煩幫忙點個贊,謝謝您的認可與鼓勵。
---
作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社群佈道師,公眾號:中介軟體興趣圈 維護者,目前已陸續發表原始碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等原始碼專欄。歡迎加入我的知識星球,構建一個高質量的技術交流社群。