1. 程式人生 > >RocketMQ原始碼解析:Message拉取&消費(下)

RocketMQ原始碼解析:Message拉取&消費(下)

title: RocketMQ 原始碼分析 —— Message 拉取與消費(下)
date: 2017-05-11
tags:
categories: RocketMQ
permalink: RocketMQ/message-pull-and-consume-second

������關注微信公眾號:【芋艿的後端小屋】有福利:
1. RocketMQ / MyCAT / Sharding-JDBC 所有原始碼分析文章列表
2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋原始碼 GitHub 地址
3. 您對於原始碼的疑問每條留言將得到認真

回覆。甚至不知道如何讀原始碼也可以請教噢
4. 新的原始碼解析文章實時收到通知。每週更新一篇左右

1、概述

主要解析 Consumer消費 邏輯涉及到的原始碼。

2、Consumer

MQ 提供了兩類消費者:

  • PushConsumer:
    • 在大多數場景下使用。
    • 名字雖然是 Push 開頭,實際在實現時,使用 Pull 方式實現。通過 Pull 不斷不斷不斷輪詢 Broker 獲取訊息。當不存在新訊息時,Broker掛起請求,直到有新訊息產生,取消掛起,返回新訊息。這樣,基本和 Broker 主動 Push 做到接近的實時性(當然,還是有相應的實時性損失)。原理類似 。
  • PullConsumer

本文主要講解PushConsumer,部分講解PullConsumer,跳過順序消費
本文主要講解PushConsumer,部分講解PullConsumer,跳過順序消費
本文主要講解PushConsumer,部分講解PullConsumer,跳過順序消費

3、PushConsumer 一覽

先看一張 PushConsumer 包含的元件以及元件之間的互動圖:

PushConsumer手繪圖.png

  • RebalanceService:均衡訊息佇列服務,負責分配當前 Consumer 可消費的訊息佇列( MessageQueue )。當有新的 Consumer 的加入或移除,都會重新分配訊息佇列。
  • PullMessageService:拉取訊息服務,不斷不斷不斷Broker 拉取訊息,並提交消費任務到 ConsumeMessageService
  • ConsumeMessageService:消費訊息服務,不斷不斷不斷消費訊息,並處理消費結果。
  • RemoteBrokerOffsetStoreConsumer 消費進度管理,負責從 Broker 獲取消費進度,同步消費進度到 Broker
  • ProcessQueue :訊息處理佇列。
  • MQClientInstance :封裝對 NamesrvBroker 的 API呼叫,提供給 ProducerConsumer 使用。

4、PushConsumer 訂閱

DefaultMQPushConsumerImpl#subscribe(…)

  1: public void subscribe(String topic, String subExpression) throws MQClientException {
  2:     try {
  3:         // 建立訂閱資料
  4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
  5:             topic, subExpression);
  6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
  7:         // 通過心跳同步Consumer資訊到Broker
  8:         if (this.mQClientFactory != null) {
  9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
 10:         }
 11:     } catch (Exception e) {
 12:         throw new MQClientException("subscription exception", e);
 13:     }
 14: }
  • 說明 :訂閱 Topic
  • 第 7 至 10 行 :通過心跳同步 Consumer 資訊到 Broker

FilterAPI.buildSubscriptionData(…)

  1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
  2:     String subString) throws Exception {
  3:     SubscriptionData subscriptionData = new SubscriptionData();
  4:     subscriptionData.setTopic(topic);
  5:     subscriptionData.setSubString(subString);
  6:     // 處理訂閱表示式
  7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
  8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL);
  9:     } else {
 10:         String[] tags = subString.split("\\|\\|");
 11:         if (tags.length > 0) {
 12:             for (String tag : tags) {
 13:                 if (tag.length() > 0) {
 14:                     String trimString = tag.trim();
 15:                     if (trimString.length() > 0) {
 16:                         subscriptionData.getTagsSet().add(trimString);
 17:                         subscriptionData.getCodeSet().add(trimString.hashCode());
 18:                     }
 19:                 }
 20:             }
 21:         } else {
 22:             throw new Exception("subString split error");
 23:         }
 24:     }
 25: 
 26:     return subscriptionData;
 27: }
  • 說明 :根據 Topic 和 訂閱表示式 建立訂閱資料
  • subscriptionData.subVersion = System.currentTimeMillis()。

