1. 程式人生 > >RocketMQ原理解析-consumer 3.長輪詢

RocketMQ原理解析-consumer 3.長輪詢

Rocketmq的訊息是由consumer端主動到broker拉取的, consumer向broker傳送拉訊息請求, PullMessageService服務通過一個執行緒將阻塞佇列LinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取訊息

         DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法執行向broker拉訊息動作

1.      獲取ProcessQueue判讀是否drop的, drop為true返回

2.      給ProcessQueue設定拉訊息時間戳

3.      流量控制,正在消費佇列中訊息(未被消費的)超過閥值,稍後在執行拉訊息

4.      流量控制,正在消費佇列中訊息的跨度超過閥值(預設2000),稍後在消費

5.      根據topic獲取訂閱關係

6.      構建拉訊息回撥物件PullBack, 從broker拉取訊息(非同步拉取)返回結果是回撥

7.      從記憶體中獲取commitOffsetValue  //TODO 這個值跟pullRequest.getNextOffset區別

8.      構建sysFlag   pull介面用到的flag

9.      調底層通訊層向broker傳送拉訊息請求

如果master壓力過大,會建議去slave拉取訊息

如果是到broker拉取訊息清楚實時提交標記位,因為slave不允許實時提交消費進度,可以定時提交 

//TODO 關於master拉訊息實時提交指的是什麼?

10.  拉到訊息後回撥PullCallback

處理broker返回結果pullResult

           更新從哪個broker(master 還是slave)拉取訊息

           反序列化訊息

           訊息過濾

           訊息中放入佇列最大最小offset,方便應用來感知訊息堆積度

將訊息加入正在處理佇列ProcessQueue

將訊息提交到消費訊息服務ConsumeMessageService

流控處理, 如果pullInterval引數大於0 (拉訊息間隔,如果為了降低拉取速度,可以設定大於0的值),延遲再執行拉訊息,  如果pullInterval為0立刻在執行拉訊息動作

序列圖

1.      向broker傳送長輪詢請求


2.   Broker接收長輪詢請求


3.      Consumer接收broker響應


長輪詢活動圖:


一張圖畫不下,再來一張