1. 程式人生 > >RocketMQ——Consumer篇:PUSH模式下消費訊息(順序和併發兩種)

RocketMQ——Consumer篇:PUSH模式下消費訊息(順序和併發兩種)

1 接受並處理Broker返回的響應訊息

當傳送拉取訊息在Broker返回響應訊息之後呼叫NettyRemotingAbstract.processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg)方法,大致邏輯如下:

1、根據返回的響應物件RemotingCommand的opaque(請求序列號)從NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer /* opaque */, ResponseFuture>變數中獲取ResponseFuture物件(在傳送之前存入的該物件);

2、若該ResponseFuture物件為null,則啥也不幹,就打警告日誌;

3、若該ResponseFuture物件不為null,則將響應物件RemotingCommand賦值給ResponseFuture.responseCommand變數;

4、若ResponseFuture.invokeCallback:InvokeCallback變數不為空(在非同步傳送的情況下該變數不為空),則首先獲取NettyRemotingClient.publicExecutor執行緒池,若存在該執行緒池則初始化Runnable匿名執行緒,將該匿名執行緒提交到執行緒池中;該匿名執行緒的run方法主要是呼叫ResponseFuture.executeInvokeCallback()方法;若沒有該執行緒池則直接在主執行緒中呼叫ResponseFuture.executeInvokeCallback()方法;在executeInvokeCallback方法中,先確保ResponseFuture.executeCallbackOnlyOnce的值為false並且成功更新為true,則再執行InvokeCallback.operationComplete(ResponseFuture)方法,在該方法內部呼叫回撥類PullCallback物件的onSuccess方法;由於executeCallbackOnlyOnce在初始化時為false,若更新失敗說明該回調方法已經執行過了,故不在執行。

5、若ResponseFuture.invokeCallback:InvokeCallback變數為空(在同步方式拉取訊息的情況下),則呼叫ResponseFuture.putResponse (RemotingCommand responseCommand)方法首先將響應物件RemotingCommand賦值給ResponseFuture.responseCommand變數,然後喚醒ResponseFuture.waitResponse方法的等待;

2 消費訊息的回撥類(PullCallback)

該PullCallback類是DefaultMQPushConsumerImpl.pullMessage (PullRequest pullRequest)方法中的匿名內部類,採用非同步方式拉取訊息時,在收到Broker的響應訊息之後,回撥該方法執行業務呼叫者的回撥邏輯。

一、 onSucess方法

在收到響應訊息之後,先回調InvokeCallback匿名類的operationComplete (ResponseFuture responseFuture)方法,在正常情況下會回撥PullCallback類的onSucess方法,大致邏輯如下:

1、呼叫PullAPIWrapper.processPullResult(MessageQueue mq,PullResult

pullResult, SubscriptionData subscriptionData)方法處理拉取訊息的返回物件PullResult,大致邏輯如下:

1.1)呼叫PullAPIWrapper.updatePullFromWhichNode(MessageQueue mq, long brokerId)方法用Broker返回的PullResultExt.suggestWhichBrokerId變數值更新PullAPIWrapper.pullFromWhichNodeTable:ConcurrentHashMap <MessageQueue,AtomicLong/* brokerId */>變數中當前拉取訊息PullRequest.messageQueue物件對應的BrokerId。若以messageQueue為key值從pullFromWhichNodeTable中獲取的BrokerId為空則將PullResultExt.suggestWhichBrokerId存入該列表中,否則更新該MessageQueue對應的value值為suggestWhichBrokerId;

1.2)若pullResult.status=FOUND,則繼續下面的處理邏輯,否則設定PullResultExt.messageBinary=null並返回該PullResult物件;

1.3)對PullResultExt.messageBinary變數進行解碼,得到MessageExt列表;

1.4) Consumer端訊息過濾。若SubscriptionData.tagsSet集合(在5.5.1小節中拉取訊息之前以topic獲取的訂閱關係資料)不為空並且SubscriptionData. classFilterMode為false(在初始化DefaultMQPushConsumer時可以設定這兩個值),則遍歷MessageExt列表,檢查每個MessageExt物件的tags值(在commitlog資料的properties欄位的"TAGS"屬性值)是否在SubscriptionData.tagsSet集合中,只保留MessageExt.tags此tagsSet集合中的MessageExt物件,構成新的MessageExt列表,取名msgListFilterAgain;否則新的列表msgListFilterAgain等於上一步的MessageExt列表;

Consumer收到過濾後的訊息後,同樣也要執行在Broker端的操作,但是比對的是真實的Message Tag字串,而不是hashCode。因為在Broker端為了節約空間,過濾規則是儲存的HashCode,為了避免Hash衝突而受到錯誤訊息,在Consumer端還進行一次具體過濾規則的過濾,進行過濾修正。

1.5)檢查PullAPIWrapper.filterMessageHookList列表是否為空(可在應用層通過DefaultMQPullConsumerImpl.registerFilterMessageHook (FilterMessageHook hook)方法設定),若不為空則呼叫該列表中的每個FilterMessageHook物件的filterMessage方法;由應用層實現FilterMessageHook介面的filterMessage方法,可以在該方法中對訊息再次過濾;

1.6)向NameServer傳送GET_KV_CONFIG請求碼獲取NAMESPACE_PROJECT_CONFIG和本地IP下面的value值,賦值給projectGroupPrefix變數。若該值為空則將PullResult.minOffset和PullResult.maxOffset值設定到每個MessageExt物件的properties屬性中,其中屬性名稱分別為"MIN_OFFSET"和"MAX_OFFSET";若不為空則除了將PullResult.minOffset和PullResult.maxOffset值設定到每個MessageExt物件的properties屬性中之外,還從projectGroupPrefix變數值開頭的topic中去掉projectGroupPrefix值部分,然後將新的topic設定到MessageQueue、SubscriptionData的topic以及每個MessageExt物件的topic變數;

1.7)將新組建的MessageExt列表msgListFilterAgain賦值給PullResult.msgFoundList變數;

1.8)設定PullResultExt.messageBinary=null,並返回該PullResult物件;

2、下面根據PullResult.status變數的值執行不同的業務邏輯,若PullResult.status=FOUND,大致邏輯如下:

2.1)該PullRequest物件的nextOffset變數值表示本次消費的開始偏移量,賦值給臨時變數prevRequestOffset;

2.2)取PullResult.nextBeginOffset的值(Broker返回的下一次消費進度的偏移值)賦值給PullRequest.nextOffset變數值;

2.3)若PullResult.MsgFoundList列表為空,則呼叫DefaultMQPushConsumerImpl.executePullRequestImmediately(PullRequest pullRequest)方法將該拉取請求物件PullRequest重新延遲放入PullMessageService執行緒的pullRequestQueue佇列中,然後跳出該onSucess方法;否則繼續下面的邏輯;

2.4)呼叫該PullRequest.ProcessQueue物件的putMessage(List<MessageExt> msgs)方法,將MessageExt列表存入ProcessQueue.msgTreeMap:TreeMap<Long, MessageExt>變數中,放入此變數的目的是:第一在順序消費時從該變數列表中取訊息進行消費,第二可以用此變數中的訊息做流控;大致邏輯如下:

A)遍歷List<MessageExt>列表,以每個MessageExt物件的queueOffset值為key值,將MessageExt物件存入msgTreeMap:TreeMap<Long, MessageExt>變數中;該變數型別根據key值大小排序;

B)更新ProcessQueue.msgCount變數,記錄訊息個數;

C)經過第A步處理之後,若msgTreeMap變數不是空並且ProcessQueue.consuming為false(初始化為false)則置consuming為true(在該msgTreeMap變數消費完之後再置為false)、置臨時變數dispatchToConsume為true;否則置臨時變數dispatchToConsume為false表示沒有待消費的訊息或者msgTreeMap變數中存入了資料還未消費完,在沒有消費完之前不允許在此提交消費請求,在消費完msgTreeMap之後置consuming為false;

D)取List<MessageExt>列表的最後一個MessageExt物件,該物件的properties屬性中取MAX_OFFSET的值,減去該MessageExt物件的queueOffset值,即為Broker端該topic和queueId下訊息佇列中未消費的邏輯佇列的大小,將該值存入ProcessQueue.msgAccCnt變數,用於MQClientInstance. adjustThreadPool()方法動態調整執行緒池大小(在MQClientInstance中啟動定時任務定時調整執行緒池大小);

E)返回臨時變數dispatchToConsume值;

2.5)呼叫ConsumeMessageService.submitConsumeRequest(List <MessageExt> msgs,ProcessQueue processQueue,MessageQueue messageQueue, boolean dispathToConsume)方法,其中dispathToConsume的值由上一步所得,在順序消費時使用,為true表示可以消費;大致邏輯如下:

A)若是 順序消費 ,則呼叫ConsumeMessageOrderlyService. submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispathToConsume)方法;在該方法中,若上次的msgTreeMap變數中的資料還未消費完(即在2.4步中返回dispathToConsume=false)則不執行任何邏輯;若dispathToConsume=true(即上一次已經消費完了)則以ProcessQueue和MessageQueue物件為引數初始化ConsumeMessageOrderlyService類的內部執行緒類ConsumeRequest;然後將該執行緒類放入ConsumeMessageOrderlyService.consumeExecutor執行緒池中。從而可以看出順序消費是從ProcessQueue物件的TreeMap樹形列表中取訊息的。

B)若是 併發消費 ,則呼叫ConsumeMessageConcurrentlyService.submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispatchToConsume)方法;在該方法中,則根據批次中最大資訊條數(由DefaultMQPushConsumer.consumeMessageBatchMaxSize設定,預設為1)來決定是否分提交到執行緒池中,大致邏輯為:首先比較List<MessageExt>列表的個數是否大於了批處理的最大條數,若沒有則以該List<MessageExt>佇列、ProcessQueue物件、MessageQueue物件初始化ConsumeMessageConcurrentlyService的內部類 ConsumeRequest 的物件,並放入ConsumeMessageConcurrentlyService.consumeExecutor執行緒池中;否則遍歷List<MessageExt>列表,每次從List<MessageExt>列表中取consumeMessageBatchMaxSize個MessageExt物件構成新的List<MessageExt>列表,然後以新的MessageExt佇列、ProcessQueue物件、MessageQueue物件初始化ConsumeMessageConcurrentlyService的內部類 ConsumeRequest 的物件並放入ConsumeMessageConcurrentlyService.consumeExecutor執行緒池中,直到該佇列遍歷完為止。從而可以看出併發消費是將從Broker獲取的MessageExt訊息列表分配到各個 ConsumeRequest 執行緒中進行併發消費。

2.6)檢查拉取訊息的間隔時間(DefaultMQPushConsumer.pullInterval,預設為0),若大於0,則呼叫DefaultMQPushConsumerImpl. executePullRequestLater方法,在間隔時間之後再將PullRequest物件放入PullMessageService執行緒的pullRequestQueue佇列中;若等於0(表示立即再次進行拉取訊息),則呼叫DefaultMQPushConsumerImpl. executePullRequestImmediately方法立即繼續下一次拉取訊息,從而形成一個迴圈不間斷地拉取訊息的過程;

3、若PullResult.status=NO_NEW_MSG或者NO_MATCHED_MSG時:

3.1)取PullResult.nextBeginOffset的值(Broker返回的下一次消費進度的偏移值)賦值給PullRequest.nextOffset變數值;

