Rocketmq實現簡析
本來想將broker和client分開寫。但是他們的每個功能都是共同協作完成的,寫broker的時候,難免會涉及到consumer和producer的細節,於是以大雜燴的方式粗略總結了rocketmq的主要功能,主要是broker。
一、nameSvr
RocketMq是Metaq的3.0版本,摒棄了之前使用的zk,用以新的命名服務namesrv,namesrv的程式碼量比較少,我們從這裡開始我們的原始碼分析路程。
可以看到Main-ClassNamesrvStartup
除了讀取配置,初始化日誌以外,主要功能是由NamesrvController
完成的。我們進一步檢視NamesrvController類,發現其是有以下幾個模組構成:RemotingServer
,KVConfigManager
,RouteInfoManager
1.2 namesrv的內部元件
1.2.1 KVConfigManager
KVConfigManager
為broker提供namespace,K,V 的雙層kv儲存/讀取功能。當發生資料變更時,會實時刷盤。內部用讀寫鎖來控制併發操作。
1.2.2 NettyRemotingServer
提供對外訊息接收處理服務。NettyRemotingServer
繼承自NettyRemotingAbstract
,後者對netty進行了封裝,抽象出server和client的公共部分。server的實現即為NettyRemotingServer
。namesrv
通過其registerDefaultProcessor
方法註冊了訊息處理物件DefaultRequestProcessor
。處理如下訊息:
- K,V 系統增刪改查功能;
- broker的註冊登出;topic的增刪改查;
- 獲取叢集資訊,包括broker列表和broker的叢集名稱和broker組名稱;
具體實現根據功能轉由KVConfigManager
和RouteInfoManager
模組處理。
1.2.3 RouteInfoManager
路由資訊管理,namesrv的核心模組。維護以下列表的增刪改查
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
1.3定時任務scheduledExecutorService
處理兩個事情:
-
剔除失聯的broker,預設2min超時。
-
列印KVConfigManager中的儲存資訊,預設10min觸發一次。
二、broker的元件
-
1.remotingServer 監聽10911埠,用於監聽comsumer和producer的訊息
-
2.fastRemotingServer 監聽埠10909 fastRemotingServer,埠號由10911-2得到。功能和10911高度重合(不懂作用是啥?可能是類似於vip通道?)
-
3.brokerOuterAPI broker的客戶端網路介面,維護與namesvr的通訊,每隔30s執行一次
registerBroker
,用於把自身維護的topic資訊傳送到所有namesvr,同時這次報文也充當心跳的作用。 -
4.messageStore 儲存模組,可以說是broker的核心。
-
4.1scheduleMessageService 負責延時將佇列的訊息寫到真實佇列。
-
4.2flushConsumeQueueService 定期將consumequeue刷盤。
-
4.3commitLog
-
4.4storeStatsService 統計訊息寫入commitlog耗時,qps等資訊。
-
4.5reputMessageService 每1ms將commitlog中的變化量寫入consumerqueue.
-
4.6 haService
-
4.7cleanCommitLogService 負責清理commitLog的過期檔案,如果滿足時間(每天凌晨4點)/磁碟空間不足/有人手動刪除過(???沒看懂) 中的一項,即可刪去3天內沒有修改記錄的commitlog。
-
4.8cleanConsumeQueueService 清理consumequeue的過期檔案,觸發條件比較苛刻,一個consumequeue檔案最多可以儲存30w個訊息位置資訊,檢查最後一個訊息的offset(即這個檔案中最新的訊息)是否小於commitlog的最小offset,如果是則刪除。即如果某個檔案的訊息不滿30w,肯定不會被刪除。
-
5.pullRequestHoldService consumer 拉取訊息如果還沒有訊息,則可以阻塞一定的時間直到有新的訊息或超時。pullRequestHoldService用於維護這些拉取請求。
-
6.clientHousekeepingService 用於清理1,2,3中心跳超時的連結。
-
7.filterServerManager 定期執行指令碼
startfsrv.sh
,啟動Filtersrv服務,暫未研究其工作內容。
三、一條producer發來的訊息處理流程
2.1 正常流程
-
-
預步驟:
adminProcessor
用於處理remotingServer
和fastRemotingServer
的處理器,除卻BrokerController#registerProcessor
方法中特殊註冊的處理器,其他都由adminProcessor
處理。在這裡,將處理器SendMessageProcessor註冊到通訊服務中。
-
預步驟:
-
-
producer把訊息發給broker,由netty轉交給
SendMessageProcessor#sendMessage
函式處理。
-
producer把訊息發給broker,由netty轉交給
-
-
檢查本地是否有該topic的該佇列。如果沒有且配置了自動建立,按照
TBW102
中的perm
(位於~/store/config/topics.json,perm共3bit,分別代表可讀,可寫,可自動建立。注意客戶端配置的queue不能超過預設)佇列的配置來建立一個新的topic及其佇列。(預設建立不太好,訊息服務無法控制訊息來源。一個broker預設建立了新的佇列以後,根據預設的pushConsumer的實現,如果使用者沒有額外配置其他broker來消費者這個topic,後續該topic下的訊息均會打到這個broker上。還是由運維人員手動配置比較好)。
-
檢查本地是否有該topic的該佇列。如果沒有且配置了自動建立,按照
-
-
由
DefaultMessageStore#putMessage
->CommitLog#putMessage
來將訊息寫入commitLog中。如果訊息配置了延遲處理的話(18個級別,分別對應時間 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"),會放入佇列SCHEDULE_TOPIC_XXXX
,broker會觸發
DeliverDelayedMessageTimerTask
任務,在時間到達的時候將訊息再寫入原始的topic(所以物理上會存了兩遍)。刷盤策略 見後文。
-
-
5.每隔1ms執行
ReputMessageService#doReput
,目的是以佇列分類,記錄每條訊息的大小,儲存在consumequeue/TopicTest/0/00000000000000000000
(示例topic+queue+offset)中,從commitlog中讀取還沒有寫入consumer的訊息。(疑問 :CommitLog#checkMessageAndReturnSize
的時候又讀了一次commitLog,為什麼不做成包裝一個簡要資訊塞入佇列的方式拋給ReputMessageService)。 -
6.觸發等待該佇列的consumer的pullRequest。
2.2 處理失敗的訊息
consumer對於消費失敗的訊息,可以在MessageListenerConcurrently#consumeMessage
處理的時候丟擲異常或者返回RECONSUME_LATER
來標示訊息處理失敗。對於失敗的訊息將會把原始訊息的offset發回給broker。
broker在commitlog中找到原始的訊息內容,取出來並投遞到新的retry topic(名稱%RETRY%+consumerGroupName)中。這裡有兩個關鍵點:根據延遲level指數退避先投遞到延遲佇列,如果最後retry topic依然消費失敗。那麼進入死信佇列(名稱%DLQ%+consumerGroupName)。
四、producer和consumer的工作流程
在瞭解消費流程之前,我們先看一下client(producer和consumer)的工作流程。
producer和consumer的工作流程大致是一致的,但是也有差異部分。詳見MQClientInstance#start
。
3.1 公共部分
-
3.1.1MQClientAPIImpl 負責和namesrv、broker通訊。其中帶
oneway
的傳送介面作用:傳送成功即可,不管broker的反應。 -
3.1.2定時任務,是和MQClientAPIImpl模組配合使用。
-
從nameserver獲取broker資訊。
-
向broker傳送心跳,並清除client中超時broker
-
consumer通知broker更新消費進度
-
動態更新本地執行緒池大小(3.4.6 版本中還沒有實現完全).
3.2 consumer獨有
其實producer也有啟動相應執行緒,但是沒有觸發條件,無法執行邏輯。
3.2.1.RebalanceService
服務,每10s執行一次,對於每個消費組的每個topic,從broker獲取到consumer同胞,然後根據負載演算法均攤所有的佇列。broker可以控制每個topic佇列的多少來完成帶權重的訊息負載,producer可以通過指定傳送的佇列來實現權重生產。consumer如果要實現類似功能,可以呼叫setAllocateMessageQueueStrategy
修改rocketmq的負載策略。
3.2.2.PullMessageService
服務,負責發起拉取訊息的任務。RebalanceService
服務呼叫抽象方法RebalanceImpl#dispatchPullRequest
將新增的broker佇列分發出去,其中pushconsumer的實現RebalancePushImpl
會呼叫PullMessageService
的介面向目標broker發起拉取訊息的請求。concumer從namesrv中獲取同組同topic的消費者,每個消費者分配不重複的佇列,所以具體使用的時候,消費者的數量應該要大於佇列的數量是沒有意義的。具體實現是rebalanceByTopic#rebalanceByTopic
。
在上文<<一條producer發來的訊息處理流程>>也有說過,訊息有可能會消費失敗,消費失敗的訊息最後都進了%RETRY%consumerGroupName
佇列,因此消費者在消費的時候,除了訂閱自己負責的topic,還需要訂閱本消費組的retry佇列。
五、一條push方式consumer發來的pull請求處理流程
-
1.consumer啟動的時候觸發rebalance,叢集模式下需要從broker取得topic+consumerGroup的消費進度,方法
RebalancePushImpl#computePullFromWhere
。broker處理函式ClientManageProcessor#queryConsumerOffset
,從本地配置config/consumerOffset.json
中讀取出進度返回給consumer。 -
2.consumer從拿到的offset開始消費,broker處理入口
PullMessageProcessor#processRequest
,對請求引數做一系列的檢查。先從consumequeue/TopicName/queueId/offset
(由於consumerqueue檔案每個訊息儲存均用了20位元組,因此可以很方便的根據offset讀出實際訊息在commitlog中的位置)中讀取到實際conmmitlog中的訊息位置,然後再到commitlog中找到具體的訊息,返回給consumer。
consumer的拉取模式
對於push模式來說,rocketmq採用的卻是pull的方式來獲取訊息。pull的間隔似乎決定了broker把訊息推給consumer的延時,間隔太長,訊息實時性無法保證,時間太短,徒增cpu和網路資源。但是rocketmq給出了一個比較好的解決方案。consumer對於分配到自己身上的每個broker的每個佇列,在pull請求的時候給出一個掛起時間pollingTimeMills
(第一次是由RebalanceService
觸發的,pollingTimeMills預設15s),如果對某個佇列查詢的結果是沒有新訊息,那麼掛起pollingTimeMills
時間,期間如果有新訊息到來,呼叫brokerPullRequestHoldService#notifyMessageArriving
來重新觸發一次訊息拉取返回給consumer,如果超時了也返回給consumer。consumer在接收到回覆以後立即發起下一條查詢。
producer傳送同步訊息如何實現?
每條訊息有一個唯一標識opaque,傳送一條訊息前,建立一個ResponseFuture,ResponseFuture內部維護了一個計數為1的CountDownLatch物件,儲存到上下文列表responseTable(型別:ConcurrentHashMap<Integer /* opaque */, ResponseFuture>)中, 在當前程序接收到訊息後,先判斷是請求還是回覆,如果是回覆則清除掉responseTable中的記錄,並減少CountDownLatch的計數。使用者在傳送介面中等到CountDownLatch的結果就可以返回了。
順序訊息
順序訊息由以下兩點來保證。
1.從producer到broker的順序性,producer對於同一類順序訊息,選擇同一個broker的同一個queue(呼叫SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) 介面來定製訊息傳往那個queue),tcp傳輸是有序的且broker中commitlog和consumer各佇列訊息的offset均是順序讀寫。因此這點得到了保證。
2.訊息從broker到consumer的有序性,同一個佇列有且對應了一個consumer且從broker傳送到consumer的時候是順序的。此外consumer註冊監聽器為MessageListenerOrderly
型別,client內部判斷如果是這種型別,在執行投遞到執行緒池的ConsumeRequest
任務時,需要獲取到對應queue佇列的鎖才能繼續呼叫到使用者程式碼,保證了第二點的順序性。
六、儲存模組補充
5.1 abort
broker在啟動的時候會建立一個空的名為abort
的檔案,並在shutdown時將其刪除,用於標識程序是否正常退出,如果不正常退出,會在啟動時做故障恢復(todo:分析具體邏輯)
5.2 commitlog和consumequeue
發給同一個broker的所有topic訊息均順序寫在commitlog當中(包括消費失敗的訊息)。每條訊息的大小不定,因為commitlog本身是無序且不定長的。所以需要有一種檔案來記錄每個topic每條訊息儲存的物理offset,即consumequeue。每個consumequeue檔案順序記錄了某個broker中的某個queue的commitlog offset。但是要做到以groupName來分組消費,我們還需要以每種groupName建立一類可以儲存每個group消費進度的檔案,即config/consumerOffset.json。
5.2.1 刷盤邏輯
-
同步刷盤需要broker配置為同步刷盤且producer在傳送訊息前呼叫
setWaitStoreMsgOK(true)
允許等待broker刷盤結果(預設也是true)。實現為GroupCommitService
,broker會嘗試兩次刷盤,並給出結果給客戶端。 -
非同步刷盤實現
FlushRealTimeService
,每秒觸發,如果髒資料超過4頁刷一次盤。每10s強制刷一次盤,最終是呼叫MappedByteBuffer#force()
方法。
5.3 index檔案
index是rocketmq的索引檔案,如果producer要讓一條訊息支援索引查詢,在傳送前需要指定message的key
欄位。producer或者consumer可以根據方法queryMessage
(協議號12)查詢所有broker中key是該值的訊息記錄。訊息塞入實現在IndexFile#putKey
,訊息獲取實現在IndexFile#selectPhyOffset
。
每個index檔案由 header(40byte),slot table(4byte500w,每個索引訊息的位置: hash(topic+key)%500w),index list(20byte 200w,儲存訊息在commitlog的位置資訊) 三個部分組成。hash衝突如何解決?因為要寫入檔案,開鏈法肯定行不通。rocketmq採取的方式是indexList部分順序寫,同時每個index記錄儲存了前一個相同hash的index的位置。而最尾部的index節點位置儲存在slot table中。
index檔案有如下幾個缺點(自己總結的,可能有謬誤)
index檔案中沒有儲存topic+key
的值,因此對給定一個key,查詢出來的結果可能包含無效值(其他hash值一樣的key),需要client二次過濾,因此client需要儘量確保key是唯一的。
client在查詢時,給定key,maxNum,如果實際獲取的list比較大,會查詢不全。但是rocketmq沒有提供分頁的機制。
七、待完善的部分
1.事務
2.Filtersrv服務
3.HA