1. 程式人生 > >RocketMQ學習筆記四之【DefaultMQPullConsumer使用與流程簡單分析】

RocketMQ學習筆記四之【DefaultMQPullConsumer使用與流程簡單分析】

我們首先看下DefaultMQPullConsumer使用例子:

package com.swk.springboot.rocketmq;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class MQPullConsumer {

	private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
	
	public static void main(String[] args) throws MQClientException {
		DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
		consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
		consumer.start();
		// 從指定topic中拉取所有訊息佇列
		Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
		for(MessageQueue mq:mqs){
			try {
				// 獲取訊息的offset,指定從store中獲取
				long offset = consumer.fetchConsumeOffset(mq,true);
				System.out.println("consumer from the queue:"+mq+":"+offset);
				while(true){
					PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, 
							getMessageQueueOffset(mq), 32);
					putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
					switch(pullResult.getPullStatus()){
					case FOUND:
						List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt m : messageExtList) {
                            System.out.println(new String(m.getBody()));
                        }
						break;
					case NO_MATCHED_MSG:
						break;
					case NO_NEW_MSG:
						break;
					case OFFSET_ILLEGAL:
						break;
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		consumer.shutdown();
	}

	// 儲存上次消費的訊息下標
	private static void putMessageQueueOffset(MessageQueue mq,
			long nextBeginOffset) {
		OFFSE_TABLE.put(mq, nextBeginOffset);
	}
	
	// 獲取上次消費的訊息的下標
	private static Long getMessageQueueOffset(MessageQueue mq) {
		Long offset = OFFSE_TABLE.get(mq);
		if(offset != null){
			return offset;
		}
		return 0l;
	}
	

}

根據上面的例子我們來分析下DefaultMQPullConsumer拉取訊息的流程(Consumer的啟動和關閉流程我們會放在後面的章節介紹)

1、【consumer.fetchSubscribeMessageQueues("order-topic")】

// 從指定topic中拉取所有訊息佇列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");

該方法最終是呼叫

 public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        try {
            // 1、向NameServer傳送GET_ROUTEINTO_BY_TOPIC請求碼獲取topic引數對應的Broker資訊和topic配置資訊,即TopicRouteData物件
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            if (topicRouteData != null) {
                // 2、遍歷topicRouteData
                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                if (!mqList.isEmpty()) {
                    return mqList;
                } else {
                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
                }
            }
        } catch (Exception e) {
            throw new MQClientException(
                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);
        }

        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }

繼續看下第二步的遍歷過程

/**
遍歷TopicRouteData物件的QueueData列表中每個QueueData物件,首先判斷該QueueData物件是否具有讀許可權,
若有則根據該QueueData物件的readQueueNums值,建立readQueueNums個MessageQueue物件,並構成MessageQueue集合;
最後返回給MessageQueue集合
**/
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
        List<QueueData> qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            if (PermName.isReadable(qd.getPerm())) {
                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    mqList.add(mq);
                }
            }
        }

        return mqList;
    }

2、【consumer.pullBlockIfNotFound】

/**
通過該方法獲取該MessageQueue佇列下面從offset位置開始的訊息內容,其中maxNums=32即表示獲取的最大訊息個數,offset為該MessageQueue物件的開始消費位置,可以呼叫DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法獲取該MessageQueue佇列的消費進度來設定引數offset值該方法最終呼叫DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)方法
**/
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
    }

下面我來看下【DefaultMQPullConsumerImpl.pullSyncImpl】的實現過程

/**

**/
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();

        if (null == mq) {
            throw new MQClientException("mq is null", null);

        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }
        // 檢查MessageQueue物件的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>變數中,若不在則以consumerGroup、topic、subExpression為引數呼叫FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法構造SubscriptionData物件儲存到RebalanceImpl.subscriptionInner變數中,其中 subExpression="*" 
        this.subscriptionAutomatically(mq.getTopic());
        // 構建標誌位,邏輯或運算|=
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        SubscriptionData subscriptionData;
        try {
            //以請求引數subExpression以及consumerGroup、topic為引數呼叫FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法構造SubscriptionData物件並返回
            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                mq.getTopic(), subExpression);
        } catch (Exception e) {
            throw new MQClientException("parse subscription error", e);
        }

        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
        // 從broker中拉取訊息
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            0L,
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null
        );
        // 對拉取到的訊息進行解碼,過濾並執行回撥,並把解析的message列表放到MsgFoundList中
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }

最近公司要使用springboot,接下來一段時間暫時更新recketmq,先把springboot惡補下