1. 程式人生 > >RocketMQ原理學習--消費者消費訊息

RocketMQ原理學習--消費者消費訊息

        在之前的一篇部落格《RocketMQ原理學習--訊息型別》中我們有介紹過RocketMQ的訊息型別,這篇部落格我們簡單介紹一下RocketMQ消費者是如何消費訊息的。

一、Pull or Push

        簡單來說RocketMQ給我們提供了兩種訊息消費方式,Pull模式和Push模式,簡單理解我們可能會認為Pull模式是消費者主動去拉取訊息,Push模式是RocketMQ的Broker主動將訊息推送過來,其實RocketMQ對於這兩種方式都是採用的Pull拉取的方式,Push模式不過是通過回撥來實現的,讓我們理解為推送模式

1、Push示例:

提供一個MessageListenerConcurrently監聽器,當存在訊息時會回撥這個類的consumeMessage方法。

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
        consumer.setNamesrvAddr("localhost:9876");
        //consumer.setInstanceName("rmq-instance2");
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

2、Pull模式:

維護MessageQueue和訊息offset不斷的去拉取訊息。

public class PullConsumer {


    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<>();

    public static void main(String [] args) throws Exception{


        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("rmq-group");

        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicA-test");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }


}

二、執行流程

1、Push模式:

對於Push模式,RocketMQ提供了PullMessageService執行緒,定時不斷的從RocketMQ中拉取訊息,最終來回調MessageListener的consumeMessage方法來消費訊息,如下執行流程圖: