1. 程式人生 > >RocketMQ中PullConsumer的訊息拉取原始碼分析

RocketMQ中PullConsumer的訊息拉取原始碼分析

在PullConsumer中,有關訊息的拉取RocketMQ提供了很多API,但總的來說分為兩種,同步訊息拉取和非同步訊息拉取


同步訊息拉取
以同步方式拉取訊息都是通過DefaultMQPullConsumerImpl的pullSyncImpl方法:

 1 private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
 2     long timeout)
 3     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 4     this.makeSureStateOK();
 5 
 6     if (null == mq) {
 7         throw new MQClientException("mq is null", null);
 8     }
 9 
10     if (offset < 0) {
11         throw new MQClientException("offset < 0", null);
12     }
13 
14     if (maxNums <= 0) {
15         throw new MQClientException("maxNums <= 0", null);
16     }
17 
18     this.subscriptionAutomatically(mq.getTopic());
19 
20     int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
21 
22     long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
23 
24     boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
25     PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
26         mq,
27         subscriptionData.getSubString(),
28         subscriptionData.getExpressionType(),
29         isTagType ? 0L : subscriptionData.getSubVersion(),
30         offset,
31         maxNums,
32         sysFlag,
33         0,
34         this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
35         timeoutMillis,
36         CommunicationMode.SYNC,
37         null
38     );
39     this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
40     if (!this.consumeMessageHookList.isEmpty()) {
41         ConsumeMessageContext consumeMessageContext = null;
42         consumeMessageContext = new ConsumeMessageContext();
43         consumeMessageContext.setConsumerGroup(this.groupName());
44         consumeMessageContext.setMq(mq);
45         consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
46         consumeMessageContext.setSuccess(false);
47         this.executeHookBefore(consumeMessageContext);
48         consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
49         consumeMessageContext.setSuccess(true);
50         this.executeHookAfter(consumeMessageContext);
51     }
52     return pullResult;
53 }

首先通過subscriptionAutomatically方法檢查Topic是否訂閱

 

 1 public void subscriptionAutomatically(final String topic) {
 2     if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
 3         try {
 4             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
 5                 topic, SubscriptionData.SUB_ALL);
 6             this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
 7         } catch (Exception ignore) {
 8         }
 9     }
10 }

若是沒有就新建一條訂閱資料儲存在rebalanceImpl的subscriptionInner中


之後呼叫pullKernelImpl方法:

 1 public PullResult pullKernelImpl(
 2     final MessageQueue mq,
 3     final String subExpression,
 4     final String expressionType,
 5     final long subVersion,
 6     final long offset,
 7     final int maxNums,
 8     final int sysFlag,
 9     final long commitOffset,
10     final long brokerSuspendMaxTimeMillis,
11     final long timeoutMillis,
12     final CommunicationMode communicationMode,
13     final PullCallback pullCallback
14 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
15     FindBrokerResult findBrokerResult =
16         this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
17             this.recalculatePullFromWhichNode(mq), false);
18     if (null == findBrokerResult) {
19         this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
20         findBrokerResult =
21             this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
22                 this.recalculatePullFromWhichNode(mq), false);
23     }
24 
25     if (findBrokerResult != null) {
26         {
27             // check version
28             if (!ExpressionType.isTagType(expressionType)
29                 && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
30                 throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
31                     + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
32             }
33         }
34         int sysFlagInner = sysFlag;
35 
36         if (findBrokerResult.isSlave()) {
37             sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
38         }
39 
40         PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
41         requestHeader.setConsumerGroup(this.consumerGroup);
42         requestHeader.setTopic(mq.getTopic());
43         requestHeader.setQueueId(mq.getQueueId());
44         requestHeader.setQueueOffset(offset);
45         requestHeader.setMaxMsgNums(maxNums);
46         requestHeader.setSysFlag(sysFlagInner);
47         requestHeader.setCommitOffset(commitOffset);
48         requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
49         requestHeader.setSubscription(subExpression);
50         requestHeader.setSubVersion(subVersion);
51         requestHeader.setExpressionType(expressionType);
52 
53         String brokerAddr = findBrokerResult.getBrokerAddr();
54         if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
55             brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
56         }
57 
58         PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
59             brokerAddr,
60             requestHeader,
61             timeoutMillis,
62             communicationMode,
63             pullCallback);
64 
65         return pullResult;
66     }
67 
68     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
69 }