3.2)更新消費進度offset。呼叫DefaultMQPushConsumerImpl.correctTagsOffset(PullRequest pullRequest)方法。若沒有獲取到訊息(即ProcessQueue.msgCount等於0)則更新訊息進度。對於LocalFileOffsetStore或RemoteBrokerOffsetStore類,均呼叫updateOffset(MessageQueue mq, long offset, boolean increaseOnly)方法,而且方法邏輯是一樣的,以MessageQueue物件為key值從offsetTable:ConcurrentHashMap<MessageQueue, AtomicLong>變數中獲取values值,若該values值為空,則將MessageQueue物件以及offset值(在3.1步中獲取的PullResult.nextBeginOffset值)存入offsetTable變數中,若不為空,則比較已經存在的值,若大於已存在的值才更新;

3.3)呼叫DefaultMQPushConsumerImpl.executePullRequestImmediately方法立即繼續下一次拉取;

4、若PullResult.status=OFFSET_ILLEGAL

4.1)取PullResult.nextBeginOffset的值(Broker返回的下一次消費進度的偏移值)賦值給PullRequest.nextOffset變數值;

4.2)設定PullRequest.processQueue.dropped等於true,將此該拉取請求作廢;

4.3)建立一個匿名Runnable執行緒類,然後呼叫DefaultMQPushConsumerImpl.executeTaskLater(Runnable r, long timeDelay)方法將該執行緒類放入PullMessageService.scheduledExecutorService: ScheduledExecutorService排程執行緒池中,在10秒鐘之後執行該匿名執行緒類;該匿名執行緒類的run方法邏輯如下:

A)呼叫OffsetStore.updateOffset(MessageQueue mq, long offset, boolean increaseOnly)方法更新更新消費進度offset;

B)呼叫OffsetStore.persist(MessageQueue mq)方法:對於廣播模式下offsetStore初始化為LocalFileOffsetStore物件,該物件的persist方法沒有處理邏輯;對於叢集模式下offsetStore初始化為RemoteBrokerOffsetStore物件,該物件的persist方法中,首先以入參MessageQueue物件為key值從RemoteBrokerOffsetStore.offsetTable: ConcurrentHashMap<MessageQueue, AtomicLong>變數中獲取偏移量offset值,然後呼叫updateConsumeOffsetToBroker(MessageQueue mq, long offset)方法向Broker傳送UPDATE_CONSUMER_OFFSET請求碼的消費進度資訊;
C)以PullRequest物件的messageQueue變數為引數呼叫RebalanceImpl.removeProcessQueue(MessageQueue mq)方法,在該方法中,首先從RebalanceImpl.processQueueTable: ConcurrentHashMap<MessageQueue, ProcessQueue>變數中刪除MessageQueue記錄並返回對應的ProcessQueue物件;然後該ProcessQueue物件的dropped變數設定為ture;最後以MessageQueue物件和ProcessQueue物件為引數呼叫removeUnnecessaryMessageQueue方法刪除未使用的訊息佇列的消費進度,具體邏輯詳見5.3.4小節;

二、 onException方法

在傳送訊息失敗或者等待響應超時或者其他異常時回撥該onException方法。在該方法中,呼叫DefaultMQPushConsumerImpl.executePullRequestLater方法,3秒鐘之後再將該PullRequest請求重新放入PullMessageService執行緒的pullRequestQueue佇列中;

3 順序消費(ConsumeMessageOrderlyService)

對於順序消費的三把鎖:1)首先在ConsumeMessageOrderlyService類中定義了定時任務每隔20秒執行一次lockMQPeriodically()方法,獲取該Consumer端在Broker端鎖住的MessageQueue集合(即分散式鎖),並將RebalanceImpl.processQueueTable:ConcurrentHashMap<MessageQueue, ProcessQueue>集合中獲得分散式鎖的MessageQueue物件(消費佇列)對應的ProcessQueue物件(消費處理佇列)加上本地鎖(即該物件的lock等於ture)以及加鎖的時間,目的是為了在消費時在本地檢查消費佇列是否鎖住;2)在進行訊息佇列的消費過程中,對MessageQueue物件進行本地同步鎖,保證同一時間只允許一個執行緒訊息一個ConsumeQueue佇列;3)在回撥業務層定義的ConsumeMessageOrderlyService.messageListener:MessageListenerOrderly類的consumeMessage方法之前獲取ProcessQueue.lockConsume:ReentrantLock變數的鎖即消費處理佇列的鎖,該鎖的粒度比訊息佇列的同步鎖粒度更小,該鎖的目的是保證在消費的過程中不會被解鎖。

3.1 回撥業務層定義的消費方法

在回撥DefaultMQPushConsumerImpl.pullMessage方法中的內部類PullCallback.onSucess方法時,呼叫ConsumeMessageOrderlyService. submitConsumeRequest方法提交消費請求ConsumeRequest物件,該消費請求就是在ConsumeMessageOrderlyService類的內部定義了ConsumeRequest執行緒類,將此物件提交到ConsumeMessageOrderlyService類的執行緒池中,由該執行緒完成回撥業務層定義的消費方法,該內部執行緒類的run方法邏輯如下:

1、檢查ProcessQueue.dropped是否為true,若不是則直接返回;

2、對MessageQueue物件加鎖,保證同一時間只允許一個執行緒使用該MessageQueue物件。呼叫ConsumeMessageOrderlyService.messageQueueLock. fetchLockObject(MessageQueue mq)獲取鎖物件。下面的處理邏輯均在獲得該Object物件的互斥鎖(synchronized)後進行處理;從而保證了在併發情況下一個MessageQueue物件只有一個執行緒使用,大致邏輯為:

2.1)根據MessageQueue物件從MessageQueueLock.mqLockTable: ConcurrentHashMap<MessageQueue, Object>中獲取Object物件,若獲取到Object物件不為null,則直接返回該物件;

