1. 程式人生 > >RocketMQ原理解析-broker 4.HA & master slave

RocketMQ原理解析-broker 4.HA & master slave

在broker啟動的時候BrokerController如果是slave,配置了master地址更新,沒有配置所有broker會想namesrv註冊,從namesrv獲取haServerAddr,然後更新到HAClient

當HAClient的MasterAddress不為空的時候(因為broker  master和slave都構建了HAClient)會主動連線master獲取SocketChannel

Master監聽Slave請求的埠,預設為服務埠+1

接收slave上傳的offset long型別

int pos = this.byteBufferRead.position() -(this.byteBufferRead.position() % 8)  //沒有理解意圖

long readOffset =this.byteBufferRead.getLong(pos - 8);

this.processPostion = pos;

主從複製從哪裡開始複製:如果請求時0 ,從最後一個檔案開始複製

Slave啟動的時候brokerController開啟定時任務定時拷貝master的配置資訊

SlaveSynchronize類代表slave從master同步資訊(非訊息)

         syncTopicConfig       同步topic的配置資訊

         syncConsumerOffset       同步消費進度

         syncDelayOffset                同步定時進度

         syncSubcriptionGroupConfig  同步訂閱組配7F6E

HaService類實現了HA服務,負責同步雙寫,非同步複製功能, 這個類master和slave的broker都會例項化,

Master通過AcceptSocketService監聽slave的連線,每個masterslave連線都會構建一個HAConnection物件搭建他們之間的橋樑,對於一個master多slave部署結構的會有多個HAConnection例項,

Master構建HAConnection時會構建向slave寫入資料服務執行緒物件WriteSocketService物件和讀取Slave反饋服務執行緒物件ReadSocketService

WriteSocketService

         向slave同步commitLog資料執行緒,

slaveRequestOffset是每次slave同步完資料都會向master傳送一個ack表示下次同步的資料的offset。

如果slave是第一次啟動的話slaveRequestOffset=0, master會從最近那個commitLog檔案開始同步。(如果要把master上的所有commitLog檔案同步到slave的話, 把masterOffset值賦為minOffset)


向socket寫入同步資料: 傳輸資料協議<Phy Offset> <Body Size> <Body Data>

ReadSocketService:

         讀取slave通過HAClient向master返回同步commitLog的物理偏移量phyOffset值

         通知前端執行緒,如果是同步複製的話通知是否複製成功

Slave 通過HAClient建立與master的連線, 

來定時彙報slave最大物理offset,預設5秒彙報一次也代表了跟master之間的心跳檢測

讀取master向slave寫入commitlog的資料, master向slave寫入資料的格式是


Slave初始化DefaultMessageStore時候會構建ReputMessageService服務執行緒並在啟動儲存服務的start方法中被啟動

ReputMessageService的作用是slave從物理佇列(由commitlog檔案構成的MapedFileQueue)載入資料,並分發到各個邏輯佇列

HA同步複製, 當msg寫入master的commitlog檔案後,判斷maser的角色如果是同步雙寫SYNC_MASTER, 等待master同步到slave在返回結果

3 HA非同步複製