1. 程式人生 > >分散式訊息佇列RocketMQ原始碼分析之3 -- Consumer負載均衡機制 -- Rebalance

分散式訊息佇列RocketMQ原始碼分析之3 -- Consumer負載均衡機制 -- Rebalance

同Kafka一樣,RocketMQ也需要探討一個問題:如何把一個topic的多個queue分攤給不同的consumer,也就是負載均衡問題。

有興趣朋友可以關注公眾號“架構之道與術”, 獲取最新文章。
或掃描如下二維碼:
這裡寫圖片描述

在討論這個問題之前,我們先看一下Client的整體架構。

Producer與Consumer類體系

從下圖可以看出以下幾點:
(1)Producer與Consumer的共同邏輯,封裝在MQClientInstance,MQClientAPIImpl, MQAdminImpl這3個藍色的類裡面。所謂共同的邏輯,比如定期更新NameServer地址列表,定期更新TopicRoute,傳送網路請求等。

(2)Consumer有2種,Pull和Push。下面會詳細講述這2者的區別。

這裡寫圖片描述

Consumer Group的Clustering模式與Pub/Sub模式

預設的,RocketMQ和Kafka採用的是同樣的策略:同1個Consumer Group的多個Consumer,是分攤,也就是負載均衡;多個Consumer Group之間,是Pub/Sub模式。

但RocketMQ對此還做了擴充套件,允許同1個Consumer Group內部,也可以是廣播模式。具體到程式碼裡面,就是:

 */
public enum MessageModel {
    /**
     * broadcast
     */
BROADCASTING("BROADCASTING"), /** * clustering */ CLUSTERING("CLUSTERING"); //預設取值 ... }

預設的,Consumer的MessageModel就是CLUSTERING模式,也就是同1個Consumer Group內部,多個Consumer分攤同1個topic的多個queue,也就是負載均衡。

如果你把MessageModel改成BROADCAST模式,那同1個Consumer Group內部也變成了廣播,此時ConsumerGroup其實就沒有區分的意義了。此時,不管是1個Consumer Group,還是多個Consumer Group,對同1個topic的訊息,都變成了廣播。

Pull Consumer 與 Push Consumer

Push的負載均衡

下面我們先看一下pull和push的最基本用法:

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); //指定Consumer Group

        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); //獲取一個topic的所有MessageQueue

        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: " + mq + "%n");
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);  //遍歷所有queue,挨個呼叫pull

                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");  //指定Consumer Group
        consumer.subscribe("Jodie_topic_1023", "*"); //指定要消費的topic
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {  //該topic的任何一個queue有新訊息,該回調回被呼叫
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

從上面的程式碼我們可以看出,pull和push用法上的基本差別就是:pull是客戶端主動去拉取訊息,push是註冊了一個回撥,當有新訊息,該回調被呼叫。

但這還不是2者的最大區別,最大區別是:在pull裡面,所有MessageQueue是向我們暴露的,我們需要自己去手動遍歷所有的queue;而在push裡面,我們只指定了訂閱的topic,而MessageQueue是向我們隱藏的,在其內部做了“負載均衡”。

而上面的pull的程式碼,我們手動遍歷了所有的queue,沒有負載均衡!!!

那對於Pull模式,如何做負載均衡呢??

Pull的負載均衡

在MQPullConsumer這個類裡面,有一個MessageQueueListener,它的目的就是當queue發生變化的時候,通知Consumer。也正是這個藉口,幫助我們在Pull模式裡面,實現負載均衡。

注意,這個介面在MQPushConsumer裡面是沒有的,那裡面有的是上面程式碼裡的MessageListener。

 void registerMessageQueueListener(final String topic, final MessageQueueListener listener);

public interface MessageQueueListener {
    void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
                             final Set<MessageQueue> mqDivided);
}

有了這個Listener,我們就可以動態的知道當前的Consumer分攤到了幾個MessageQueue。然後對這些MessageQueue,我們可以開個執行緒池來消費。

MQPullConsumerScheduleService

幸運的是,RocketMQ給我們提供了一個工具類,MQPullConsumerScheduleService,幫助我們在pull模式下,實現負載均衡。

類似Push模式,在這個程式碼裡面,我們也只指定了topic,而不像上面簡陋的pull版本,要自己遍歷所有的messageQueue。其內部幫我們做了負載均衡。

其使用程式碼如下:


    public static void main(String[] args) throws MQClientException {
        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");

        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                try {

                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;

                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());


                    context.setPullNextDelayTimeMillis(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        scheduleService.start();
    }