首先通過findBrokerAddressInSubscribe方法查詢關於訊息佇列的Broker資訊


這裡的recalculatePullFromWhichNode方法:

 1 public long recalculatePullFromWhichNode(final MessageQueue mq) {
 2     if (this.isConnectBrokerByUser()) {
 3         return this.defaultBrokerId;
 4     }
 5 
 6     AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
 7     if (suggest != null) {
 8         return suggest.get();
 9     }
10 
11     return MixAll.MASTER_ID;
12 }

根據訊息佇列,在pullFromWhichNodeTable查詢其對應的Broker的ID
pullFromWhichNodeTable記錄了訊息對了和BrokerID的對映

1 private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
2         new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

(master的BrokerID為0,slave的BrokerID大於0)

 

findBrokerAddressInSubscribe方法:

 1 public FindBrokerResult findBrokerAddressInSubscribe(
 2     final String brokerName,
 3     final long brokerId,
 4     final boolean onlyThisBroker
 5 ) {
 6     String brokerAddr = null;
 7     boolean slave = false;
 8     boolean found = false;
 9 
10     HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
11     if (map != null && !map.isEmpty()) {
12         brokerAddr = map.get(brokerId);
13         slave = brokerId != MixAll.MASTER_ID;
14         found = brokerAddr != null;
15 
16         if (!found && !onlyThisBroker) {
17             Entry<Long, String> entry = map.entrySet().iterator().next();
18             brokerAddr = entry.getValue();
19             slave = entry.getKey() != MixAll.MASTER_ID;
20             found = true;
21         }
22     }
23 
24     if (found) {
25         return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
26     }
27 
28     return null;
29 }

這裡就根據brokerAddrTable表查詢該BrokerID對應的Broker的地址資訊,以及是否是slave
封裝為FindBrokerResult返回


若是沒有找到Broker的路由資訊,則通過updateTopicRouteInfoFromNameServer方法向NameServer請求更新,更新完成後再呼叫findBrokerAddressInSubscribe方法查詢


之後會根據相應的資訊封裝請求訊息頭PullMessageRequestHeader

然後呼叫pullMessage方法:

 1 public PullResult pullMessage(
 2     final String addr,
 3     final PullMessageRequestHeader requestHeader,
 4     final long timeoutMillis,
 5     final CommunicationMode communicationMode,
 6     final PullCallback pullCallback
 7 ) throws RemotingException, MQBrokerException, InterruptedException {
 8     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
 9 
10     switch (communicationMode) {
11         case ONEWAY:
12             assert false;
13             return null;
14         case ASYNC:
15             this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
16             return null;
17         case SYNC:
18             return this.pullMessageSync(addr, request, timeoutMillis);
19         default:
20             assert false;
21             break;
22     }
23 
24     return null;
25 }

這裡就可以看出我前面說的兩種型別,同步拉取和非同步拉取


pullMessageSync方法:

1 private PullResult pullMessageSync(
2     final String addr,
3     final RemotingCommand request,
4     final long timeoutMillis
5 ) throws RemotingException, InterruptedException, MQBrokerException {
6     RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
7     assert response != null;
8     return this.processPullResponse(response);
9 }

這裡其實就是通過invokeSync方法,由Netty進行同步傳送,將請求傳送給Broker
關於訊息的傳送詳見:

【RocketMQ中Producer訊息的傳送原始碼分析】

 

