RocketMQ原理解析-consumer 5.push消費-順序消費訊息
順序消費服務ConsumeMessageConcurrentlyService構建的時候
構建一個執行緒池來接收消費請求ConsumeRequest
構建一個單執行緒的本地執行緒,用來稍後定時重新消費ConsumeRequest, 用來執行定時週期性(一秒)鍾鎖佇列任務
週期性鎖佇列lockMQPeriodically
獲取正在消費佇列列表ProcessQueueTable所有MesssageQueue, 構建根據broker歸類成MessageQueue集合Map<brokername,Set<MessageQueue>>
遍歷Map<brokername,Set<MessageQueue>>的brokername, 獲取broker的master機器地址,將brokerName的Set<MessageQueue>傳送到broker請求鎖定這些佇列。 在broker端鎖定佇列,其實就是在broker的queue中標記一下消費端,表示這個queue被某個client鎖定。 Broker會返回成功鎖定佇列的集合, 根據成功鎖定的MessageQueue,設定對應的正在處理佇列ProccessQueue的locked屬性為true沒有鎖定設定為false
通過長輪詢拉取到訊息後會提交到訊息服務ConsumeMessageOrderlyService,
ConsumeMessageOrderlyService的submitConsumeRequest方法構建ConsumeRequest任務提交到執行緒池。ConsumeRequest是由ProcessQueue和Messagequeue組成。
ConsumeRequest任務的run方法
判斷proccessQueue是否被droped的, 廢棄直接返回,不在消費訊息
每個messagequeue都會生成一個佇列鎖來保證在當前consumer內,同一個佇列序列消費,
判斷processQueue的lock屬性是否為true,lock屬性是否過期,如果為false或者過期,放到本地執行緒稍後鎖定在消費。 如果lock為true且沒有過期,開始消費訊息
計算任務執行的時間如果大於一分鐘且執行緒數小於佇列數情況下,將processqueue, messagequeue重新構建ConsumeRequest加到執行緒池10ms後在消費,這樣防止個別佇列被餓死
獲取客戶端的消費批次個數,預設一批次為一條
從proccessqueue獲取批次訊息, processqueue.takeMessags(batchSize), 從msgTreeMap中移除訊息放到臨時map中msgTreeMapTemp,這個臨時map用來回滾訊息和commit訊息來實現事物消費
調回調介面消費訊息,返回狀態物件ConsumeOrderlyStatus
根據消費狀態,處理結果
1) 非事物方式,自動提交
訊息訊息狀態為success:呼叫processQueue.commit方法
獲取msgTreeMapTemp的最後一個key,表示提交的 offset
清空msgTreeMapTemp的訊息,已經成功消費
2) 事物提交,由使用者來控制提交回滾(精衛專用)
更新消費進度, 這裡的更新只是一個記憶體offsetTable的更新,後面有定時任務定時更新到broker上去