DefaultMQPushConsumer#registerMessageListener(…)

  1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
  2:     this.messageListener = messageListener;
  3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
  4: }
  • 說明 :註冊訊息監聽器。

5、PushConsumer 訊息佇列分配

RebalanceService&PushConsumer分配佇列

RebalanceService

  1: public class RebalanceService extends ServiceThread {
  2: 
  3:     /**
  4:      * 等待間隔,單位:毫秒
  5:      */
  6:     private static long waitInterval =
  7:         Long.parseLong(System.getProperty(
  8:             "rocketmq.client.rebalance.waitInterval", "20000"));
  9: 
 10:     private final Logger log = ClientLogger.getLog();
 11:     /**
 12:      * MQClient物件
 13:      */
 14:     private final MQClientInstance mqClientFactory;
 15: 
 16:     public RebalanceService(MQClientInstance mqClientFactory) {
 17:         this.mqClientFactory = mqClientFactory;
 18:     }
 19: 
 20:     @Override
 21:     public void run() {
 22:         log.info(this.getServiceName() + " service started");
 23: 
 24:         while (!this.isStopped()) {
 25:             this.waitForRunning(waitInterval);
 26:             this.mqClientFactory.doRebalance();
 27:         }
 28: 
 29:         log.info(this.getServiceName() + " service end");
 30:     }
 31: 
 32:     @Override
 33:     public String getServiceName() {
 34:         return RebalanceService.class.getSimpleName();
 35:     }
 36: }
  • 說明 :均衡訊息佇列服務,負責分配當前 Consumer 可消費的訊息佇列( MessageQueue )。
  • 第 26 行 :呼叫 MQClientInstance#doRebalance(...) 分配訊息佇列。目前有三種情況情況下觸發:

    • 第 25 行 等待超時,每 20s 呼叫一次。
    • PushConsumer 啟動時,呼叫 rebalanceService#wakeup(...) 觸發。
    • Broker 通知 Consumer 加入 或 移除時,Consumer 響應通知,呼叫 rebalanceService#wakeup(...) 觸發。

MQClientInstance#doRebalance(…)

  1: public void doRebalance() {
  2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
  3:         MQConsumerInner impl = entry.getValue();
  4:         if (impl != null) {
  5:             try {
  6:                 impl.doRebalance();
  7:             } catch (Throwable e) {
  8:                 log.error("doRebalance exception", e);
  9:             }
 10:         }
 11:     }
 12: }
  • 說明 :遍歷當前 Client 包含的 consumerTable( Consumer集合 ),執行訊息佇列分配。
  • 疑問:目前程式碼除錯下來,consumerTable 只包含 Consumer 自己。��有大大對這個疑問有解答的,煩請解答下。
  • 第 6 行 :呼叫 MQConsumerInner#doRebalance(...) 進行佇列分配。DefaultMQPushConsumerImplDefaultMQPullConsumerImpl 分別對該介面方法進行了實現。DefaultMQPushConsumerImpl#doRebalance(...) 詳細解析見:DefaultMQPushConsumerImpl#doRebalance(…)

DefaultMQPushConsumerImpl#doRebalance(…)

  1: public void doRebalance() {
  2:     if (!this.pause) {
  3:         this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
  4:     }
  5: }

