1. 程式人生 > >RocketMQ專題2:三種常用生產消費方式(順序、廣播、定時)以及順序消費原始碼探究

RocketMQ專題2:三種常用生產消費方式(順序、廣播、定時)以及順序消費原始碼探究

順序、廣播、定時任務

前插

​ 在進行常用的三種訊息型別例子展示的時候,我們先來說一說RocketMQ的幾個重要概念:

  • PullConsumer與PushConsumer:主要區別在於Pull與Push的區別。對於PullConsumer,消費者會主動從broker中拉取訊息進行消費。而對於PushConsumer,會封裝包含訊息獲取、訊息處理以及其他相關操作的介面給程式呼叫
  • Tag: Tag可以看做是一個子主題(sub-topic),可以進一步細化主題下的相關子業務。提高程式的靈活性和可擴充套件性
  • Broker:RocketMQ的核心元件之一。用來從生產者處接收訊息,儲存訊息以及將訊息推送給消費者。同時RocketMQ的broker也用來儲存訊息相關的資料,比如消費者組、消費處理的偏移量、主題以及訊息佇列等
  • Name Server: 可以看做是一個資訊路由器。生產者和消費者從NameServer中查詢對應的主題以及相應的broker

例項

​ 這裡我們不玩虛的,直接將三個型別的生產者,消費者程式碼例項給出(在官網給出的例子上做了些許改動和註釋說明):

生產者程式碼

/**
 * 多種型別組合訊息測試
 * @author ziyuqi
 *
 */
public class MultiTypeProducer {
    public static void main(String[] args) throws Exception {
        // 順序訊息生產者  FIFO
        OrderedProducer orderedProducer = new OrderedProducer();
        orderedProducer.produce();
        
        // 廣播訊息生產者
        /*BroadcastProducer broadcastProducer = new BroadcastProducer();
        broadcastProducer.produce();*/
        
        // 定時任務訊息生產者
        /*ScheduledProducer scheduledProducer = new ScheduledProducer();
        scheduledProducer.produce();*/
    }
}

/**
 * 按順序傳送訊息的生產者 
 * @author ziyuqi
 *
 */
class OrderedProducer {
    public void produce() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("GroupD");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        String[] tags = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"};
        for (int i=0; i<50; i++) {
            Message message = new Message("OrderedTopic", tags[i % tags.length], "KEY" + i, ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                /**
                 * 所謂的順序,只能保證同一MessageQueue放入的訊息滿足FIFO。該方法返回應該將訊息放入那個MessageQueue,最後一個引數為send傳入的最後一個引數
                 * 如果需要全域性保持FIFO,則所有訊息應該依次放入同一佇列中去mqs佇列中的同一下標
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 訊息被分開放入多個佇列,每個佇列中的訊息保證按順序被消費FIFO
                    /*int index = (Integer) arg % mqs.size();
                    System.out.println("QueueSize:" + mqs.size());
                    return mqs.get(index);*/
                    
                    // 訊息全部放入同一佇列,全域性保持順序性 
                    return mqs.get(0);
                }
            }, i);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}

/**
 * 廣播生產者
 * @author ziyuqi
 *
 */