2.2)若獲取的Object物件為null,則建立一個Object物件,並儲存到mqLockTable變數中,為了防止併發採用putIfAbsent方法存入該列表中,若已經有該MessageQueue物件,則返回已經存在的Object物件,若不為空,則返回該已存在的Object物件;

3、若訊息模式是廣播或者對應的ProcessQueue.locked等於true且鎖的時間未過期(根據獲取鎖locked的時候設定的lastLockTimestamp值來判斷),則初始化區域性變數continueConsume=true,然後無限期的迴圈執行下面的邏輯,直到區域性變數continueConsume=false為止或者跳出迴圈,便終止了該方法的執行,否則執行第4步操作;

3.1)執行for迴圈下面的邏輯,直到該continueConsume等於false或者直接跳出迴圈為止;

3.2)檢查ProcessQueue.dropped是否為true,若不是則跳出迴圈;

3.3)若訊息模式是叢集並且ProcessQueue.locked不等於true(未鎖住)或者鎖住了但是鎖已經超時,則呼叫ConsumeMessageOrderlyService.tryLockLaterAndReconsume(MessageQueue mq, ProcessQueue processQueue, long delayMills)方法,在該方法中初始化一個執行緒並將執行緒放入ConsumeMessageOrderlyService. scheduledExecutorService執行緒池中,然後跳出迴圈。該執行緒在延遲delayMills毫秒之後被執行,該執行緒的功能是獲取MessageQueue佇列的分散式鎖,然後再呼叫ConsumeMessageOrderlyService.submitConsumeRequestLater (MessageQueue mq, ProcessQueue processQueue, long delayMills)方法。

3.4)若該迴圈從開始到現在連續執行的時間已經超過的最大連續執行值(由property屬性中的"rocketmq.client.maxTimeConsumeContinuously"設定,預設為60秒),則呼叫ConsumeMessageOrderlyService. submitConsumeRequestLater(MessageQueue mq, ProcessQueue processQueue, long delayMills)方法,其中delayMills=100,表示在100毫秒之後再呼叫ConsumeMessageOrderlyService.submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispathToConsume)方法,在該方法中重新建立ConsumeRequest物件,並放入ConsumeMessageOrderlyService.consumeExecutor執行緒池中重新被消費;

3.4)獲取一次批量消費的訊息個數batchSize(由引數DefaultMQPushConsumer.consumeMessageBatchMaxSize指定,預設為1),然後呼叫ProcessQueue.takeMessags(int batchSize),在該方法中,從ProcessQueue.msgTreeMap變數中獲取batchSize個數的List<MessageExt>列表;並且從msgTreeMap中刪除,存入臨時變數msgTreeMapTemp中,返回List<MessageExt>列表,若該列表為空,表示該msgTreeMap列表中的訊息已經消費完了,置ProcessQueue.consuming等於false;

3.5)檢查返回的List<MessageExt>列表是否為空,若為空,則置區域性變數continueConsume=false表示不在迴圈執行,本次訊息佇列的消費結束;否則繼續執行下面的步驟;

3.6)檢查DefaultMQPushConsumerImpl.consumeMessageHookList: ArrayList<ConsumeMessageHook>是否為空,若不是,則初始化ConsumeMessageContext物件,並呼叫ArrayList<ConsumeMessageHook>列表中每個ConsumeMessageHook物件的consumeMessageBefore (ConsumeMessageContextcontext)方法,該ArrayList<ConsumeMessageHook>列表由業務層呼叫 DefaultMQPushConsumerImpl.registerConsumeMessageHook (ConsumeMessageHook hook)方法設定;

3.7)執行業務層定義的消費訊息的業務邏輯並返回消費結果。先呼叫ProcessQueue. lockConsume:ReentrantLock變數的lock方法獲取鎖(目的是防止在消費的過程中,被其他執行緒將此消費佇列解鎖了,從而引起併發消費的問題),然後呼叫ConsumeMessageOrderlyService. messageListener:MessageListenerOrderly類的consumeMessage方法,保證同一個ProcessQueue在同一時間只能有一個執行緒呼叫consumeMessage方法,由應用層實現該MessageListenerOrderly介面的consumeMessage方法,執行完成之後呼叫呼叫ProcessQueue. lockConsume:ReentrantLock變數的unlock方法釋放鎖;

3.8)當consumeMessage方法的返回值status為空時,將結果狀態status賦值為ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;

3.9)檢查DefaultMQPushConsumerImpl.consumeMessageHookList: ArrayList<ConsumeMessageHook>是否為空,若不是,則初始化ConsumeMessageContext物件,並呼叫ArrayList<ConsumeMessageHook>列表中每個ConsumeMessageHook物件的consumeMessageAfter (ConsumeMessageContextcontext)方法,該ArrayList<ConsumeMessageHook>列表由業務層呼叫DefaultMQPushConsumerImpl

.registerConsumeMessageHook(ConsumeMessageHook hook)方法設定;

3.10)處理回撥方法consumeMessage的消費結果,並將消費結果賦值給變數continueConsume。呼叫ConsumeMessageOrderlyService. processConsumeResult(List<MessageExt> msgs, ConsumeOrderlyStatus status, ConsumeOrderlyContext context, ConsumeRequest consumeRequest)方法,詳見5.7.2小節;

3.11)繼續從第3.1步開始遍歷,不間斷地從ProcessQueue物件的List<MessageExt>列表中獲取訊息物件並消費;直到消費完為止;

4、檢查ProcessQueue.dropped是否為true,若不為true則直接返回,否則呼叫ConsumeMessageOrderlyService.tryLockLaterAndReconsume (MessageQueue mq, ProcessQueue processQueue, long delayMills)方法,其中delayMills=100,即在100毫秒之後重新獲取鎖後再次進行消費;

3.2 根據消費結果進行相應處理