RebalanceImpl#doRebalance(…)

  1: /**
  2:  * 執行分配訊息佇列
  3:  *
  4:  * @param isOrder 是否順序訊息
  5:  */
  6: public void doRebalance(final boolean isOrder) {
  7:     // 分配每個 topic 的訊息佇列
  8:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  9:     if (subTable != null) {
 10:         for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
 11:             final String topic = entry.getKey();
 12:             try {
 13:                 this.rebalanceByTopic(topic, isOrder);
 14:             } catch (Throwable e) {
 15:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 16:                     log.warn("rebalanceByTopic Exception", e);
 17:                 }
 18:             }
 19:         }
 20:     }
 21:     // 移除未訂閱的topic對應的訊息佇列
 22:     this.truncateMessageQueueNotMyTopic();
 23: }
 24: 
 25: /**
 26:  * 移除未訂閱的訊息佇列
 27:  */
 28: private void truncateMessageQueueNotMyTopic() {
 29:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
 30:     for (MessageQueue mq : this.processQueueTable.keySet()) {
 31:         if (!subTable.containsKey(mq.getTopic())) {
 32: 
 33:             ProcessQueue pq = this.processQueueTable.remove(mq);
 34:             if (pq != null) {
 35:                 pq.setDropped(true);
 36:                 log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
 37:             }
 38:         }
 39:     }
 40: }
  • #doRebalance(...) 說明 :執行分配訊息佇列。
    • 第 7 至 20 行 :迴圈訂閱主題集合( subscriptionInner ),分配每一個 Topic 的訊息佇列。
    • 第 22 行 :移除未訂閱的 Topic 的訊息佇列。
  • #truncateMessageQueueNotMyTopic(...) 說明 :移除未訂閱的訊息佇列。當呼叫 DefaultMQPushConsumer#unsubscribe(topic) 時,只移除訂閱主題集合( subscriptionInner ),對應訊息佇列移除在該方法。