class BroadcastProducer {
    public void produce() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("GroupA");
        // 也必須設定nameServer
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i=0; i<50; i++) {
            Message message = new Message("BroadcastTopic", "tagA", "OrderID188", ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}

/**
 * 定時訊息傳送者
 * @author ziyuqi
 *
 */
class ScheduledProducer {
    public void produce() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("GroupA");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i=0; i<50; i++) {
            Message message = new Message("scheduledTopic", ("Message:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 設定投遞的延遲時間
            message.setDelayTimeLevel(3);
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}

消費者程式碼

public class MultiTypeConsumer {
    public static void main(String[] args) throws Exception {
        // 按順序消費者
        OrderedConsumer orderedConsumer = new OrderedConsumer();
        orderedConsumer.consume();
        
        // 廣播消費者
        /*BroadcastConsumer broadcastConsumer = new BroadcastConsumer();
        broadcastConsumer.consume();*/
        
        // 定時任務消費者
        /*ScheduledConsumer scheduledConsumer = new ScheduledConsumer();
        scheduledConsumer.consume();*/
    }
}


/**
 * 按順序的消費者
 * @author ziyuqi
 *
 */
class OrderedConsumer {
    public void consume() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupD");
        /*
         *  設定從哪裡開始消費 :
         *  當設定為: ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET 
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);       
        consumer.setNamesrvAddr("localhost:9876");
        // 設定定於的主題和tag(必須顯示指定tag)
        consumer.subscribe("OrderedTopic", "tagA || tagB || tagC || tagD || tagE");                     
        
        consumer.setMessageListener(new MessageListenerOrderly() {
            AtomicLong num = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                /**
                 *  設定是否自動提交: 預設自動提交,提交之後訊息就不能夠被再次消費。
                 *  非自動提交時,訊息可能會被重複消費
                 */
                context.setAutoCommit(false);
                this.num.incrementAndGet();
                try {
                    for (MessageExt msg : msgs) {
                        System.out.println("Received:num=" + this.num.get() +", queueId=" + msg.getQueueId() +  ", Keys=" + msg.getKeys() + ", value=" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                /*try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }*/
                if (this.num.get() % 3 == 0) {
                    // return ConsumeOrderlyStatus.ROLLBACK;
                } else if (this.num.get() % 4 == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if (this.num.get() % 5 == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            
                // 非主動提交的時候,SUCCESS不會導致佇列訊息提交,訊息未提交就可以被迴圈消費
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}


class BroadcastConsumer {
    public void consume() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 即使是廣播形式下,nameServer還是要設定
        consumer.setNamesrvAddr("localhost:9876");
        // 設定消費的訊息型別為廣播類訊息
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("BroadcastTopic", "tagA || tagB || tagC");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        System.out.println("Received:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

/**
 * 定時任務消費者
 * @author ziyuqi
 *
 */
class ScheduledConsumer {
    public void consume() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
        consumer.setNamesrvAddr("localhost:9876");  
        consumer.subscribe("scheduledTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        System.out.println("Received:[" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET) + "]" + (System.currentTimeMillis() - msg.getStoreTimestamp()) + " ms later!");
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

原始碼與例項分析

​ 結合我上面的測試程式碼,以及我在測試中主要針對順序消費的疑惑和原始碼除錯。我這裡簡單分析下順序消費者的相關執行過程,大致的執行步驟如下:

消費者啟動

​ 我們知道每次consumer建立之後,都會呼叫consumer.start()方法來啟動消費者。跟進程式碼巢狀,不難發現最終會進入DefaultMQPushConsumerImplstart方法中,該方法的主要程式碼如下:

 public synchronized void start() throws MQClientException {
     switch (this.serviceState) {
             // 消費者啟動狀態滿足Create_just
         case CREATE_JUST:
             log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                      this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
             this.serviceState = ServiceState.START_FAILED;
             // 配置檢查
             this.checkConfig();

             this.copySubscription();

             if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                 this.defaultMQPushConsumer.changeInstanceNameToPID();
             }

             this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

             this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
             this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
             this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
             this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

             this.pullAPIWrapper = new PullAPIWrapper(
                 mQClientFactory,
                 this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
             this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

             if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                 this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
             } else {
                 switch (this.defaultMQPushConsumer.getMessageModel()) {
                     case BROADCASTING:
                         this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                         break;
                     case CLUSTERING:
                         this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                         break;
                     default:
                         break;
                 }
                 this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
             }
             this.offsetStore.load();

             if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                 this.consumeOrderly = true;
                 this.consumeMessageService =
                     new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
             } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                 this.consumeOrderly = false;
                 this.consumeMessageService =
                     new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
             }

             this.consumeMessageService.start();

             boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
             if (!registerOK) {
                 this.serviceState = ServiceState.CREATE_JUST;
                 this.consumeMessageService.shutdown();
                 throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                                             + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                             null);
             }
             // 主要方法在這,啟動MQ客戶端工廠,進行訊息拉取
             mQClientFactory.start();
             log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
             this.serviceState = ServiceState.RUNNING;
             break;
         case RUNNING:
         case START_FAILED:
         case SHUTDOWN_ALREADY:
             throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                                         + this.serviceState
                                         + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                         null);
         default:
             break;
     }

     this.updateTopicSubscribeInfoWhenSubscriptionChanged();
     this.mQClientFactory.checkClientInBroker();
     this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
     this.mQClientFactory.rebalanceImmediately();
 }

MQClient啟動

​ 上一段原始碼我們發現最終呼叫了mQClientFactory.start();.我們繼續跟進該方法,發現實際呼叫的是MQClientInstance.start()

 public void start() throws MQClientException {

     synchronized (this) {
         switch (this.serviceState) {
             case CREATE_JUST:
                 this.serviceState = ServiceState.START_FAILED;
                 // If not specified,looking address from name server
                 if (null == this.clientConfig.getNamesrvAddr()) {
                     this.mQClientAPIImpl.fetchNameServerAddr();
                 }
                 // Start request-response channel
                 this.mQClientAPIImpl.start();
                 // Start various schedule tasks
                 this.startScheduledTask();
                 // Start pull service 關鍵點在這呼叫了pullMessageService的start方法
                 this.pullMessageService.start();
                 // Start rebalance service
                 this.rebalanceService.start();
                 // Start push service
                 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                 log.info("the client factory [{}] start OK", this.clientId);
                 this.serviceState = ServiceState.RUNNING;
                 break;
             case RUNNING:
                 break;
             case SHUTDOWN_ALREADY:
                 break;
             case START_FAILED:
                 throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
             default:
                 break;
         }
     }
 }

訊息拉取

​ 根據上一段程式碼的註釋,我們進入到核心的訊息推送程式碼PullMessageServicestart方法(實際上PullMessage繼承自Thread類,呼叫的是run方法):

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);  // 重點轉移到該方法具體推送實現
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer =         this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);      // 呼叫預設的拉訊息消費者實現
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

​ 我們繼續跟進DefaultMQPushConsumerImplpullMessage方法:

 public void pullMessage(final PullRequest pullRequest) {
     // ... 省略

     final long beginTimestamp = System.currentTimeMillis();
     // 該回調函式實際是對訊息消費的具體處理
     PullCallback pullCallback = new PullCallback() {
         @Override
         public void onSuccess(PullResult pullResult) {
             if (pullResult != null) {
                 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                                                                                              subscriptionData);

                 switch (pullResult.getPullStatus()) {
                     case FOUND:
                         long prevRequestOffset = pullRequest.getNextOffset();
                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                         long pullRT = System.currentTimeMillis() - beginTimestamp;
                         DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                                                                            pullRequest.getMessageQueue().getTopic(), pullRT);

                         long firstMsgOffset = Long.MAX_VALUE;
                         if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                             DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                         } else {
                             firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                             DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                                                                                 pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                             boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                             // 向執行緒池丟入消費請求任務
                             DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                 pullResult.getMsgFoundList(),
                                 processQueue,
                                 pullRequest.getMessageQueue(),
                                 dispatchToConsume);

                             if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                 DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                             } else {
                                 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                             }
                         }

                         if (pullResult.getNextBeginOffset() < prevRequestOffset
                             || firstMsgOffset < prevRequestOffset) {
                             log.warn(
                                 "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                 pullResult.getNextBeginOffset(),
                                 firstMsgOffset,
                                 prevRequestOffset);
                         }

                         break;
                     case NO_NEW_MSG:
                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                         DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                         DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                         break;
                     case NO_MATCHED_MSG:
                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                         DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                         DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                         break;
                     case OFFSET_ILLEGAL:
                         log.warn("the pull request offset illegal, {} {}",
                                  pullRequest.toString(), pullResult.toString());
                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                         pullRequest.getProcessQueue().setDropped(true);
                         DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                             @Override
                             public void run() {
                                 try {
                                     DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                                                             pullRequest.getNextOffset(), false);

                                     DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                     DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                     log.warn("fix the pull request offset, {}", pullRequest);
                                 } catch (Throwable e) {
                                     log.error("executeTaskLater Exception", e);
                                 }
                             }
                         }, 10000);
                         break;
                     default:
                         break;
                 }
             }
         }

         @Override
         public void onException(Throwable e) {
             if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                 log.warn("execute the pull request exception", e);
             }

             DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
         }
     };

     // ... 省略
     try {
         this.pullAPIWrapper.pullKernelImpl(            // 定義訊息拉取核心實現的相關引數:包括拉取方式、回撥函式等,最終會通過Netty遠端請求訊息然後請求成功後呼叫回撥方法
             pullRequest.getMessageQueue(),
             subExpression,
             subscriptionData.getExpressionType(),
             subscriptionData.getSubVersion(),
             pullRequest.getNextOffset(),
             this.defaultMQPushConsumer.getPullBatchSize(),
             sysFlag,
             commitOffsetValue,
             BROKER_SUSPEND_MAX_TIME_MILLIS,
             CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
             CommunicationMode.ASYNC,
             pullCallback
         );
     } catch (Exception e) {
         log.error("pullKernelImpl exception", e);
         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
     }
 }

​ 以上程式碼註釋有三個重點的地方,具體的處理流程大致是這樣。首先this.pullAPIWrapper.pullKernelImpl這個方法定義了具體的訊息拉取策略,內部實現其實會根據訊息型別取拉取訊息。對於預設的叢集訊息模式,實際會呼叫Netty進行訊息拉取,拉取結束後會呼叫註釋中的回撥函式進行處理。最終實際會進入DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest,而實際上對於順序訊息消費會進入ConsumeMessageOrderlyServicesubmitConsumeRequest方法。該方法直接向消費執行緒池中放入一個消費請求任務。

消費請求任務

​ 我們繼續跟進ConsumeRequest消費請求任務的具體實現:

@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }

    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                }

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) {
                    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) {
                    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }

                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                }

                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                if (!msgs.isEmpty()) {
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                    ConsumeOrderlyStatus status = null;

                    ConsumeMessageContext consumeMessageContext = null;
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext = new ConsumeMessageContext();
                        consumeMessageContext
                            .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                        consumeMessageContext.setMq(messageQueue);
                        consumeMessageContext.setMsgList(msgs);
                        consumeMessageContext.setSuccess(false);
                        // init the consume context type
                        consumeMessageContext.setProps(new HashMap<String, String>());
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                    }

                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        this.processQueue.getLockConsume().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                     this.messageQueue);
                            break;
                        }
                        // 呼叫註冊的listener消費訊息,並且得到返回結果
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                 RemotingHelper.exceptionSimpleDesc(e),
                                 ConsumeMessageOrderlyService.this.consumerGroup,
                                 msgs,
                                 messageQueue);
                        hasException = true;
                    } finally {
                        this.processQueue.getLockConsume().unlock();
                    }

                    if (null == status
                        || ConsumeOrderlyStatus.ROLLBACK == status
                        || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                 ConsumeMessageOrderlyService.this.consumerGroup,
                                 msgs,
                                 messageQueue);
                    }

                    long consumeRT = System.currentTimeMillis() - beginTimestamp;
                    if (null == status) {
                        if (hasException) {
                            returnType = ConsumeReturnType.EXCEPTION;
                        } else {
                            returnType = ConsumeReturnType.RETURNNULL;
                        }
                    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                        returnType = ConsumeReturnType.TIME_OUT;
                    } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        returnType = ConsumeReturnType.FAILED;
                    } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                        returnType = ConsumeReturnType.SUCCESS;
                    }

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                    }

                    if (null == status) {
                        status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.setStatus(status.toString());
                        consumeMessageContext
                            .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                    }

                    ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                        .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                    // 處理Listener的返回結果
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    continueConsume = false;
                }
            }
        } else {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}