以回撥應用層定義的ConsumeMessageOrderlyService.consumeMessage方法的返回處理結果為引數呼叫ConsumeMessageOrderlyService. processConsumeResult(List<MessageExt> msgs, ConsumeOrderlyStatus status, ConsumeOrderlyContext context, ConsumeRequest consumeRequest) 方法。大致邏輯如下:

1、若ConsumeOrderlyContext.autoCommit為true(預設為true,可以在應用層的實現類MessageListenerOrderly的consumeMessage方法中設定該值);根據consumeMessage方法返回的不同結果執行不同的邏輯:

1.1)若status等於SUCCESS,則呼叫ProcessQueue.commit()方法,大致邏輯如下:

A)獲取ProcessQueue.lockTreeMap:ReentrantReadWriteLock鎖對該方法的整個處理邏輯加鎖;

B)從msgTreeMapTemp(在從msgTreeMap中獲取訊息並消費時存入的)中獲取最後一個元素的key值,即最大的offset;

C)將ProcessQueue.msgCount值減去臨時變數msgTreeMapTemp中的個數,即表示剩下的未消費的訊息個數;

D)清空臨時變數msgTreeMapTemp列表的值;

E)返回最大的offset+1的值並賦值給區域性變數commitOffset值用於更新消費進度之用;

F)然後呼叫ConsumerStatsManager.incConsumeOKTPS(String group, String topic, long msgs)方法進行消費統計;

1.2)若status等於SUSPEND_CURRENT_QUEUE_A_MOMENT,稍後重新消費。大致邏輯如下:

1.2.1)首先呼叫ProcessQueue.makeMessageToCosumeAgain (List<MessageExt> msgs)方法將List<MessageExt>列表的物件重新放入msgTreeMap變數中。大致邏輯如下:

A)獲取ProcessQueue.lockTreeMap:ReentrantReadWriteLock鎖對該方法的整個處理邏輯加鎖;

B)將List<MessageExt>列表的物件重新放入msgTreeMap變數中。遍歷List<MessageExt>列表的每個MessageExt物件,以該物件的queueoffset值為key值從msgTreeMapTemp中刪除對應的MessageExt物件;將MessageExt物件以MessageExt物件的queueoffset為key值重新加入到ProcessQueue.msgTreeMap變數中;

1.2.2)然後呼叫ConsumeMessageOrderlyService. submitConsumeRequestLater(ProcessQueue processQueue, MessageQueue messageQueue, long suspendTimeMillis)方法,其中suspendTimeMillis由ConsumeOrderlyContext.suspendCurrentQueueTimeMillis變數在應用層的回撥方法中設定,預設為1000,表示在1000毫秒之後呼叫ConsumeMessageOrderlyService.submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispathToConsume)方法,在該方法中重新建立ConsumeRequest物件,並放入ConsumeMessageOrderlyService.consumeExecutor執行緒池中;

1.2.3)置區域性變數continueConsume=false;

1.2.4)呼叫ConsumerStatsManager.IncConsumeFailedTPS方法進行消費統計;

2、若ConsumeOrderlyContext.autoCommit為false,根據consumeMessage方法返回的不同狀態執行不同的邏輯:

2.1)若status等於SUCCESS,僅呼叫ConsumerStatsManager.incConsumeOKTPS(String group, String topic, long msgs)方法進行消費統計,並未呼叫ProcessQueue.commit()方法清理佇列資料;

2.2)若status等於COMMIT時,才呼叫ProcessQueue.commit()方法,並返回消費最大的offset值並賦值給區域性變數commitOffset值用於更新消費進度之用;

2.3)若status等於ROLLBACK,表示要重新消費,

A)首先呼叫ProcessQueue.rollback()方法,將msgTreeMapTemp變數中的內容全部重新放入msgTreeMap變數中,同時清理msgTreeMapTemp變數;

B)然後呼叫ConsumeMessageOrderlyService.submitConsumeRequestLater (ProcessQueue processQueue, MessageQueue messageQueue, long suspendTimeMillis)方法,其中suspendTimeMillis由ConsumeOrderlyContext. suspendCurrentQueueTimeMillis變數在應用層的回撥方法中設定,預設為1000,表示在1000毫秒之後呼叫ConsumeMessageOrderlyService. submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispathToConsume)方法,在該方法中重新建立ConsumeRequest物件,並放入ConsumeMessageOrderlyService. consumeExecutor執行緒池中;

C)置區域性變數continueConsume=false;

2.4)若status等於SUSPEND_CURRENT_QUEUE_A_MOMENT,將msgTreeMapTemp變數中的內容全部重新放入msgTreeMap變數中,然後結束此輪消費(continueConsume=false),等待1秒之後再次提交消費,大致邏輯如下:

A)首先呼叫ProcessQueue.makeMessageToCosumeAgain(List<MessageExt> msgs)方法,詳見1.2.1步;

B)然後呼叫ConsumeMessageOrderlyService.submitConsumeRequestLater (ProcessQueue processQueue, MessageQueue messageQueue, long suspendTimeMillis)方法,其中suspendTimeMillis由ConsumeOrderlyContext. suspendCurrentQueueTimeMillis變數在應用層的回撥方法中設定,預設為1000;

C)置區域性變數continueConsume=false;

D)呼叫ConsumerStatsManager.IncConsumeFailedTPS方法進行消費統計;

3、若上面兩步得到的commitOffset值大於0,則呼叫OffsetStore.updateOffset(MessageQueue mq, long offset, boolean increaseOnly)方法更新消費進度。對於LocalFileOffsetStore或RemoteBrokerOffsetStore類,該方法邏輯是一樣的,以MessageQueue物件為key值從offsetTable: ConcurrentHashMap<MessageQueue, AtomicLong>變數中獲取values值,若該values值為空,則將MessageQueue物件以及commitOffset值存入offsetTable變數中,若不為空,則比較已經存在的值,若大於已存在的值才更新;