RebalanceImpl#rebalanceByTopic(…)

  1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
  2:     switch (messageModel) {
  3:         case BROADCASTING: {
  4:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  5:             if (mqSet != null) {
  6:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
  7:                 if (changed) {
  8:                     this.messageQueueChanged(topic, mqSet, mqSet);
  9:                     log.info("messageQueueChanged {} {} {} {}", //
 10:                         consumerGroup, //
 11:                         topic, //
 12:                         mqSet, //
 13:                         mqSet);
 14:                 }
 15:             } else {
 16:                 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 17:             }
 18:             break;
 19:         }
 20:         case CLUSTERING: {
 21:             // 獲取 topic 對應的 佇列 和 consumer資訊
 22:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 23:             List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
 24:             if (null == mqSet) {
 25:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 26:                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 27:                 }
 28:             }
 29: 
 30:             if (null == cidAll) {
 31:                 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
 32:             }
 33: 
 34:             if (mqSet != null && cidAll != null) {
 35:                 // 排序 訊息佇列 和 消費者陣列。因為是在Client進行分配佇列,排序後,各Client的順序才能保持一致。
 36:                 List<MessageQueue> mqAll = new ArrayList<>();
 37:                 mqAll.addAll(mqSet);
 38: 
 39:                 Collections.sort(mqAll);
 40:                 Collections.sort(cidAll);
 41: 
 42:                 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
 43: 
 44:                 // 根據 佇列分配策略 分配訊息佇列
 45:                 List<MessageQueue> allocateResult;
 46:                 try {
 47:                     allocateResult = strategy.allocate(//
 48:                         this.consumerGroup, //
 49:                         this.mQClientFactory.getClientId(), //
 50:                         mqAll, //
 51:                         cidAll);
 52:                 } catch (Throwable e) {
 53:                     log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
 54:                         e);
 55:                     return;
 56:                 }
 57: 
 58:                 Set<MessageQueue> allocateResultSet = new HashSet<>();
 59:                 if (allocateResult != null) {
 60:                     allocateResultSet.addAll(allocateResult);
 61:                 }
 62: 
 63:                 // 更新訊息佇列
 64:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
 65:                 if (changed) {
 66:                     log.info(
 67:                         "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
 68:                         strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
 69:                         allocateResultSet.size(), allocateResultSet);
 70:                     this.messageQueueChanged(topic, mqSet, allocateResultSet);
 71:                 }
 72:             }
 73:             break;
 74:         }
 75:         default:
 76:             break;
 77:     }
 78: }
 79: 
 80: /**
 81:  * 當負載均衡時,更新 訊息處理佇列
 82:  * - 移除 在processQueueTable && 不存在於 mqSet 裡的訊息佇列
 83:  * - 增加 不在processQueueTable && 存在於mqSet 裡的訊息佇列
 84:  *
 85:  * @param topic Topic
 86:  * @param mqSet 負載均衡結果後的訊息佇列陣列
 87:  * @param isOrder 是否順序
 88:  * @return 是否變更
 89:  */
 90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
 91:     boolean changed = false;
 92: 
 93:     // 移除 在processQueueTable && 不存在於 mqSet 裡的訊息佇列
 94:     Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
 95:     while (it.hasNext()) { // TODO 待讀:
 96:         Entry<MessageQueue, ProcessQueue> next = it.next();
 97:         MessageQueue mq = next.getKey();
 98:         ProcessQueue pq = next.getValue();
 99: 
100:         if (mq.getTopic().equals(topic)) {
101:             if (!mqSet.contains(mq)) { // 不包含的佇列
102:                 pq.setDropped(true);
103:                 if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104:                     it.remove();
105:                     changed = true;
106:                     log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107:                 }
108:             } else if (pq.isPullExpired()) { // 佇列拉取超時,進行清理
109:                 switch (this.consumeType()) {
110:                     case CONSUME_ACTIVELY:
111:                         break;
112:                     case CONSUME_PASSIVELY:
113:                         pq.setDropped(true);
114:                         if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115:                             it.remove();
116:                             changed = true;
117:                             log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118:                                 consumerGroup, mq);
119:                         }
120:                         break;
121:                     default:
122:                         break;
123:                 }
124:             }
125:         }
126:     }
127: 
128:     // 增加 不在processQueueTable && 存在於mqSet 裡的訊息佇列。
129:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉訊息請求陣列
130:     for (MessageQueue mq : mqSet) {
131:         if (!this.processQueueTable.containsKey(mq)) {
132:             if (isOrder && !this.lock(mq)) {
133:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134:                 continue;
135:             }
136: 
137:             this.removeDirtyOffset(mq);
138:             ProcessQueue pq = new ProcessQueue();
139:             long nextOffset = this.computePullFromWhere(mq);
140:             if (nextOffset >= 0) {
141:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142:                 if (pre != null) {
143:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144:                 } else {
145:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146:                     PullRequest pullRequest = new PullRequest();
147:                     pullRequest.setConsumerGroup(consumerGroup);
148:                     pullRequest.setNextOffset(nextOffset);
149:                     pullRequest.setMessageQueue(mq);
150:                     pullRequest.setProcessQueue(pq);
151:                     pullRequestList.add(pullRequest);
152:                     changed = true;
153:                 }
154:             } else {
155:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156:             }
157:         }
158:     }
159: 
160:     // 發起訊息拉取請求
161:     this.dispatchPullRequest(pullRequestList);
162: 
163:     return changed;
164: }
  • #rebalanceByTopic(...) 說明 :分配 Topic 的訊息佇列。
    • 第 3 至 19 行 :廣播模式( BROADCASTING ) 下,分配 Topic 對應的所有訊息佇列。
    • 第 20 至 74 行 :叢集模式( CLUSTERING ) 下,分配 Topic 對應的部分訊息佇列。
      • 第 21 至 40 行 :獲取 Topic 對應的訊息佇列和消費者們,並對其進行排序。因為各 Consumer 是在本地分配訊息佇列,排序後才能保證各 Consumer 順序一致。
      • 第 42 至 61 行 :根據 佇列分配策略( AllocateMessageQueueStrategy ) 分配訊息佇列。詳細解析見:AllocateMessageQueueStrategy
      • 第 63 至 72 行 :更新 Topic 對應的訊息佇列。
  • #updateProcessQueueTableInRebalance(...) 說明 :當分配佇列時,更新 Topic 對應的訊息佇列,並返回是否有變更。
    • 第 93 至 126 行 :移除不存在於分配的訊息佇列( mqSet ) 的 訊息處理佇列( processQueueTable )。
      • 第 108 至 120 行 :佇列拉取超時,即 當前時間 - 最後一次拉取訊息時間 > 120s ( 120s 可配置),判定發生 BUG,過久未進行訊息拉取,移除訊息佇列。移除後,下面#新增佇列邏輯#可以重新加入新的該訊息佇列。
    • 第 128 至 158 行 :增加 分配的訊息佇列( mqSet ) 新增的訊息佇列。

RebalanceImpl#removeUnnecessaryMessageQueue(…)

