1. 程式人生 > >RocketMQ中的生產者和消費者的流轉過程

RocketMQ中的生產者和消費者的流轉過程

一:RocketMQ中Remoting

RocketMQ訊息佇列的整體部署架構如下圖所示:

RocketMQ叢集的一部分通訊:

(1)Broker啟動後需要完成一次將自己註冊至NameServer的操作;隨後每隔30s時間定期向NameServer上報Topic路由資訊
(2)訊息生產者Producer作為客戶端傳送訊息時候,需要根據Msg的Topic從本地快取的TopicPublishInfoTable獲取路由資訊。如果沒有則更新路由資訊會從NameServer上重新拉取;
(3)訊息生產者Producer根據(2)中獲取的路由資訊選擇一個佇列,(MessageQueue)進行訊息傳送;Broker作為訊息的接收者收訊息並落盤儲存;

二、RocketMQ事務訊息

Half(Prepare) Message

指的是暫不能投遞的訊息,傳送方已經將訊息成功傳送到了 MQ 服務端,但是服務端未收到生產者對該訊息的二次確認此時該訊息被標記成“暫不能投遞”狀態,處於該種狀態下的訊息即半訊息

訊息回查

由於網路閃斷、生產者應用重啟等原因,導致某條事務訊息的二次確認丟失,MQ 服務端通過掃描發現某條訊息長期處於“半訊息”時,需要主動向訊息生產者詢問該訊息的最終狀態(Commit 或是 Rollback),該過程即訊息回查。

執行流程圖

截圖

  1. 傳送方向 MQ 服務端傳送訊息。
  2. MQ Server 將訊息持久化成功之後,向傳送方 ACK 確認訊息已經發送成功,此時訊息為半訊息。
  3. 傳送方開始執行本地事務邏輯。
  4. 傳送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到 Commit 狀態則將半訊息標記為可投遞,訂閱方最終將收到該訊息;MQ Server 收到 Rollback 狀態則刪除半訊息,訂閱方將不會接受該訊息。
  5. 在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間後 MQ Server 將對該訊息發起訊息回查。
  6. 傳送方收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。
  7. 傳送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半訊息進行操作。

NameServer作用

nameServer顧名思義,在系統中肯定是做命名服務,服務治理方面的工作,功能應該是和zookeeper差不多,據我瞭解,RocketMq的早期版本確實是使用的zookeeper,後來改為了自己實現的nameserver。

現在來看一下nameServer在RocketMQ中的兩個主要作用:

  • NameServer維護了一份Broker的地址列表和,broker在啟動的時候會去NameServer進行註冊,會維護Broker的存活狀態.

  • NameServer維護了一份Topic和Topic對應佇列的地址列表,broker每次傳送心跳過來的時候都會把Topic資訊帶上.

結合部署結構圖,描述叢集工作流程:
1,啟動Namesrv,Namesrv起來後監聽埠,等待Broker、Produer、Consumer連上來,相當於一個路由控制中心。
2,Broker啟動,跟所有的Namesrv保持長連線,定時傳送心跳包。心跳包中包含當前Broker資訊(IP+埠等)以及儲存所有topic資訊。註冊成功後,namesrv叢集中就有Topic跟Broker的對映關係。
3,收發訊息前,先建立topic,建立topic時需要指定該topic要儲存在哪些Broker上。也可以在傳送訊息時自動建立Topic。
4,Producer傳送訊息,啟動時先跟Namesrv叢集中的其中一臺建立長連線,並從Namesrv中獲取當前傳送的Topic存在哪些Broker上,然後跟對應的Broker建長連線,直接向Broker發訊息
5,Consumer跟Producer類似。跟其中一臺Namesrv建立長連線,獲取當前訂閱Topic存在哪些Broker然後直接跟Broker建立連線通道,開始消費訊息。

參考連結:https://www.jianshu.com/p/d5da161efc33