4、返回continueConsume值;若該值為false則結束此輪消費。

3.3 重新獲取分散式鎖後再消費(tryLockLaterAndReconsume)

在叢集模式下,若ProcessQueue未鎖或者鎖已經超時,則呼叫ConsumeMessageOrderlyService.tryLockLaterAndReconsume(MessageQueue mq, ProcessQueue processQueue, long delayMills)方法從Broker重新獲取鎖之後再進行消費。

在該方法中初始化一個Runnable匿名執行緒,並在delayMills毫秒之後再執行該匿名執行緒,該匿名執行緒的run方法邏輯如下:

1、先呼叫ConsumeMessageOrderlyService.lockOneMQ(MessageQueue mq)方法獲取MessageQueue佇列的鎖,向該MessageQueue物件的brokerName下面的主用Broker傳送LOCK_BATCH_MQ請求碼的請求訊息,請求Broker將傳送的MessageQueue物件鎖住;若該請求的MessageQueue物件在Broker返回的鎖住集合中,則鎖住成功了;

2、呼叫ConsumeMessageOrderlyService.submitConsumeRequestLater(ProcessQueue processQueue, MessageQueue messageQueue, long suspendTimeMillis)方法,若鎖住成功則suspendTimeMillis=10;若未鎖住,則suspendTimeMillis=3000。

3、在ConsumeMessageOrderlyService.submitConsumeRequestLater方法中,初始化一個匿名的Runnable執行緒類,在suspendTimeMillis毫秒之後,執行該執行緒類,在該類的run方法中呼叫ConsumeMessageOrderlyService.submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispathToConsume)方法,其中dispathToConsume=true;在該方法中根據ProcessQueue 和MessageQueue物件初始化ConsumeRequest物件,並放入ConsumeMessageOrderlyService.consumeExecutor執行緒池中;

4 併發消費(ConsumeMessageConcurrentlyService)

4.1 回撥業務層定義的消費方法

在回撥DefaultMQPushConsumerImpl.pullMessage方法中的內部類PullCallback.onSucess方法時,呼叫ConsumeMessageConcurrentlyService.submitConsumeRequest方法提交消費請求ConsumeRequest物件,該消費請求物件是在ConsumeMessageConcurrentlyService類的內部定義了ConsumeRequest執行緒類;將此物件提交到ConsumeMessageConcurrentlyService類的執行緒池中,由該執行緒完成回撥業務層定義的消費方法,該內部執行緒類的run方法邏輯如下:

1、檢查ProcessQueue.dropped是否為true,若不是則直接返回;

2、檢查DefaultMQPushConsumerImpl.consumeMessageHookList: ArrayList<ConsumeMessageHook>是否為空,若不是,則初始化ConsumeMessageContext物件,並呼叫ArrayList<ConsumeMessageHook>列表中每個ConsumeMessageHook物件的consumeMessageBefore (ConsumeMessageContext context)方法,該consumeMessageHookList變數由業務層呼叫DefaultMQPushConsumerImpl.registerConsumeMessageHook (ConsumeMessageHook hook)方法設定,由業務層自定義ConsumeMessageHook介面的實現類,實現該介面的consumeMessageBefore(final ConsumeMessageContext context)和consumeMessageAfter(final ConsumeMessageContext context)方法,分別在回撥業務層的具體消費方法之前和之後呼叫者兩個方法。

3、遍歷List<MessageExt>列表,檢查每個訊息(MessageExt物件)中topic值,若該值等於"%RETRY%+consumerGroup",則從訊息的propertis屬性中獲取"RETRY_TOPIC"屬性的值,若該屬性值不為null,則將該屬性值賦值給該訊息的topic值;對於重試訊息,會將真正的topic值放入該屬性中;

4、呼叫ConsumeMessageConcurrentlyService.messageListener: MessageListenerConcurrently的consumeMessage方法;該messageListener變數是由DefaultMQPushConsumer.registerMessageListener (MessageListener messageListener)方法在業務層設定的,對於併發消費,則在業務層就要實現MessageListenerConcurrently介面的consumeMessage方法;該回調方法consumeMessage返回ConsumeConcurrentlyStatus. CONSUME_SUCCESS或者ConsumeConcurrentlyStatus.RECONSUME_LATER值;

5、檢查上一部分返回值status,若為null,則置status等於ConsumeConcurrentlyStatus.RECONSUME_LATER;

5、檢查DefaultMQPushConsumerImpl.consumeMessageHookList: ArrayList<ConsumeMessageHook>是否為空,若不是,則初始化ConsumeMessageContext物件,並呼叫ArrayList<ConsumeMessageHook>列表中每個ConsumeMessageHook物件的executeHookAfter(ConsumeMessageContext context)方法,該ArrayList<ConsumeMessageHook>列表是應用層設定的回撥類;

6、處理消費失敗的訊息。對於回撥方法consumeMessage的執行結果為失敗的訊息,以內部Producer的名義重發到Broker端用於重試消費。若ProcessQueue.dropped為false,呼叫ConsumeMessageConcurrentlyService. processConsumeResult(ConsumeConcurrentlyStatus status, ConsumeConcurrentlyContext context, ConsumeRequest consumeRequest)方法,該方法的大致邏輯如下:

6.1)由於ConsumeConcurrentlyContext.ackIndex初始的預設值為Integer.MAX_VALUE,表示消費成功的個數,該變數值可由業務層在執行回撥方法失敗之後重新設定;若status等於CONSUME_SUCCESS則判斷ackIndex是否大於ConsumeRequest中MessageExt列表的個數,如果大於則表示該列表的訊息全部消費成功,則將ackIndex置為List<MessageExt>列表的個數減1,如果小於則ackIndex表示消費失敗的訊息在列表的位置;然後對消費成功/失敗個數進行統計;若status等於RECONSUME_LATER則表示訊息全部消費失敗,置ackIndex=-1,並對消費失敗個數進行統計;

