RocketMQ學習筆記三之【DefaultMQPushConsumer流量控制】
阿新 • • 發佈:2018-12-25
上一節我們已經把DefaultMQPushConsumer的大體流程分析了一下,從這節開始我們分析一部分訊息處理的細節問題。
繼續在DefaultMQPushConsumerImpl的pullMessage方法中有個ProcessQueue,待會我們來分析這個佇列的作用。
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
...
}
在多執行緒處理訊息過程中,是通過執行緒池線上程池中各個訊息處理邏輯同時進行,程式碼如下:
public void pullMessage(final PullRequest pullRequest) { ... DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); ... } public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); }
pull獲得訊息如果直接提交到執行緒池中很難監控和控制,例如訊息堆積數量、重複處理某些訊息,延遲處理訊息。rocketmq是通過一個快照類ProcessQueue,在pushConsumer執行的時候,每個Message Queue都會對應一個ProcessQueue物件,儲存該Message Queue訊息處理狀態的快照。
ProcessQueue是通過TreeMap和讀寫鎖實現。TreeMap裡以Message Queue的offset作為key,以訊息內容引用為value,儲存了所有從MessageQueue獲取到但是還沒有被處理的訊息。
下面我來看看程式碼中邏輯控制:
public void pullMessage(final PullRequest pullRequest) { ... // 如果processQueue訊息數量>佇列級別的流量控制閾值,預設情況下,每個訊息佇列最多快取1000條訊息 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } // 如果佇列快取訊息的大小超過100M(預設),考慮到批量訊息瞬時可能超過100M if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } if (!this.consumeOrderly) { // 判斷是否大於最大訊息便宜跨度(預設2000) if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } } ... }
通過上面原始碼可以看出PushConsumer會判斷獲取但未處理的訊息個數,訊息總大小、offset跨度,任何一個超過限制就隔一段時間再拉取訊息,從而達到控制流量的目的。