1. 程式人生 > >RocketMQ原始碼深度解析三之Broker篇

RocketMQ原始碼深度解析三之Broker篇

(一)Broker的初始化
在初始化過程中,會呼叫 BrokerController 物件的 initialize 方法進行初始化工作,大致邏輯如下:
(1)載入 topics.json、 consumerOffset.json、 subscriptionGroup.json 檔案,分別將各檔案的資料存入 TopicConfigManager、 ConsumerOffsetManager、SubscriptionGroupManager 物件中;
下面給出示例

//載入topics.json
{
        "dataVersion":{
        "counter":2,
        "timestatmp"
:1393729865073 }, "topicConfigTable":{ //根據 consumer 的 group 生成的重試 topic "%RETRY% group_name":{ "perm":6, "readQueueNums":1, "topicFilterType":"SINGLE_TAG", "topicName":"%RETRY% group_name", "writeQueueNums":1 }, "TopicTest"
:{ "perm":6, // 100 讀許可權 , 10 寫許可權 6 是 110 讀寫許可權 "readQueueNums":8, "topicFilterType":"SINGLE_TAG", "topicName":"TopicTest", "writeQueueNums":8 } } }
//載入消費進度偏移量consumerOffset.json
{
    "offsetTable":{
        "%RETRY% group_name@ group_name":{
            0
:0 //重試佇列消費進度為零 }, "TopicTest@ group_name":{ 0:23,1:23,2:22,3:22,4:21,5:18,6:18,7:18 //分組名 group_name 消費 topic 為 TopicTest 的進度為: // 佇列 queue=0 消費進度 23 // 佇列 queue=2 消費進度為 22 等等… } } }
//載入消費者訂閱關係
{
    "dataVersion":{
        "counter":1,
        "timestatmp":1393641744664
    },
    " group_name":{
            "brokerId":0, //0 代表這臺 broker 機器為 master,若要設為 slave 值大於 0
            "consumeBroadcastEnable":true,
            "consumeEnable":true,
            "consumeFromMinEnable":true,
            "groupName":" group_name",
            "retryMaxTimes":5,
            "retryQueueNums":1,
            "whichBrokerWhenConsumeSlowly":1
        }
    }
}

(2)初始化 DefaultMessageStore 物件,該物件是應用層訪問儲存層的訪問類;
(3)呼叫 DefaultMessageStore.load 載入資料
3.1)呼叫 ScheduleMessageService.load 方法,初始化延遲級別列表
3.2)呼叫 CommitLog.load 方法,在此方法中呼叫 MapedFileQueue.load 法,將HOME/store/commitlogMapedFileQueueList3.3)調DefaultMessageStore.loadConsumeQueueconsumequeueDefaultMessageStore.consumeQueueTable3.4調TransactionStateService.load()tranStateTabletranRedoLog3.5)StoreCheckPointHOME/store/checkpoint 檔案,該
檔案記錄三個欄位值,分別是物理佇列訊息時間戳、邏輯佇列訊息時間戳、索引佇列訊息時間戳
3.6)呼叫 IndexService.load 方法載入$HOME/store/index 目錄下的檔案。
對該目錄下的每個檔案初始化一個 IndexFile 物件。
3.7)恢復記憶體資料。
4、初始化 Netty 服務端 NettyRemotingServer 物件
5、初始化傳送訊息執行緒池( sendMessageExecutor)、拉取訊息執行緒池(pullMessageExecutor)、管理 Broker 執行緒池( adminBrokerExecutor)、客戶端管理執行緒池( clientManageExecutor)
6、註冊事件處理器
7、 啟動定時任務。
8、若該 Broker 為主用,則啟動定時任務列印主用 Broker 和備用 Broker 在Commitlog 上的寫入位置相差多少個位元組
9、若該 Broker 為備用,設定同步 Config 檔案的定時任務,每隔 60 秒呼叫 SlaveSynchronize.syncAll()方法向主用 Broker 請求一次 config 類檔案的同步。