在收到響應後由processPullResponse方法處理
processPullResponse方法:

 1 private PullResult processPullResponse(
 2     final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
 3     PullStatus pullStatus = PullStatus.NO_NEW_MSG;
 4     switch (response.getCode()) {
 5         case ResponseCode.SUCCESS:
 6             pullStatus = PullStatus.FOUND;
 7             break;
 8         case ResponseCode.PULL_NOT_FOUND:
 9             pullStatus = PullStatus.NO_NEW_MSG;
10             break;
11         case ResponseCode.PULL_RETRY_IMMEDIATELY:
12             pullStatus = PullStatus.NO_MATCHED_MSG;
13             break;
14         case ResponseCode.PULL_OFFSET_MOVED:
15             pullStatus = PullStatus.OFFSET_ILLEGAL;
16             break;
17 
18         default:
19             throw new MQBrokerException(response.getCode(), response.getRemark());
20     }
21 
22     PullMessageResponseHeader responseHeader =
23         (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
24 
25     return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
26         responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
27 }

根據響應的狀態,設定PullStatus狀態

然後通過decodeCommandCustomHeader方法,將響應中的資訊解碼
最後由PullResultExt封裝訊息資訊

 1 public class PullResultExt extends PullResult {
 2     private final long suggestWhichBrokerId;
 3     private byte[] messageBinary;
 4 
 5     public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
 6         List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
 7         super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
 8         this.suggestWhichBrokerId = suggestWhichBrokerId;
 9         this.messageBinary = messageBinary;
10     }
11     ......
12 }
13 
14 public class PullResult {
15     private final PullStatus pullStatus;
16     private final long nextBeginOffset;
17     private final long minOffset;
18     private final long maxOffset;
19     private List<MessageExt> msgFoundList;
20     
21     public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
22         List<MessageExt> msgFoundList) {
23         super();
24         this.pullStatus = pullStatus;
25         this.nextBeginOffset = nextBeginOffset;
26         this.minOffset = minOffset;
27         this.maxOffset = maxOffset;
28         this.msgFoundList = msgFoundList;
29     }
30     ......
31 }

拉取到的訊息可能是多條,具體內容在PullResult 中的msgFoundList儲存,MessageExt是Message的超類

 

回到pullSyncImpl方法,在拉取到訊息後,呼叫processPullResult方法:

 1 public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
 2     final SubscriptionData subscriptionData) {
 3     PullResultExt pullResultExt = (PullResultExt) pullResult;
 4 
 5     this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
 6     if (PullStatus.FOUND == pullResult.getPullStatus()) {
 7         ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
 8         List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
 9 
10         List<MessageExt> msgListFilterAgain = msgList;
11         if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
12             msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
13             for (MessageExt msg : msgList) {
14                 if (msg.getTags() != null) {
15                     if (subscriptionData.getTagsSet().contains(msg.getTags())) {
16                         msgListFilterAgain.add(msg);
17                     }
18                 }
19             }
20         }
21 
22         if (this.hasHook()) {
23             FilterMessageContext filterMessageContext = new FilterMessageContext();
24             filterMessageContext.setUnitMode(unitMode);
25             filterMessageContext.setMsgList(msgListFilterAgain);
26             this.executeHook(filterMessageContext);
27         }
28 
29         for (MessageExt msg : msgListFilterAgain) {
30             String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
31             if (traFlag != null && Boolean.parseBoolean(traFlag)) {
32                 msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
33             }
34             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
35                 Long.toString(pullResult.getMinOffset()));
36             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
37                 Long.toString(pullResult.getMaxOffset()));
38         }
39 
40         pullResultExt.setMsgFoundList(msgListFilterAgain);
41     }
42 
43     pullResultExt.setMessageBinary(null);
44 
45     return pullResult;
46 }

首先呼叫updatePullFromWhichNode方法:

1 public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
2    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
3     if (null == suggest) {
4         this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
5     } else {
6         suggest.set(brokerId);
7     }
8 }

這裡就會將pullFromWhichNodeTable中記錄的訊息佇列和BrokerID的對映,更新為Broker傳送過來的建議ID
結合上一篇部落格來看,若是採用叢集模式,就完成了消費者端的負載均衡


在PullStatus.FOUND情況下,會呼叫MessageDecoder的decodes方法,將CommitLog格式的訊息資料進行解碼,轉化為真正可讀的訊息

之後會對Tag進行判斷,設定了Tag,新增Tag訊息記錄

之後,在設定了FilterMessageHook鉤子情況下,通過executeHook方法執行FilterMessageHook鉤子的filterMessage方法:

 1 public void executeHook(final FilterMessageContext context) {
 2     if (!this.filterMessageHookList.isEmpty()) {
 3         for (FilterMessageHook hook : this.filterMessageHookList) {
 4             try {
 5                 hook.filterMessage(context);
 6             } catch (Throwable e) {
 7                 log.error("execute hook error. hookName={}", hook.hookName());
 8             }
 9         }
10     }
11 }

然後對訊息進行屬性設定