6.2)若此訊息模式為廣播(DefaultMQPushConsumer.messageModel設定),則從消費失敗的列表位置(ackIndex+1)開始遍歷List<MessageExt>列表,列印消費失敗的MessageExt日誌資訊;

6.3)若此訊息模式為叢集(DefaultMQPushConsumer.messageModel設定),則從消費失敗的列表位置(ackIndex+1)開始遍歷List<MessageExt>列表,對於消費失敗的MessageExt物件,呼叫 ConsumeMessageConcurrentlyService.sendMessageBack (MessageExt msg, ConsumeConcurrentlyContext context)方法向Broker傳送CONSUMER_SEND_MSG_BACK請求碼;對於傳送BACK訊息失敗之後傳送RETRY訊息也失敗的MessageExt訊息構建一個臨時列表msgBackFailedList;

6.4)檢查上一步的臨時列表msgBackFailedList是否為空,若不為空則說明有傳送重試訊息失敗的,則首先從ConsumeRequest.msgs: List<MessageExt>變數列表中刪除消費失敗的這部分訊息,然後呼叫ConsumeMessageConcurrentlyService.submitConsumeRequestLater(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue)方法,在該方法中初始化一個匿名執行緒,並將該執行緒放入排程執行緒池(ConsumeMessageConcurrentlyService.scheduledExecutorService)中,延遲5秒之後執行該匿名執行緒,該匿名執行緒的run方法呼叫ConsumeMessageConcurrentlyService.submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispatchToConsume)方法,將傳送BACK訊息失敗之後傳送RETRY訊息也失敗的List<MessageExt>列表封裝成ConsumeRequest執行緒物件,再次提交到ConsumeMessageConcurrentlyService.consumeExecutor執行緒池中;

6.5)將消費成功了的訊息(ConsumeRequest.msgs列表中剩下的)從 ProcessQueue.msgTreeMap列表中刪除,同時更新 ProcessQueue.msgCount並返回該列表中第一個MessageExt元素的key值,即該元素的queueoffset值;

6.6)若上一步返回的queueoffset大於等於0,則呼叫 OffsetStore.updateOffset( MessageQueue mq, long offset, boolean increaseOnly)更新該消費佇列的消費進度;

4.2 對消費失敗的資訊傳送重試訊息給Broker(用於訊息重試)

在業務層消費訊息失敗並且是叢集模式下,會呼叫ConsumeMessageConcurrentlyService.sendMessageBack(MessageExt msg, ConsumeConcurrentlyContext context)方法。在該方法中呼叫DefaultMQPushConsumerImpl.sendMessageBack(MessageExt msg,int delayLevel, String brokerName)方法,大致邏輯如下:

1、根據brokerName獲取Broker地址;

2、呼叫MQClientAPIImpl.consumerSendMessageBack(String addr, MessageExt msg, String consumerGroup, int delayLevel, long timeoutMillis)方法,其中delayLevel引數由ConsumeConcurrentlyContext. delayLevelWhenNextConsume可在業務層的回撥方法中設定,預設為0(表示由伺服器根據重試次數自動疊加);構建ConsumerSendMsgBackRequestHeader物件,其中該物件的offset等於MessageExt.commitLogOffset、originTopic等於MessageExt.topic、originMsgId等於MessageExt.msgId;然後傳送CONSUMER_SEND_MSG_BACK請求碼的資訊給Broker(詳見3.1.17小節);

3、如果傳送重試訊息出現異常,則構建以%RETRY%+consumerGroup為topic值的新Message訊息,構建過程如下:

3.1)從MessageExt訊息的properties屬性中獲取"ORIGIN_MESSAGE_ID"屬性值,若沒有則以MessageExt訊息的msgId來設定新Message資訊的properties屬性的ORIGIN_MESSAGE_ID屬性值,若有該屬性值則以該屬性值來設定新Message資訊的properties屬性的ORIGIN_MESSAGE_ID屬性值。保證同一個訊息有多次傳送失敗能獲取到真正訊息的msgId;

3.2)將MessageExt訊息的flag賦值給新Message資訊的flag值;

3.3)將MessageExt訊息的topic值存入新Message資訊的properties屬性中"RETRY_TOPIC"屬性的值;

3.4)每次消費重試將MessageExt訊息的properties屬性中"RECONSUME_TIME"值加1;然後將該值再加3之後存入新Message資訊的properties屬性中"DELAY"屬性的值;

4、呼叫在啟動Consumer時建立的名為"CLIENT_INNER_PRODUCER"的DefaultMQProducer物件的send(Message msg)方法,以Consumer內部的訊息Producer重發訊息(該訊息是消費失敗且回傳給Broker也失敗的);

相關推薦

RocketMQ——ConsumerPUSH模式消費訊息順序併發

