RocketMQ原始碼解析:Message拉取&消費(下)
title: RocketMQ 原始碼分析 —— Message 拉取與消費(下)
date: 2017-05-11
tags:
categories: RocketMQ
permalink: RocketMQ/message-pull-and-consume-second
������關注微信公眾號:【芋艿的後端小屋】有福利:
1. RocketMQ / MyCAT / Sharding-JDBC 所有原始碼分析文章列表
2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋原始碼 GitHub 地址
3. 您對於原始碼的疑問每條留言都將得到認真回覆。甚至不知道如何讀原始碼也可以請教噢。
4. 新的原始碼解析文章實時收到通知。每週更新一篇左右。
1、概述
主要解析 Consumer
在 消費 邏輯涉及到的原始碼。
2、Consumer
MQ 提供了兩類消費者:
- PushConsumer:
- 在大多數場景下使用。
- 名字雖然是
Push
開頭,實際在實現時,使用Pull
方式實現。通過Pull
不斷不斷不斷輪詢Broker
獲取訊息。當不存在新訊息時,Broker
會掛起請求,直到有新訊息產生,取消掛起,返回新訊息。這樣,基本和Broker
主動Push
做到接近的實時性(當然,還是有相應的實時性損失)。原理類似 。
- PullConsumer
本文主要講解PushConsumer
,部分講解PullConsumer
,跳過順序消費
。
本文主要講解PushConsumer
,部分講解PullConsumer
,跳過順序消費
。
本文主要講解PushConsumer
,部分講解PullConsumer
,跳過順序消費
。
3、PushConsumer 一覽
先看一張 PushConsumer
包含的元件以及元件之間的互動圖:
RebalanceService
:均衡訊息佇列服務,負責分配當前Consumer
可消費的訊息佇列(MessageQueue
)。當有新的Consumer
的加入或移除,都會重新分配訊息佇列。PullMessageService
:拉取訊息服務,不斷不斷不斷從Broker
拉取訊息,並提交消費任務到ConsumeMessageService
。ConsumeMessageService
:消費訊息服務,不斷不斷不斷消費訊息,並處理消費結果。RemoteBrokerOffsetStore
:Consumer
消費進度管理,負責從Broker
獲取消費進度,同步消費進度到Broker
。ProcessQueue
:訊息處理佇列。MQClientInstance
:封裝對Namesrv
,Broker
的 API呼叫,提供給Producer
、Consumer
使用。
4、PushConsumer 訂閱
DefaultMQPushConsumerImpl#subscribe(…)
1: public void subscribe(String topic, String subExpression) throws MQClientException {
2: try {
3: // 建立訂閱資料
4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
5: topic, subExpression);
6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
7: // 通過心跳同步Consumer資訊到Broker
8: if (this.mQClientFactory != null) {
9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
10: }
11: } catch (Exception e) {
12: throw new MQClientException("subscription exception", e);
13: }
14: }
- 說明 :訂閱
Topic
。 - 第 7 至 10 行 :通過心跳同步
Consumer
資訊到Broker
。
FilterAPI.buildSubscriptionData(…)
1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
2: String subString) throws Exception {
3: SubscriptionData subscriptionData = new SubscriptionData();
4: subscriptionData.setTopic(topic);
5: subscriptionData.setSubString(subString);
6: // 處理訂閱表示式
7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
8: subscriptionData.setSubString(SubscriptionData.SUB_ALL);
9: } else {
10: String[] tags = subString.split("\\|\\|");
11: if (tags.length > 0) {
12: for (String tag : tags) {
13: if (tag.length() > 0) {
14: String trimString = tag.trim();
15: if (trimString.length() > 0) {
16: subscriptionData.getTagsSet().add(trimString);
17: subscriptionData.getCodeSet().add(trimString.hashCode());
18: }
19: }
20: }
21: } else {
22: throw new Exception("subString split error");
23: }
24: }
25:
26: return subscriptionData;
27: }
- 說明 :根據
Topic
和 訂閱表示式 建立訂閱資料 - subscriptionData.subVersion = System.currentTimeMillis()。
DefaultMQPushConsumer#registerMessageListener(…)
1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
2: this.messageListener = messageListener;
3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
4: }
- 說明 :註冊訊息監聽器。
5、PushConsumer 訊息佇列分配
RebalanceService
1: public class RebalanceService extends ServiceThread {
2:
3: /**
4: * 等待間隔,單位:毫秒
5: */
6: private static long waitInterval =
7: Long.parseLong(System.getProperty(
8: "rocketmq.client.rebalance.waitInterval", "20000"));
9:
10: private final Logger log = ClientLogger.getLog();
11: /**
12: * MQClient物件
13: */
14: private final MQClientInstance mqClientFactory;
15:
16: public RebalanceService(MQClientInstance mqClientFactory) {
17: this.mqClientFactory = mqClientFactory;
18: }
19:
20: @Override
21: public void run() {
22: log.info(this.getServiceName() + " service started");
23:
24: while (!this.isStopped()) {
25: this.waitForRunning(waitInterval);
26: this.mqClientFactory.doRebalance();
27: }
28:
29: log.info(this.getServiceName() + " service end");
30: }
31:
32: @Override
33: public String getServiceName() {
34: return RebalanceService.class.getSimpleName();
35: }
36: }
- 說明 :均衡訊息佇列服務,負責分配當前
Consumer
可消費的訊息佇列(MessageQueue
)。 第 26 行 :呼叫
MQClientInstance#doRebalance(...)
分配訊息佇列。目前有三種情況情況下觸發:- 如
第 25 行
等待超時,每 20s 呼叫一次。 PushConsumer
啟動時,呼叫rebalanceService#wakeup(...)
觸發。Broker
通知Consumer
加入 或 移除時,Consumer
響應通知,呼叫rebalanceService#wakeup(...)
觸發。
- 如
MQClientInstance#doRebalance(…)
1: public void doRebalance() {
2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
3: MQConsumerInner impl = entry.getValue();
4: if (impl != null) {
5: try {
6: impl.doRebalance();
7: } catch (Throwable e) {
8: log.error("doRebalance exception", e);
9: }
10: }
11: }
12: }
- 說明 :遍歷當前
Client
包含的consumerTable
(Consumer
集合 ),執行訊息佇列分配。 - 疑問:目前程式碼除錯下來,
consumerTable
只包含Consumer
自己。��有大大對這個疑問有解答的,煩請解答下。 - 第 6 行 :呼叫
MQConsumerInner#doRebalance(...)
進行佇列分配。DefaultMQPushConsumerImpl
、DefaultMQPullConsumerImpl
分別對該介面方法進行了實現。DefaultMQPushConsumerImpl#doRebalance(...)
詳細解析見:DefaultMQPushConsumerImpl#doRebalance(…)。
DefaultMQPushConsumerImpl#doRebalance(…)
1: public void doRebalance() {
2: if (!this.pause) {
3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
4: }
5: }
- 說明:執行訊息佇列分配。
- 第 3 行 :呼叫
RebalanceImpl#doRebalance(...)
進行佇列分配。詳細解析見:RebalancePushImpl#doRebalance(…)。
RebalanceImpl#doRebalance(…)
1: /**
2: * 執行分配訊息佇列
3: *
4: * @param isOrder 是否順序訊息
5: */
6: public void doRebalance(final boolean isOrder) {
7: // 分配每個 topic 的訊息佇列
8: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
9: if (subTable != null) {
10: for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
11: final String topic = entry.getKey();
12: try {
13: this.rebalanceByTopic(topic, isOrder);
14: } catch (Throwable e) {
15: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
16: log.warn("rebalanceByTopic Exception", e);
17: }
18: }
19: }
20: }
21: // 移除未訂閱的topic對應的訊息佇列
22: this.truncateMessageQueueNotMyTopic();
23: }
24:
25: /**
26: * 移除未訂閱的訊息佇列
27: */
28: private void truncateMessageQueueNotMyTopic() {
29: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
30: for (MessageQueue mq : this.processQueueTable.keySet()) {
31: if (!subTable.containsKey(mq.getTopic())) {
32:
33: ProcessQueue pq = this.processQueueTable.remove(mq);
34: if (pq != null) {
35: pq.setDropped(true);
36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
37: }
38: }
39: }
40: }
#doRebalance(...)
說明 :執行分配訊息佇列。
- 第 7 至 20 行 :迴圈訂閱主題集合(
subscriptionInner
),分配每一個Topic
的訊息佇列。 - 第 22 行 :移除未訂閱的
Topic
的訊息佇列。
- 第 7 至 20 行 :迴圈訂閱主題集合(
#truncateMessageQueueNotMyTopic(...)
說明 :移除未訂閱的訊息佇列。當呼叫DefaultMQPushConsumer#unsubscribe(topic)
時,只移除訂閱主題集合(subscriptionInner
),對應訊息佇列移除在該方法。
RebalanceImpl#rebalanceByTopic(…)
1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
2: switch (messageModel) {
3: case BROADCASTING: {
4: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
5: if (mqSet != null) {
6: boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
7: if (changed) {
8: this.messageQueueChanged(topic, mqSet, mqSet);
9: log.info("messageQueueChanged {} {} {} {}", //
10: consumerGroup, //
11: topic, //
12: mqSet, //
13: mqSet);
14: }
15: } else {
16: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
17: }
18: break;
19: }
20: case CLUSTERING: {
21: // 獲取 topic 對應的 佇列 和 consumer資訊
22: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
23: List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
24: if (null == mqSet) {
25: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
26: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
27: }
28: }
29:
30: if (null == cidAll) {
31: log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
32: }
33:
34: if (mqSet != null && cidAll != null) {
35: // 排序 訊息佇列 和 消費者陣列。因為是在Client進行分配佇列,排序後,各Client的順序才能保持一致。
36: List<MessageQueue> mqAll = new ArrayList<>();
37: mqAll.addAll(mqSet);
38:
39: Collections.sort(mqAll);
40: Collections.sort(cidAll);
41:
42: AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
43:
44: // 根據 佇列分配策略 分配訊息佇列
45: List<MessageQueue> allocateResult;
46: try {
47: allocateResult = strategy.allocate(//
48: this.consumerGroup, //
49: this.mQClientFactory.getClientId(), //
50: mqAll, //
51: cidAll);
52: } catch (Throwable e) {
53: log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
54: e);
55: return;
56: }
57:
58: Set<MessageQueue> allocateResultSet = new HashSet<>();
59: if (allocateResult != null) {
60: allocateResultSet.addAll(allocateResult);
61: }
62:
63: // 更新訊息佇列
64: boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
65: if (changed) {
66: log.info(
67: "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
68: strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
69: allocateResultSet.size(), allocateResultSet);
70: this.messageQueueChanged(topic, mqSet, allocateResultSet);
71: }
72: }
73: break;
74: }
75: default:
76: break;
77: }
78: }
79:
80: /**
81: * 當負載均衡時,更新 訊息處理佇列
82: * - 移除 在processQueueTable && 不存在於 mqSet 裡的訊息佇列
83: * - 增加 不在processQueueTable && 存在於mqSet 裡的訊息佇列
84: *
85: * @param topic Topic
86: * @param mqSet 負載均衡結果後的訊息佇列陣列
87: * @param isOrder 是否順序
88: * @return 是否變更
89: */
90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
91: boolean changed = false;
92:
93: // 移除 在processQueueTable && 不存在於 mqSet 裡的訊息佇列
94: Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
95: while (it.hasNext()) { // TODO 待讀:
96: Entry<MessageQueue, ProcessQueue> next = it.next();
97: MessageQueue mq = next.getKey();
98: ProcessQueue pq = next.getValue();
99:
100: if (mq.getTopic().equals(topic)) {
101: if (!mqSet.contains(mq)) { // 不包含的佇列
102: pq.setDropped(true);
103: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104: it.remove();
105: changed = true;
106: log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107: }
108: } else if (pq.isPullExpired()) { // 佇列拉取超時,進行清理
109: switch (this.consumeType()) {
110: case CONSUME_ACTIVELY:
111: break;
112: case CONSUME_PASSIVELY:
113: pq.setDropped(true);
114: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115: it.remove();
116: changed = true;
117: log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118: consumerGroup, mq);
119: }
120: break;
121: default:
122: break;
123: }
124: }
125: }
126: }
127:
128: // 增加 不在processQueueTable && 存在於mqSet 裡的訊息佇列。
129: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉訊息請求陣列
130: for (MessageQueue mq : mqSet) {
131: if (!this.processQueueTable.containsKey(mq)) {
132: if (isOrder && !this.lock(mq)) {
133: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134: continue;
135: }
136:
137: this.removeDirtyOffset(mq);
138: ProcessQueue pq = new ProcessQueue();
139: long nextOffset = this.computePullFromWhere(mq);
140: if (nextOffset >= 0) {
141: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142: if (pre != null) {
143: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144: } else {
145: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146: PullRequest pullRequest = new PullRequest();
147: pullRequest.setConsumerGroup(consumerGroup);
148: pullRequest.setNextOffset(nextOffset);
149: pullRequest.setMessageQueue(mq);
150: pullRequest.setProcessQueue(pq);
151: pullRequestList.add(pullRequest);
152: changed = true;
153: }
154: } else {
155: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156: }
157: }
158: }
159:
160: // 發起訊息拉取請求
161: this.dispatchPullRequest(pullRequestList);
162:
163: return changed;
164: }
#rebalanceByTopic(...)
說明 :分配Topic
的訊息佇列。
- 第 3 至 19 行 :廣播模式(
BROADCASTING
) 下,分配Topic
對應的所有訊息佇列。 - 第 20 至 74 行 :叢集模式(
CLUSTERING
) 下,分配Topic
對應的部分訊息佇列。
- 第 21 至 40 行 :獲取
Topic
對應的訊息佇列和消費者們,並對其進行排序。因為各Consumer
是在本地分配訊息佇列,排序後才能保證各Consumer
順序一致。 - 第 42 至 61 行 :根據 佇列分配策略(
AllocateMessageQueueStrategy
) 分配訊息佇列。詳細解析見:AllocateMessageQueueStrategy。 - 第 63 至 72 行 :更新
Topic
對應的訊息佇列。
- 第 21 至 40 行 :獲取
- 第 3 至 19 行 :廣播模式(
#updateProcessQueueTableInRebalance(...)
說明 :當分配佇列時,更新Topic
對應的訊息佇列,並返回是否有變更。
- 第 93 至 126 行 :移除不存在於分配的訊息佇列(
mqSet
) 的 訊息處理佇列(processQueueTable
)。
- 第 108 至 120 行 :佇列拉取超時,即
當前時間 - 最後一次拉取訊息時間 > 120s
( 120s 可配置),判定發生 BUG,過久未進行訊息拉取,移除訊息佇列。移除後,下面#新增佇列邏輯#可以重新加入新的該訊息佇列。
- 第 108 至 120 行 :佇列拉取超時,即
- 第 128 至 158 行 :增加 分配的訊息佇列(
mqSet
) 新增的訊息佇列。
- 第 93 至 126 行 :移除不存在於分配的訊息佇列(
RebalanceImpl#removeUnnecessaryMessageQueue(…)
RebalancePushImpl#removeUnnecessaryMessageQueue(…)
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: // 同步佇列的消費進度,並移除之。
3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
5: // TODO 順序消費
6: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
7: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
8: try {
9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
10: try {
11: return this.unlockDelay(mq, pq);
12: } finally {
13: pq.getLockConsume().unlock();
14: }
15: } else {
16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
17: mq, //
18: pq.getTryUnlockTimes());
19:
20: pq.incTryUnlockTimes();
21: }
22: } catch (Exception e) {
23: log.error("removeUnnecessaryMessageQueue Exception", e);
24: }
25:
26: return false;
27: }
28: return true;
29: }
- 說明 :移除不需要的訊息佇列相關的資訊,並返回是否移除成功。
- 第 2 至 4 行 :同步佇列的消費進度,並移除之。
[PullConsumer]
RebalancePullImpl#removeUnnecessaryMessageQueue(…)
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
4: return true;
5: }
- 說明 :移除不需要的訊息佇列相關的資訊,並返回移除成功。和
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
基本一致。
RebalancePushImpl#dispatchPullRequest(…)
1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
2: for (PullRequest pullRequest : pullRequestList) {
3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
5: }
6: }
- 說明 :發起訊息拉取請求。該呼叫是
PushConsumer
不斷不斷不斷拉取訊息的起點。
DefaultMQPushConsumerImpl#executePullRequestImmediately(…)
1: public void executePullRequestImmediately(final PullRequest pullRequest) {
2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
3: }
- 說明 :提交拉取請求。提交後,
PullMessageService
非同步執行,非阻塞。詳細解析見:PullMessageService。
AllocateMessageQueueStrategy
AllocateMessageQueueAveragely
1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
2: private final Logger log = ClientLogger.getLog();
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
6: List<String> cidAll) {
7: // 校驗引數是否正確
8: if (currentCID == null || currentCID.length() < 1) {
9: throw new IllegalArgumentException("currentCID is empty");
10: }
11: if (mqAll == null || mqAll.isEmpty()) {
12: throw new IllegalArgumentException("mqAll is null or mqAll empty");
13: }
14: if (cidAll == null || cidAll.isEmpty()) {
15: throw new IllegalArgumentException("cidAll is null or cidAll empty");
16: }
17:
18: List<MessageQueue> result = new ArrayList<>();
19: if (!cidAll.contains(currentCID)) {
20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
21: consumerGroup,
22: currentCID,
23: cidAll);
24: return result;
25: }
26: // 平均分配
27: int index = cidAll.indexOf(currentCID); // 第幾個consumer。
28: int mod = mqAll.size() % cidAll.size(); // 餘數,即多少訊息佇列無法平均分配。
29: int averageSize =
30: mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
31: + 1 : mqAll.size() / cidAll.size());
32: int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 有餘數的情況下,[0, mod) 平分餘數,即每consumer多分配一個節點;第index開始,跳過前mod餘數。
33: int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配佇列數量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到訊息佇列。
34: for (int i = 0; i < range; i++) {
35: result.add(mqAll.get((startIndex + i) % mqAll.size()));
36: }
37: return result;
38: }
39:
40: @Override
41: public String getName() {
42: return "AVG";
43: }
44: }
- 說明 :平均分配佇列策略。
- 第 7 至 25 行 :引數校驗。
- 第 26 至 36 行 :平均分配訊息佇列。
- 第 27 行 :
index
:當前Consumer
在消費叢集裡是第幾個。這裡就是為什麼需要對傳入的cidAll
引數必須進行排序的原因。如果不排序,Consumer
在本地計算出來的index
無法一致,影響計算結果。 - 第 28 行 :
mod
:餘數,即多少訊息佇列無法平均分配。 - 第 29 至 31 行 :
averageSize
:程式碼可以簡化成(mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
。
[ 0, mod )
:mqAll.size() / cidAll.size() + 1
。前面mod
個Consumer
平分餘數,多獲得 1 個訊息佇列。[ mod, cidAll.size() )
:mqAll.size() / cidAll.size()
。
- 第 32 行 :
startIndex
:Consumer
分配訊息佇列開始位置。 - 第 33 行 :
range
:分配佇列數量。之所以要Math#min(...)
的原因:當mqAll.size() <= cidAll.size()
時,最後幾個Consumer
分配不到訊息佇列。 - 第 34 至 36 行 :生成分配訊息佇列結果。
- 第 27 行 :
- 舉個例子:
固定訊息佇列長度為4。
Consumer * 2 可以整除 | Consumer * 3 不可整除 | Consumer * 5 無法都分配 |
---|---|---|
訊息佇列[0] | Consumer[0] | Consumer[0] |
訊息佇列[1] | Consumer[0] | Consumer[0] |
訊息佇列[2] | Consumer[1] | Consumer[1] |
訊息佇列[3] | Consumer[1] | Consumer[2] |
AllocateMessageQueueByMachineRoom
1: public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
2: /**
3: * 消費者消費brokerName集合
4: */
5: private Set<String> consumeridcs;
6:
7: @Override
8: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
9: List<String> cidAll) {
10: // 引數校驗
11: List<MessageQueue> result = new ArrayList<MessageQueue>();
12: int currentIndex = cidAll.indexOf(currentCID);
13: if (currentIndex < 0) {
14: return result;
15: }
16: // 計算符合當前配置的消費者陣列('consumeridcs')對應的訊息佇列
17: List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
18: for (MessageQueue mq : mqAll) {
19: String[] temp = mq.getBrokerName().split("@");
20: if (temp.length == 2 && consumeridcs.contains(temp[0])) {
21: premqAll.add(mq);
22: }
23: }
24: // 平均分配
25: int mod = premqAll.size() / cidAll.size();
26: int rem = premqAll.size() % cidAll.size();
27: int startIndex = mod * currentIndex;
28: int endIndex = startIndex + mod;
29: for (int i = startIndex; i < endIndex; i++) {
30: result.add(mqAll.get(i));
31: }
32: if (rem > currentIndex) {
33: result.add(premqAll.get(currentIndex + mod * cidAll.size()));
34: }
35: return result;
36: }
37:
38: @Override
39: public String getName() {
40: return "MACHINE_ROOM";
41: }
42:
43: public Set<String> getConsumeridcs() {
44: return consumeridcs;
45: }
46:
47: public void setConsumeridcs(Set<String> consumeridcs) {
48: this.consumeridcs = consumeridcs;
49: }
50: }
- 說明 :平均分配可消費的
Broker
對應的訊息佇列。 - 第 7 至 15 行 :引數校驗。
- 第 16 至 23 行 :計算可消費的
Broker
對應的訊息佇列。 - 第 25 至 34 行 :平均分配訊息佇列。該平均分配方式和
AllocateMessageQueueAveragely
略有不同,其是將多餘的結尾部分分配給前rem
個Consumer
。 - 疑問:使用該分配策略時,
Consumer
和Broker
分配需要怎麼配置。��等研究主從相關原始碼時,仔細考慮下。
AllocateMessageQueueAveragelyByCircle
Java
1: public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
2: private final Logger log = ClientLogger.getLog();
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
6: List<String> cidAll) {
7: // 校驗引數是否正確
8: if (currentCID == null || currentCID.length() < 1) {
9: throw new IllegalArgumentException("currentCID is empty");
10: }
11: if (mqAll == null || mqAll.isEmpty()) {
12: throw new IllegalArgumentException("mqAll is null or mqAll empty");
13: }
14: if (cidAll == nu