RebalancePushImpl#removeUnnecessaryMessageQueue(…)

  1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     // 同步佇列的消費進度,並移除之。
  3:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
  4:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
  5:     // TODO 順序消費
  6:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
  7:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
  8:         try {
  9:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
 10:                 try {
 11:                     return this.unlockDelay(mq, pq);
 12:                 } finally {
 13:                     pq.getLockConsume().unlock();
 14:                 }
 15:             } else {
 16:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
 17:                     mq, //
 18:                     pq.getTryUnlockTimes());
 19: 
 20:                 pq.incTryUnlockTimes();
 21:             }
 22:         } catch (Exception e) {
 23:             log.error("removeUnnecessaryMessageQueue Exception", e);
 24:         }
 25: 
 26:         return false;
 27:     }
 28:     return true;
 29: }
  • 說明 :移除不需要的訊息佇列相關的資訊,並返回是否移除成功。
  • 第 2 至 4 行 :同步佇列的消費進度,並移除之。

[PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(…)

  1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
  3:     this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
  4:     return true;
  5: }
  • 說明 :移除不需要的訊息佇列相關的資訊,並返回移除成功。RebalancePushImpl#removeUnnecessaryMessageQueue(...)基本一致。

RebalancePushImpl#dispatchPullRequest(…)

  1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
  2:     for (PullRequest pullRequest : pullRequestList) {
  3:         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
  4:         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
  5:     }
  6: }
  • 說明 :發起訊息拉取請求。該呼叫是PushConsumer不斷不斷不斷拉取訊息的起點

DefaultMQPushConsumerImpl#executePullRequestImmediately(…)

  1: public void executePullRequestImmediately(final PullRequest pullRequest) {
  2:     this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
  3: }
  • 說明 :提交拉取請求。提交後,PullMessageService 非同步執行非阻塞。詳細解析見:PullMessageService

AllocateMessageQueueStrategy

AllocateMessageQueueStrategy類圖