1 接受並處理Broker返回的響應訊息 當傳送拉取訊息在Broker返回響應訊息之後呼叫NettyRemotingAbstract.processMessageReceived(ChannelHandlerContext ctx, RemotingComma

RocketMQ——ConsumerPULL模式訊息消費DefaultMQPullConsumer

1 應用層的使用方式 在應用層初始化DefaultMQPullConsumer類,然後呼叫該類的start方法啟動Consumer;接下來的消費步驟如下: 1、呼叫DefaultMQPullConsumer.fetchSubscribeMessageQueu

總綱產品結構設計指導VI本博客指引章節

normal 定制化 watermark 設計 規範化 problems square span 博客 本章目的:搭建自己的產品結構設計konw-how體系,從零開始設計一個完整產品。 需知遠途即捷徑! (//作者的結構設計體系尚在搭建中,所有的文章都會定期進行

linux Nginx配置gzip常用引數詳解附使用效果驗證過程

gzip模組是使用“gzip”方法壓縮響應的過濾器,有助於將響應傳輸的資料大小減少一半甚至更多,能有效的緩解頻寬及流量問題。以下內容均翻譯於官網使用者手冊,介紹了gzip的一些常用引數,更多配置資訊可

OpenCV學習第三圖片的掩膜操作實現影象的對比度調整

掩膜操作實現影象的對比度調整 矩陣的掩膜操作十分簡單,根據掩膜來重新計算每個畫素的畫素值,掩膜(mask也被稱為kernel) I(i,j) = 5* I(i,j)-[I(i-1,j)+I(i+1,

阿里RocketMQ如何解決訊息順序重複大硬傷

分散式訊息系統作為實現分散式系統可擴充套件、可伸縮性的關鍵元件,需要具有高吞吐量、高可用等特點。而談到訊息系統的設計,就回避不了兩個問題: 訊息的順序問題 訊息的重複問題 RocketMQ作為阿里開源的一款高效能、高吞吐量的訊息中介軟體,它是怎樣來解決這

RocketMQ原始碼解析Message拉取&消費

title: RocketMQ 原始碼分析 —— Message 拉取與消費(下) date: 2017-05-11 tags: categories: RocketMQ permalink: RocketMQ/message-pull-and-cons

Linux實戰第五RHEL7.3Nginx虛擬主機配置實戰基於別名

虛擬主機 nginx個人筆記分享(在線閱讀):http://note.youdao.com/noteshare?id=05daf711c28922e50792c4b09cf63c58PDF版本下載http://down.51cto.com/data/2323313本文出自 “人才雞雞” 博客,請務必保留此出處

Linux實戰第八CentOS7.3Nginx虛擬主機配置實戰基於端口

基於 sub 主機配置 centos7.3 entos ada .com 版本 fad 個人筆記分享(在線閱讀): http://note.youdao.com/noteshare?id=9a8b56ec54800ccf197eb6c23de55a85&sub=2E3048

阿里雲端渲染系列-第一公網方式的雲端渲染

目 錄 1.文件介紹 12.基本知識點 12.1. 渲染環境 12.2. 常見製作軟體(略) 12.3. 常見渲染器(略) 23.雲產品介紹 23.1. VPC 23.2. VPN 23.3. 高速通道 23.4. ECS 23.5. BCS 23.6. OSS 23.7. NAS 23.8. 塊儲存 24

設計模式狀態模式

完整工程:https://gitee.com/NKG/DesignPatterns/blob/master/DesignPatterns.unitypackage 首先在Unity下面建立如下檔案 首先是場景都要繼承的 ISceneState 其中包括了場景的生命週期函式,

設計模式狀態模式

當我們用Unity進行場景切換時,可能會寫下如下程式碼: using UnityEngine; using UnityEngine.SceneManagement; public class ReverseVersion : MonoBehaviour { private stri

linux學習第一在VirtualBox安裝linux作業系統

轉載地址:https://blog.csdn.net/yuchao2015/article/details/52132270 感謝博主提供如此詳細的linux安裝地址  目標:在linux伺服器上部署Java開發的網站  工具 VirtualBox-4.3.8:下載後

spark作業監控standalone模式檢視歷史作業

1.關閉現有的master和worker程序 2.修改spark-defaults.conf檔案,配置三個屬性 spark.eventLog.enabled true spark.eventLog.dir hdfs://centos-5:900

ChainDesk開發模式的測試-簡化我們對鏈碼的測試過程

作者:ChainDesk韓小東,ChainDesk區塊鏈行業分析師, ChainDesk區塊鏈工程師     目標 熟練掌握鏈碼的 dev 開發測試模式 任務實現 從之前對鏈碼操作來看,我們需要在相關操作(安裝、例項化、升級、呼叫、查詢

第三SpringCloud之服務消費Feign

上一篇文章,講述瞭如何通過RestTemplate+Ribbon去消費服務,這篇文章主要講述如何通過Feign去消費服務。 Feign簡介 Feign是一種宣告式、模板化的HTTP客戶端,它使得寫Http客戶端變得更簡單。使用Feign,只需要建立一個介面並註解。它具有可

第二SpringCloud之服務消費Ribbon

在微服務架構中,業務都會被拆分成一個獨立的服務,服務與服務的通訊是基於Http RESTful的。SpringCloud有兩種服務呼叫方式,一種是Ribbon+RESTTemplate,另一種是Feign。在這一篇文章首先講解下基於Ribbon+REST。 Ribbon簡介

設計模式第3生成器模式Builder

一.生成器模式要解決的問題   生成器模式主要解決工廠方法模式和抽象工廠模式在所建立的物件包含大量的屬性時所面臨的問題: 當客戶端程式向工廠類傳遞大量引數時很容易發生錯誤,因為很多引數的型別有可能是相同的,並且也很難保證傳遞引數的順序。 在傳遞引數時有些引數是可選的,但是在工廠設計模式中,所有的

設計模式第5介面卡模式

一.介面卡模式要解決的問題 介面卡模式屬於結構設計模式,我們把將兩個不相關的介面做一個適配以使二者進行互動的介面成為介面卡。比如充電器就是一個介面卡,正常電源是120V,手機電池輸入電壓是3V,電瓶車電池輸入電壓為20V,空調輸入電壓為120V,通過介面卡適配,可以將120V電源轉換為3V、20V、120V

react-native系列(2)入門Windows系統配置Android開發環境

本文主要內容是在Window系統下配置Android APP的開發環境,配置過程將會有點麻煩,請大家需要有點耐心。當然大家也可以參考RN官網的配置過程,但實際上很多開發者根據官網步驟並沒能配置出完整的RN開發環境。目前RN的最新版本是0.57,我們將以這個版本為例配置Android的開發環境。