1. 程式人生 > >RocketMQ原理解析-consumer 5.push消費-順序消費訊息

RocketMQ原理解析-consumer 5.push消費-順序消費訊息

順序消費服務ConsumeMessageConcurrentlyService構建的時候

                   構建一個執行緒池來接收消費請求ConsumeRequest

                   構建一個單執行緒的本地執行緒,用來稍後定時重新消費ConsumeRequest, 用來執行定時週期性(一秒)鍾鎖佇列任務

         週期性鎖佇列lockMQPeriodically

                   獲取正在消費佇列列表ProcessQueueTable所有MesssageQueue, 構建根據broker歸類成MessageQueue集合Map<brokername,Set<MessageQueue>>

                   遍歷Map<brokername,Set<MessageQueue>>的brokername, 獲取broker的master機器地址,將brokerName的Set<MessageQueue>傳送到broker請求鎖定這些佇列。 在broker端鎖定佇列,其實就是在broker的queue中標記一下消費端,表示這個queue被某個client鎖定。 Broker會返回成功鎖定佇列的集合, 根據成功鎖定的MessageQueue,設定對應的正在處理佇列ProccessQueue的locked屬性為true沒有鎖定設定為false

通過長輪詢拉取到訊息後會提交到訊息服務ConsumeMessageOrderlyService,

ConsumeMessageOrderlyService的submitConsumeRequest方法構建ConsumeRequest任務提交到執行緒池。ConsumeRequest是由ProcessQueue和Messagequeue組成。

ConsumeRequest任務的run方法

         判斷proccessQueue是否被droped的, 廢棄直接返回,不在消費訊息

         每個messagequeue都會生成一個佇列鎖來保證在當前consumer內,同一個佇列序列消費,

         判斷processQueue的lock屬性是否為true,lock屬性是否過期,如果為false或者過期,放到本地執行緒稍後鎖定在消費。 如果lock為true且沒有過期,開始消費訊息

         計算任務執行的時間如果大於一分鐘且執行緒數小於佇列數情況下,將processqueue, messagequeue重新構建ConsumeRequest加到執行緒池10ms後在消費,這樣防止個別佇列被餓死

         獲取客戶端的消費批次個數,預設一批次為一條

         從proccessqueue獲取批次訊息, processqueue.takeMessags(batchSize), 從msgTreeMap中移除訊息放到臨時map中msgTreeMapTemp,這個臨時map用來回滾訊息和commit訊息來實現事物消費

         調回調介面消費訊息,返回狀態物件ConsumeOrderlyStatus

         根據消費狀態,處理結果

1)   非事物方式,自動提交

訊息訊息狀態為success:呼叫processQueue.commit方法

                   獲取msgTreeMapTemp的最後一個key,表示提交的 offset

                   清空msgTreeMapTemp的訊息,已經成功消費

2)   事物提交,由使用者來控制提交回滾(精衛專用)

     更新消費進度, 這裡的更新只是一個記憶體offsetTable的更新,後面有定時任務定時更新到broker上去