AllocateMessageQueueAveragely

  1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
  2:     private final Logger log = ClientLogger.getLog();
  3: 
  4:     @Override
  5:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
  6:         List<String> cidAll) {
  7:         // 校驗引數是否正確
  8:         if (currentCID == null || currentCID.length() < 1) {
  9:             throw new IllegalArgumentException("currentCID is empty");
 10:         }
 11:         if (mqAll == null || mqAll.isEmpty()) {
 12:             throw new IllegalArgumentException("mqAll is null or mqAll empty");
 13:         }
 14:         if (cidAll == null || cidAll.isEmpty()) {
 15:             throw new IllegalArgumentException("cidAll is null or cidAll empty");
 16:         }
 17: 
 18:         List<MessageQueue> result = new ArrayList<>();
 19:         if (!cidAll.contains(currentCID)) {
 20:             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
 21:                 consumerGroup,
 22:                 currentCID,
 23:                 cidAll);
 24:             return result;
 25:         }
 26:         // 平均分配
 27:         int index = cidAll.indexOf(currentCID); // 第幾個consumer。
 28:         int mod = mqAll.size() % cidAll.size(); // 餘數,即多少訊息佇列無法平均分配。
 29:         int averageSize =
 30:             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
 31:                 + 1 : mqAll.size() / cidAll.size());
 32:         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 有餘數的情況下,[0, mod) 平分餘數,即每consumer多分配一個節點;第index開始,跳過前mod餘數。
 33:         int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配佇列數量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到訊息佇列。
 34:         for (int i = 0; i < range; i++) {
 35:             result.add(mqAll.get((startIndex + i) % mqAll.size()));
 36:         }
 37:         return result;
 38:     }
 39: 
 40:     @Override
 41:     public String getName() {
 42:         return "AVG";
 43:     }
 44: }
  • 說明 :平均分配佇列策略。
  • 第 7 至 25 行 :引數校驗。
  • 第 26 至 36 行 :平均分配訊息佇列。
    • 第 27 行 :index :當前 Consumer 在消費叢集裡是第幾個。這裡就是為什麼需要對傳入的 cidAll 引數必須進行排序的原因。如果不排序,Consumer 在本地計算出來的 index 無法一致,影響計算結果。
    • 第 28 行 :mod :餘數,即多少訊息佇列無法平均分配。
    • 第 29 至 31 行 :averageSize :程式碼可以簡化成 (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
      • [ 0, mod )mqAll.size() / cidAll.size() + 1。前面 modConsumer 平分餘數,多獲得 1 個訊息佇列。
      • [ mod, cidAll.size() )mqAll.size() / cidAll.size()
    • 第 32 行 :startIndexConsumer 分配訊息佇列開始位置。
    • 第 33 行 :range :分配佇列數量。之所以要 Math#min(...) 的原因:當 mqAll.size() <= cidAll.size() 時,最後幾個 Consumer 分配不到訊息佇列。
    • 第 34 至 36 行 :生成分配訊息佇列結果。
  • 舉個例子:

固定訊息佇列長度為4

Consumer * 2 可以整除 Consumer * 3 不可整除 Consumer * 5 無法都分配
訊息佇列[0] Consumer[0] Consumer[0]
訊息佇列[1] Consumer[0] Consumer[0]
訊息佇列[2] Consumer[1] Consumer[1]
訊息佇列[3] Consumer[1] Consumer[2]

AllocateMessageQueueByMachineRoom

  1: public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
  2:     /**
  3:      * 消費者消費brokerName集合
  4:      */
  5:     private Set<String> consumeridcs;
  6: 
  7:     @Override
  8:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
  9:         List<String> cidAll) {
 10:         // 引數校驗
 11:         List<MessageQueue> result = new ArrayList<MessageQueue>();
 12:         int currentIndex = cidAll.indexOf(currentCID);
 13:         if (currentIndex < 0) {
 14:             return result;
 15:         }
 16:         // 計算符合當前配置的消費者陣列('consumeridcs')對應的訊息佇列
 17:         List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
 18:         for (MessageQueue mq : mqAll) {
 19:             String[] temp = mq.getBrokerName().split("@");
 20:             if (temp.length == 2 && consumeridcs.contains(temp[0])) {
 21:                 premqAll.add(mq);
 22:             }
 23:         }
 24:         // 平均分配
 25:         int mod = premqAll.size() / cidAll.size();
 26:         int rem = premqAll.size() % cidAll.size();
 27:         int startIndex = mod * currentIndex;
 28:         int endIndex = startIndex + mod;
 29:         for (int i = startIndex; i < endIndex; i++) {
 30:             result.add(mqAll.get(i));
 31:         }
 32:         if (rem > currentIndex) {
 33:             result.add(premqAll.get(currentIndex + mod * cidAll.size()));
 34:         }
 35:         return result;
 36:     }
 37: 
 38:     @Override
 39:     public String getName() {
 40:         return "MACHINE_ROOM";
 41:     }
 42: 
 43:     public Set<String> getConsumeridcs() {
 44:         return consumeridcs;
 45:     }
 46: 
 47:     public void setConsumeridcs(Set<String> consumeridcs) {
 48:         this.consumeridcs = consumeridcs;
 49:     }
 50: }
  • 說明 :平均分配可消費的 Broker 對應的訊息佇列。
  • 第 7 至 15 行 :引數校驗。
  • 第 16 至 23 行 :計算可消費的 Broker 對應的訊息佇列。
  • 第 25 至 34 行 :平均分配訊息佇列。該平均分配方式和 AllocateMessageQueueAveragely 略有不同,其是將多餘的結尾部分分配給前 remConsumer
  • 疑問:使用該分配策略時,ConsumerBroker 分配需要怎麼配置。��等研究主從相關原始碼時,仔細考慮下。

AllocateMessageQueueAveragelyByCircle

Java
1: public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
2: private final Logger log = ClientLogger.getLog();
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
6: List<String> cidAll) {
7: // 校驗引數是否正確
8: if (currentCID == null || currentCID.length() < 1) {
9: throw new IllegalArgumentException("currentCID is empty");
10: }
11: if (mqAll == null || mqAll.isEmpty()) {
12: throw new IllegalArgumentException("mqAll is null or mqAll empty");
13: }
14: if (cidAll == nu