RocketMQ學習筆記四之【DefaultMQPullConsumer使用與流程簡單分析】
阿新 • • 發佈:2019-01-06
我們首先看下DefaultMQPullConsumer使用例子:
package com.swk.springboot.rocketmq;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

/**
 * Minimal pull-consumer example: fetches every message queue of a topic and drains
 * each queue in turn, printing the body of every message that is found.
 *
 * <p>Pull progress is tracked in an in-memory table only; nothing is written back to
 * the broker by this example.
 */
public class MQPullConsumer {

    /** Next offset to pull from, keyed by queue. Lives only for the duration of this run. */
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<MessageQueue, Long>();

    /**
     * Starts the consumer, drains each queue of {@code order-topic}, then shuts the consumer down.
     *
     * @param args unused
     * @throws MQClientException if the consumer cannot start or the topic's queues cannot be fetched
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
        consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
        consumer.start();
        try {
            // Fetch every message queue of the given topic.
            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
            for (MessageQueue mq : mqs) {
                try {
                    // Read the consume offset for this queue, asking for the stored value (fromStore = true).
                    long offset = consumer.fetchConsumeOffset(mq, true);
                    System.out.println("consumer from the queue:" + mq + ":" + offset);
                    // Start from the stored progress when one exists; a negative value is treated
                    // as "no usable offset" and the queue is read from 0.
                    if (offset >= 0) {
                        putMessageQueueOffset(mq, offset);
                    }

                    SINGLE_MQ:
                    while (true) {
                        PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                                for (MessageExt m : messageExtList) {
                                    // Decode explicitly as UTF-8 instead of the platform default charset.
                                    System.out.println(new String(m.getBody(), StandardCharsets.UTF_8));
                                }
                                break;
                            case NO_NEW_MSG:
                                // Queue is drained: leave the loop (a plain break would only leave
                                // the switch and spin forever on this queue).
                                break SINGLE_MQ;
                            case NO_MATCHED_MSG:
                            case OFFSET_ILLEGAL:
                            default:
                                break;
                        }
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and stop processing further queues.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // Best effort: report the failure and carry on with the next queue.
                    e.printStackTrace();
                }
            }
        } finally {
            // Always release client resources, even when fetching the queues fails.
            consumer.shutdown();
        }
    }

    /** Records the offset the next pull on {@code mq} should start from. */
    private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) {
        OFFSET_TABLE.put(mq, nextBeginOffset);
    }

    /** Returns the recorded next-pull offset for {@code mq}, or 0 when none has been recorded. */
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0L;
    }
}
根據上面的例子我們來分析下DefaultMQPullConsumer拉取訊息的流程(Consumer的啟動和關閉流程我們會放在後面的章節介紹)
1、【consumer.fetchSubscribeMessageQueues("order-topic")】
// Fetch every message queue of the given topic.
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
該方法最終是呼叫
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { try { // 1、向NameServer傳送GET_ROUTEINTO_BY_TOPIC請求碼獲取topic引數對應的Broker資訊和topic配置資訊,即TopicRouteData物件 TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); if (topicRouteData != null) { // 2、遍歷topicRouteData Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData); if (!mqList.isEmpty()) { return mqList; } else { throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null); } } } catch (Exception e) { throw new MQClientException( "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), e); } throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); }
繼續看下第二步的遍歷過程
/**
 * Converts a topic's route data into the set of message queues a consumer can read.
 *
 * <p>Each {@code QueueData} entry of the route is inspected; entries without read permission
 * are skipped. For a readable entry, one {@link MessageQueue} is created per read queue,
 * with queue ids {@code 0 .. readQueueNums - 1} on that entry's broker.
 *
 * @param topic the topic the queues belong to
 * @param route the route data of the topic
 * @return the set of readable message queues (possibly empty)
 */
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
    Set<MessageQueue> readableQueues = new HashSet<MessageQueue>();
    for (QueueData queueData : route.getQueueDatas()) {
        if (!PermName.isReadable(queueData.getPerm())) {
            continue;
        }
        int queueId = 0;
        while (queueId < queueData.getReadQueueNums()) {
            readableQueues.add(new MessageQueue(topic, queueData.getBrokerName(), queueId));
            queueId++;
        }
    }
    return readableQueues;
}
2、【consumer.pullBlockIfNotFound】
/**
 * Pulls messages from {@code mq} starting at {@code offset}, returning at most
 * {@code maxNums} messages (the example in this article passes 32).
 *
 * <p>{@code offset} is the position in the queue to start consuming from; it can be obtained
 * from {@code DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)}.
 * The call delegates to {@code pullSyncImpl} with the {@code block} argument set to
 * {@code true} and the consumer's configured pull timeout.
 *
 * @param mq the queue to pull from
 * @param subExpression the subscription expression, passed through unchanged
 * @param offset the queue offset to start pulling from
 * @param maxNums the maximum number of messages to return
 * @return the result of the pull
 */
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    final boolean blockIfNotFound = true;
    final long pullTimeout = this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis();
    return this.pullSyncImpl(mq, subExpression, offset, maxNums, blockIfNotFound, pullTimeout);
}
下面我們來看下【DefaultMQPullConsumerImpl.pullSyncImpl】的實現過程
/**
 * Synchronously pulls up to {@code maxNums} messages from {@code mq}, starting at {@code offset}.
 *
 * <p>Steps: validate the arguments, make sure the topic has a subscription entry, build the
 * subscription data from {@code subExpression}, pull from the broker in SYNC mode,
 * post-process the pull result, and finally run any registered consume-message hooks.
 *
 * @param mq the queue to pull from; must not be null
 * @param subExpression the subscription expression used to build the subscription data
 * @param offset the queue offset to start from; must be &gt;= 0
 * @param maxNums the maximum number of messages to pull; must be &gt; 0
 * @param block when true, the consumer's "timeout when suspend" value is used as the
 *        request timeout instead of {@code timeout}
 * @param timeout the request timeout in milliseconds, used only when {@code block} is false
 * @return the processed pull result
 * @throws MQClientException if an argument is invalid or the subscription expression
 *         cannot be parsed
 */
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
    long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // NOTE(review): presumably verifies the consumer is in a usable state - confirm in makeSureStateOK.
    this.makeSureStateOK();

    if (mq == null) {
        throw new MQClientException("mq is null", null);
    }
    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }
    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }

    // Per the article's notes: checks whether the queue's topic is already present in
    // RebalanceImpl.subscriptionInner (ConcurrentHashMap<String, SubscriptionData>); if not,
    // a SubscriptionData built via FilterAPI.buildSubscriptionData(consumerGroup, topic, "*")
    // is stored there.
    this.subscriptionAutomatically(mq.getTopic());

    // Build the pull system flag (individual flags are combined with bitwise OR).
    final int pullFlags = PullSysFlag.buildSysFlag(false, block, true, false);

    // Build the SubscriptionData for this request from the consumer group, topic and subExpression.
    final SubscriptionData subscription;
    try {
        subscription = FilterAPI.buildSubscriptionData(
            this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }

    // A blocking pull uses the consumer's suspend timeout rather than the caller's timeout.
    final long requestTimeout;
    if (block) {
        requestTimeout = this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend();
    } else {
        requestTimeout = timeout;
    }

    // Pull the messages from the broker.
    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
        mq,
        subscription.getSubString(),
        0L,
        offset,
        maxNums,
        pullFlags,
        0,
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
        requestTimeout,
        CommunicationMode.SYNC,
        null
    );

    // Per the article's notes: decodes and filters the pulled messages and places the parsed
    // message list into the result's MsgFoundList.
    this.pullAPIWrapper.processPullResult(mq, pullResult, subscription);

    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext hookContext = new ConsumeMessageContext();
        hookContext.setConsumerGroup(this.groupName());
        hookContext.setMq(mq);
        hookContext.setMsgList(pullResult.getMsgFoundList());
        hookContext.setSuccess(false);
        this.executeHookBefore(hookContext);

        // The "after" hooks always see the pull reported as a successful consume.
        hookContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        hookContext.setSuccess(true);
        this.executeHookAfter(hookContext);
    }

    return pullResult;
}
最近公司要使用springboot,接下來一段時間暫時不更新rocketmq,先把springboot惡補下