processPullResult完成後,若是設定了ConsumeMessageHook鉤子,呼叫executeHookBefore和executeHookAfter方法,分別執行鉤子中的consumeMessageBefore和consumeMessageAfter方法:

 1 public void executeHookBefore(final ConsumeMessageContext context) {
 2     if (!this.consumeMessageHookList.isEmpty()) {
 3         for (ConsumeMessageHook hook : this.consumeMessageHookList) {
 4             try {
 5                 hook.consumeMessageBefore(context);
 6             } catch (Throwable ignored) {
 7             }
 8         }
 9     }
10 }
11 
12 public void executeHookAfter(final ConsumeMessageContext context) {
13     if (!this.consumeMessageHookList.isEmpty()) {
14         for (ConsumeMessageHook hook : this.consumeMessageHookList) {
15             try {
16                 hook.consumeMessageAfter(context);
17             } catch (Throwable ignored) {
18             }
19         }
20     }
21 }

PullConsumer訊息的同步拉取到此結束

 


非同步訊息拉取

非同步拉取的API都通過pullAsyncImpl方法實現:

 1 private void pullAsyncImpl(
 2     final MessageQueue mq,
 3     final SubscriptionData subscriptionData,
 4     final long offset,
 5     final int maxNums,
 6     final PullCallback pullCallback,
 7     final boolean block,
 8     final long timeout) throws MQClientException, RemotingException, InterruptedException {
 9     this.makeSureStateOK();
10 
11     if (null == mq) {
12         throw new MQClientException("mq is null", null);
13     }
14 
15     if (offset < 0) {
16         throw new MQClientException("offset < 0", null);
17     }
18 
19     if (maxNums <= 0) {
20         throw new MQClientException("maxNums <= 0", null);
21     }
22 
23     if (null == pullCallback) {
24         throw new MQClientException("pullCallback is null", null);
25     }
26 
27     this.subscriptionAutomatically(mq.getTopic());
28 
29     try {
30         int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
31 
32         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
33 
34         boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
35         this.pullAPIWrapper.pullKernelImpl(
36             mq,
37             subscriptionData.getSubString(),
38             subscriptionData.getExpressionType(),
39             isTagType ? 0L : subscriptionData.getSubVersion(),
40             offset,
41             maxNums,
42             sysFlag,
43             0,
44             this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
45             timeoutMillis,
46             CommunicationMode.ASYNC,
47             new PullCallback() {
48 
49                 @Override
50                 public void onSuccess(PullResult pullResult) {
51                     pullCallback
52                         .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
53                 }
54 
55                 @Override
56                 public void onException(Throwable e) {
57                     pullCallback.onException(e);
58                 }
59             });
60     } catch (MQBrokerException e) {
61         throw new MQClientException("pullAsync unknow exception", e);
62     }
63 }

相比同步,引數多了個PullCallback,用於處理非同步拉取後的回撥


過程基本上個同步拉取類似,只不過在呼叫pullKernelImpl方法時,會建立一個PullCallback
在onSuccess和onException中,實際上呼叫了pullCallback的相應方法,這樣就完成了非同步的回撥

在onSuccess回撥的引數中,同同步方式類似,會通過processPullResult方法,對結果進一步加工


之後的pullKernelImpl方法和同步一樣

只不過最後呼叫了pullMessageAsync方法:

 1 private void pullMessageAsync(
 2     final String addr,
 3     final RemotingCommand request,
 4     final long timeoutMillis,
 5     final PullCallback pullCallback
 6 ) throws RemotingException, InterruptedException {
 7     this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
 8         @Override
 9         public void operationComplete(ResponseFuture responseFuture) {
10             RemotingCommand response = responseFuture.getResponseCommand();
11             if (response != null) {
12                 try {
13                     PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
14                     assert pullResult != null;
15                     pullCallback.onSuccess(pullResult);
16                 } catch (Exception e) {
17                     pullCallback.onException(e);
18                 }
19             } else {
20                 if (!responseFuture.isSendRequestOK()) {
21                     pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
22                 } else if (responseFuture.isTimeout()) {
23                     pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
24                         responseFuture.getCause()));
25                 } else {
26                     pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
27                 }
28             }
29         }
30     });
31 }

這裡實際上也是通過Netty完成非同步傳送
詳見:

【RocketMQ中Producer訊息的傳送原始碼分析】

 


由於是非同步傳送,這裡又設定了一個回撥InvokeCallback
當請求傳送完成,收到響應後,就會執行InvokeCallback的operationComplete方法,

在operationComplete方法中,和同步一樣,執行processPullResponse方法,處理響應
之後呼叫pullCallback的onSuccess方法,也就是剛才建立的回撥介面,進而執行使用者傳入的回撥介面的方法


訊息非同步拉取也就到此結束

&n