1. 程式人生 > >RocketMQ——Consumer篇:PULL模式下的訊息消費(DefaultMQPullConsumer)

RocketMQ——Consumer篇:PULL模式下的訊息消費(DefaultMQPullConsumer)

1 應用層的使用方式

在應用層初始化DefaultMQPullConsumer類,然後呼叫該類的start方法啟動Consumer;接下來的消費步驟如下:

1、呼叫DefaultMQPullConsumer.fetchSubscribeMessageQueues(String topic)方法,根據topic獲取對應的MessageQueue(即可被訂閱的佇列),在該方法中最終通過呼叫MQAdminImpl.fetchSubscribeMessageQueues(String topic)方法從NameServer獲取該topic的MessageQueue,大致邏輯如下:

1.1)呼叫MQClientAPIImpl.getTopicRouteInfoFromNameServer(String topic, long timeoutMillis)方法,其中timeoutMillis=3000,該方法向NameServer傳送GET_ROUTEINTO_BY_TOPIC請求碼獲取topic引數對應的Broker資訊和topic配置資訊,即TopicRouteData物件;

1.2)遍歷TopicRouteData物件的QueueData列表中每個QueueData物件,首先判斷該QueueData物件是否具有讀許可權,若有則根據該QueueData物件的readQueueNums值,建立readQueueNums個MessageQueue物件,並構成MessageQueue集合;最後返回給MessageQueue集合;

1.3)若上一步構建的MessageQueue集合不為空,則返回給該集合,否則拋MQClientException異常;

2、呼叫 DefaultMQPullConsumer.pullBlockIfNotFound( MessageQueue

mq, String subExpression, long offset, int maxNums)方法獲取該 MessageQueue佇列下面從offset位置開始的訊息內容,其中maxNums=32即表示獲取的最大訊息個數,offset為該MessageQueue物件的開始消費位置,可以呼叫DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法獲取該MessageQueue佇列的消費進度來設定引數offset值(詳見5.6.2小節);該方法最終呼叫DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)方法,該方法的大致邏輯如下:

2.1)檢查MessageQueue物件的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>變數中,若不在則以consumerGroup、topic、subExpression為引數呼叫FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法構造SubscriptionData物件儲存到RebalanceImpl.subscriptionInner變數中,其中 subExpression="*"

2.2)構建訊息的標誌位sysFlag,其中suspend和subscription為true(即該標記位的第2/3位為1),其他commit和classFilter兩位為false(第1/4位為0);

2.3)以請求引數subExpression以及consumerGroup、topic為引數呼叫FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法構造SubscriptionData物件並返回;

2.4)呼叫PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法從Broker拉取訊息內容;

2.5)呼叫PullAPIWrapper.processPullResult(MessageQueue mq, PullResult pullResult, SubscriptionData subscriptionData)方法對拉取訊息的響應結果進行處理,主要是訊息反序列化;(詳細邏輯見 5.5.2小節—內部匿名類PullCallback的onSuccess方法部分)

3、在本地建立offseTable:Map<MessageQueue, Long>變數,根據上一步返回的結果,將該MessageQueue的下一次消費開始位置記錄下來;

2 獲取佇列的消費進度(fetchConsumeOffset)

呼叫DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法獲取MessageQueue佇列的消費進度,其中fromStore為true表示從儲存端(即Broker端)獲取消費進度;若fromStore為false表示從本地記憶體獲取消費進度;

1、對於從儲存端獲取消費進度(即fromStore=true)的情況:

1.1)對於LocalFileOffsetStore物件,從本地載入offsets.json檔案,然後獲取該MessageQueue物件的offset值;

1.2)對於RemoteBrokerOffsetStore物件,獲取邏輯如下:

A)以MessageQueue物件的brokername從MQClientInstance. brokerAddrTable中獲取Broker的地址;若沒有獲取到則立即呼叫updateTopicRouteInfoFromNameServer方法然後再次獲取;

B)構造QueryConsumerOffsetRequestHeader物件,其中包括topic、consumerGroup、queueId;然後呼叫MQClientAPIImpl.queryConsumerOffset (String addr, QueryConsumerOffsetRequestHeader requestHeader, long timeoutMillis)方法向Broker傳送QUERY_CONSUMER_OFFSET請求碼,獲取消費進度Offset;

C)用上一步從Broker獲取的offset更新本地記憶體的消費進度列表資料RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap<MessageQueue, AtomicLong>變數值;

D)返回該offset值;

2、對於從本地記憶體獲取消費進度(即fromStore=false)的情況:

對於LocalFileOffsetStore或者RemoteBrokerOffsetStore物件,均是以MessageQueue物件作為key值從各自物件的offsetTable變數中獲取相應的消費進度。