​ 可以看出我們開始會呼叫我們實現的MessageListener對拉取到的訊息進行消費,消費完成之後我們會拿到消費結果,並對消費結果進行處理。

消費結果處理(COMMIT ROLLBACK)

​ 我們直接跟進消費結果處理程式碼:

public boolean processConsumeResult(
        final List<MessageExt> msgs,
        final ConsumeOrderlyStatus status,
        final ConsumeOrderlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        boolean continueConsume = true;
        long commitOffset = -1L;
        if (context.isAutoCommit()) {   // 自動提交的情況下
            switch (status) {
                case COMMIT:
                case ROLLBACK:
                    log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                        consumeRequest.getMessageQueue());
                case SUCCESS:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) {
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    } else {
                        commitOffset = consumeRequest.getProcessQueue().commit();
                    }
                    break;
                default:
                    break;
            }
        } else {
            switch (status) {   // 非自動提交,需區別對待返回的處理結果
                case SUCCESS:
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case COMMIT:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    break;
                case ROLLBACK:
                    consumeRequest.getProcessQueue().rollback();
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) {
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    }
                    break;
                default:
                    break;
            }
        }

        if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
        }

        return continueConsume;
    }

​ 因為我們例子中寫的是非自動提交,我們就來看看非自動提交下ROLLBACK和COMMIT的具體實現(對應ProcessQueue的相關方法):

public void rollback() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            /**
                 *  當消費到KEY2的時候,因為num=3所以進入rollback方法
                 *  此時:
                 *      this.msgTreeMap包含所有未消費的訊息 此時有 KEY3 --- KEY49 
                 *      this.consumingMsgOrderlyTreeMap 有所有按順序消費過的訊息 KEY0 --- KEY2
                 *  不難看出一旦執行rollback,不僅僅是將當前消費的訊息重新放入訊息佇列供再次消費,前面已經處理的訊息
                 *  將都會重新放入訊息佇列供再次消費。也就能解釋前面所出現的為什麼自動提交設定為false之後,訊息重複消費
                 */
            this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
            this.consumingMsgOrderlyTreeMap.clear();
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("rollback exception", e);
    }
}

public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            // 獲取已順序消費訊息佇列中最後一個訊息的偏移值
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            // 原佇列訊息個數減去已順序消費但未提交的訊息個數為剩下可繼續消費的訊息個數
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            // 佇列訊息總長度減去待提交的佇列訊息總長度
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            // 將已消費未提交的佇列列表清空
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}

​ 至此,整個簡單的消費流程分析完成。

消費流程原始碼分析總結

  • Pull OR Push:即使是Push模式的Consumer,其最終實現還是是通過Pull的方式來進行的
  • Netty:叢集模式的遠端訊息獲取是通過Netty來實現的

總結

​ RocketMQ的常用三種訊息生產消費模式到現在我們就基本分析完了。個人認為順序訊息消費給需要順序執行的流程非同步實現提供了強有力的支援。這一點特別適用於阿里當前的相關領域。當然RocketMQ也不是盡善盡美的,我個人在測試的時候發現順序訊息消費的效能不算特別高,當然具體什麼原因只有留到後續分析了。還有,因為這個專案開始是阿里內部研發的,可能原始碼註釋上相比於其他開源專案還是要少一些,也沒有那麼清楚。以至於consumer.setConsumeFromWhere這個的不同設值的具體區別在哪我還沒有探究出來(想想Spring的事務隔離級別以及傳遞特性相關常量的註釋基本一看就懂了),限於篇幅還有我趕緊趕去上班,就不再繼續深究了(後面繼續)。

參考連結