1. 程式人生 > >RocketMQ學習筆記三之【DefaultMQPushConsumer流量控制】

RocketMQ學習筆記三之【DefaultMQPushConsumer流量控制】

上一節我們已經把DefaultMQPushConsumer的大體流程分析了一下,從這節開始我們分析一部分訊息處理的細節問題。
繼續在DefaultMQPushConsumerImpl的pullMessage方法中有個ProcessQueue,待會我們來分析這個佇列的作用。

 public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
         ...
 }

在多執行緒處理訊息過程中,是通過執行緒池線上程池中各個訊息處理邏輯同時進行,程式碼如下:

public void pullMessage(final PullRequest pullRequest) {
       ...
        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
       ...
 }

 public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }

pull獲得訊息如果直接提交到執行緒池中很難監控和控制,例如訊息堆積數量、重複處理某些訊息,延遲處理訊息。rocketmq是通過一個快照類ProcessQueue,在pushConsumer執行的時候,每個Message Queue都會對應一個ProcessQueue物件,儲存該Message Queue訊息處理狀態的快照。

ProcessQueue是通過TreeMap和讀寫鎖實現。TreeMap裡以Message Queue的offset作為key,以訊息內容引用為value,儲存了所有從MessageQueue獲取到但是還沒有被處理的訊息。

下面我來看看程式碼中邏輯控制:

public void pullMessage(final PullRequest pullRequest) {
       ...
           // 如果processQueue訊息數量>佇列級別的流量控制閾值,預設情況下,每個訊息佇列最多快取1000條訊息
         if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        // 如果佇列快取訊息的大小超過100M(預設),考慮到批量訊息瞬時可能超過100M
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        if (!this.consumeOrderly) {
            // 判斷是否大於最大訊息便宜跨度(預設2000)
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        }
       ...
 }

通過上面原始碼可以看出PushConsumer會判斷獲取但未處理的訊息個數,訊息總大小、offset跨度,任何一個超過限制就隔一段時間再拉取訊息,從而達到控制流量的目的。