RocketMQ——Consumer篇:PULL模式下的訊息消費(DefaultMQPullConsumer)
1 應用層的使用方式
在應用層初始化DefaultMQPullConsumer類,然後呼叫該類的start方法啟動Consumer;接下來的消費步驟如下:
1、呼叫DefaultMQPullConsumer.fetchSubscribeMessageQueues(String topic)方法,根據topic獲取對應的MessageQueue(即可被訂閱的佇列),在該方法中最終通過呼叫MQAdminImpl.fetchSubscribeMessageQueues(String topic)方法從NameServer獲取該topic的MessageQueue,大致邏輯如下:
1.1)呼叫MQClientAPIImpl.getTopicRouteInfoFromNameServer(String topic, long timeoutMillis)方法,其中timeoutMillis=3000,該方法向NameServer傳送GET_ROUTEINTO_BY_TOPIC請求碼獲取topic引數對應的Broker資訊和topic配置資訊,即TopicRouteData物件;
1.2)遍歷TopicRouteData物件的QueueData列表中每個QueueData物件,首先判斷該QueueData物件是否具有讀許可權,若有則根據該QueueData物件的readQueueNums值,建立readQueueNums個MessageQueue物件,並構成MessageQueue集合;最後返回給MessageQueue集合;
1.3)若上一步構建的MessageQueue集合不為空,則返回給該集合,否則拋MQClientException異常;
2、呼叫 DefaultMQPullConsumer.pullBlockIfNotFound( MessageQueue
2.1)檢查MessageQueue物件的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>變數中,若不在則以consumerGroup、topic、subExpression為引數呼叫FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法構造SubscriptionData物件儲存到RebalanceImpl.subscriptionInner變數中,其中 subExpression="*" ;
2.2)構建訊息的標誌位sysFlag,其中suspend和subscription為true(即該標記位的第2/3位為1),其他commit和classFilter兩位為false(第1/4位為0);
2.3)以請求引數subExpression以及consumerGroup、topic為引數呼叫FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法構造SubscriptionData物件並返回;
2.4)呼叫PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法從Broker拉取訊息內容;
2.5)呼叫PullAPIWrapper.processPullResult(MessageQueue mq, PullResult pullResult, SubscriptionData subscriptionData)方法對拉取訊息的響應結果進行處理,主要是訊息反序列化;(詳細邏輯見 5.5.2小節—內部匿名類PullCallback的onSuccess方法部分)
3、在本地建立offseTable:Map<MessageQueue, Long>變數,根據上一步返回的結果,將該MessageQueue的下一次消費開始位置記錄下來;
2 獲取佇列的消費進度(fetchConsumeOffset)
呼叫DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法獲取MessageQueue佇列的消費進度,其中fromStore為true表示從儲存端(即Broker端)獲取消費進度;若fromStore為false表示從本地記憶體獲取消費進度;
1、對於從儲存端獲取消費進度(即fromStore=true)的情況:
1.1)對於LocalFileOffsetStore物件,從本地載入offsets.json檔案,然後獲取該MessageQueue物件的offset值;
1.2)對於RemoteBrokerOffsetStore物件,獲取邏輯如下:
A)以MessageQueue物件的brokername從MQClientInstance. brokerAddrTable中獲取Broker的地址;若沒有獲取到則立即呼叫updateTopicRouteInfoFromNameServer方法然後再次獲取;
B)構造QueryConsumerOffsetRequestHeader物件,其中包括topic、consumerGroup、queueId;然後呼叫MQClientAPIImpl.queryConsumerOffset (String addr, QueryConsumerOffsetRequestHeader requestHeader, long timeoutMillis)方法向Broker傳送QUERY_CONSUMER_OFFSET請求碼,獲取消費進度Offset;
C)用上一步從Broker獲取的offset更新本地記憶體的消費進度列表資料RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap<MessageQueue, AtomicLong>變數值;
D)返回該offset值;
2、對於從本地記憶體獲取消費進度(即fromStore=false)的情況:
對於LocalFileOffsetStore或者RemoteBrokerOffsetStore物件,均是以MessageQueue物件作為key值從各自物件的offsetTable變數中獲取相應的消費進度。