(二)啟動過程
1、呼叫 DefaultMessageStore.start 方法啟動 DefaultMessageStore 物件中的一些服務執行緒。
2、 啟動 Broker 的 Netty 服務端 NettyRemotingServer。監聽消費者或生產者發起的請求資訊並處理;
3、啟動 BrokerOuterAPI 中的 NettyRemotingClient,即建立與 NameServer的連結,用於自身 Broker 與其他模組的 RPC 功能呼叫;
4、 啟動拉訊息管理服務 PullRequestHoldService,當拉取訊息時未發現訊息,則初始化 PullRequeset 物件放入該服務執行緒的 pullRequestTable 列表中,由 PullRequestHoldService 每隔 1 秒鐘就檢查一遍每個 PullRequeset 物件要讀取的資料位置在 consumequeue 中是否已經有資料了,若有則交由PullManageProcessor 處理。
5、 啟動 ClientHousekeepingService 服務,在啟動過程中設定定時任務
6、啟動 FilterServerManager,每隔 30 秒定期一次檢查 Filter Server 個數,若沒有達到 16 個則建立
7、首先呼叫 BrokerController.registerBrokerAll 方法立即向 NameServer註冊 Broker;然後設定定時任務,每隔 30 秒呼叫一次該方法向 NameServer 註冊;
8、啟動每隔 5 秒對未使用的 topic 資料進行清理的定時任務。該定時任務呼叫 DefaultMessageStore.cleanUnusedTopic(Settopics)方法

(三)向NameServer註冊Broker
呼叫 BrokerController.registerBrokerAll 方法立即向 NameServer 註冊Broker。大致步驟如下:
(1)將 TopicConfigManager.topicConfigTable 變數序列化成TopicConfigSerializeWrapper 物件;
(2)通過BrokerOuterAPI.registerBroker向NameServer註冊
(3)根據 updateMasterHAServerAddrPeriodically 標註位判斷是否需要更新主用Broker地址
(4)用 NameServer 返回的 MasterAddr 值更新 SlaveSynchronize.masterAddr值,用於主備 Broker 同步 Config 檔案使用;
(5)根據 Name Server 上的配置引數 ORDER_TOPIC_CONFIG 的值來更新Broker端的 TopicConfigManager.topicConfigTable 變數值

(四)處理Producer傳送的訊息
如果收到請求碼為SEND_MESSAGE/SEND_MESSAGE_V2 的訊息後轉由 SendMessageProcessor 處理。大致過程如下:
1、解碼收到的訊息,並構建 SendMessageRequestHeader 物件;
2、若 SendMessageProcessor 處理器設定了傳送訊息的鉤子,則呼叫該鉤子類的
sendMessageBefore 方法,執行傳送前的處理方法,在傳送訊息結果之後呼叫
sendMessageAfter 方法。 該鉤子類是為了便於業務層面的擴充套件而設計的
3、 呼叫 SendMessageProcessor.sendMessage方法處理的訊息並返回處理結果
PutMessageResult 物件;
4、 根據上一步返回的 PutMessageResult 物件中的status,設定相應的訊息程式碼

(五)處理心跳訊息
接受到客戶端傳送的 HEART_BEAT 請求碼之後,由ClientManageProcessor.processRequest方法處理該請求。心跳訊息呼叫
heartBeat(ChannelHandlerContext ctx, RemotingCommand request)方法處理。大致邏輯如下:
1、 解碼接受訊息,生成 HeartbeatData 物件;
2、根據連結的 Channel、 ClientID 等資訊初始化 ClientChannelInfo 物件
3、若 HeartbeatData 物件中的 ConsumerData 集合有資料,則進行 Consumer註冊
4、若 HeartbeatData 物件中的 ProducerData 集合有資料,則對每個ProducerData進行 Producer 註冊

其實都是根據收到的請求碼,然後執行相應的驗證和處理,其他的方法就不詳細說了。有興趣的可以自己去看,遇到不懂的歡迎和我交流。