
Source code analysis of the Broker HA strategy in RocketMQ

The Broker's HA strategy has two parts:
① synchronizing metadata
② synchronizing message data

 

Synchronizing metadata

When a Slave starts, it launches a scheduled task that synchronizes metadata from the master:

if (role == BrokerRole.SLAVE) {
    if (null != slaveSyncFuture) {
        slaveSyncFuture.cancel(false);
    }
    this.slaveSynchronize.setMasterAddr(null);
    slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            }
            catch (Throwable e) {
                log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
            }
        }
    }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
}

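This sets up a scheduled task that runs slaveSynchronize's syncAll method, first after 3 s and then every 10 s.
Note that the master address is first reset to null via setMasterAddr. This is because another scheduled task, registerBrokerAll, later obtains the master's address from the NameServer. For details see: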

[Source code analysis of Broker startup in RocketMQ (Part 2)]
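One detail in the scheduling code above deserves attention: the try/catch around syncAll. With ScheduledExecutorService.scheduleAtFixedRate, a task that throws has all of its later executions suppressed. Catching Throwable therefore keeps the periodic sync alive even when the master is temporarily unreachable.

The sketch below uses only JDK classes. A hypothetical always-failing task stands in for syncAll, and the sketch shows that the task keeps being rescheduled anyway.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicSyncSketch {
    // Wraps the task in try/catch, as BrokerController does around syncAll(): without it,
    // the first exception would suppress every later execution of a fixed-rate task.
    static ScheduledFuture<?> schedule(ScheduledExecutorService ses, Runnable task,
                                       long initialDelayMs, long periodMs) {
        return ses.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("sync error: " + t.getMessage()); // the broker logs instead
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Schedules a task that always throws and reports how often it still ran.
    static int runFailingTask(long periodMs, long waitMs) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = schedule(ses, () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("master unreachable"); // hypothetical failure
        }, 0, periodMs);
        try {
            Thread.sleep(waitMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        future.cancel(false);   // same call the broker uses before rescheduling
        ses.shutdownNow();
        return runs.get();
    }
}
```

Remove the try/catch in schedule and runFailingTask would return 1, because the executor silently stops the task after the first exception.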

 


The syncAll method of SlaveSynchronize:

public void syncAll() {
    this.syncTopicConfig();
    this.syncConsumerOffset();
    this.syncDelayOffset();
    this.syncSubscriptionGroupConfig();
}

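This method calls four methods in turn, each synchronizing one kind of information:
syncTopicConfig: synchronizes the topic configuration
syncConsumerOffset: synchronizes the Consumer offsets
syncDelayOffset: synchronizes the delay queue progress (offsets)
syncSubscriptionGroupConfig: synchronizes the subscription group configuration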


Since these methods are implemented in much the same way, we only look at syncTopicConfig here.
The syncTopicConfig method:

private void syncTopicConfig() {
    String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            TopicConfigSerializeWrapper topicWrapper =
                this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
            if (!this.brokerController.getTopicConfigManager().getDataVersion()
                .equals(topicWrapper.getDataVersion())) {

                this.brokerController.getTopicConfigManager().getDataVersion()
                    .assignNewOne(topicWrapper.getDataVersion());
                this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                this.brokerController.getTopicConfigManager().getTopicConfigTable()
                    .putAll(topicWrapper.getTopicConfigTable());
                this.brokerController.getTopicConfigManager().persist();

                log.info("Update slave topic config from master, {}", masterAddrBak);
            }
        } catch (Exception e) {
            log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
        }
    }
}

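It first reads the master address, masterAddr, and skips the sync if the address is null or is the broker's own address. The registerBrokerAll scheduled task keeps running, so even if masterAddr is not available this time, it will be fetched from the NameServer on a later run, as long as the group has a master.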


Once the master address is known, the slave sends a request to the master through BrokerOuterAPI's getAllTopicConfig method.
BrokerOuterAPI's getAllTopicConfig method:

public TopicConfigSerializeWrapper getAllTopicConfig(
    final String addr) throws RemotingConnectException, RemotingSendRequestException,
    RemotingTimeoutException, InterruptedException, MQBrokerException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

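It first builds a GET_ALL_TOPIC_CONFIG request command and sends it synchronously through remotingClient's invokeSync with a 3000 ms timeout. Note that MixAll's brokerVIPChannel method is used to obtain the address of the master's VIP channel, which is the master's port minus 2. I covered this in an earlier post.
Synchronous sending is covered in detail in [Source code analysis of Producer message sending in RocketMQ].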
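The port arithmetic can be illustrated with a small helper. Note that vipChannel is a hypothetical name, not the RocketMQ method. It only reproduces the "port minus 2" rule described above, and assumes the address is a host:port string.

```java
class VipChannelSketch {
    // Hypothetical helper: when enabled, the VIP channel address is the same host
    // with the port lowered by 2; otherwise the address is returned unchanged.
    static String vipChannel(boolean enabled, String brokerAddr) {
        if (!enabled) {
            return brokerAddr;
        }
        int idx = brokerAddr.lastIndexOf(':');           // split host and port at the last ':'
        String host = brokerAddr.substring(0, idx);
        int port = Integer.parseInt(brokerAddr.substring(idx + 1));
        return host + ":" + (port - 2);
    }
}
```

For example, a master listening on 10911 would be reached on 10909 for this request.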

 

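Once the request reaches the master, let's see how the master handles it.
On receiving the request, the master dispatches on the request code in AdminBrokerProcessor's processRequest method: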

case RequestCode.GET_ALL_TOPIC_CONFIG:
    return this.getAllTopicConfig(ctx, request);

This calls the getAllTopicConfig method:

private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
    // final GetAllTopicConfigResponseHeader responseHeader =
    // (GetAllTopicConfigResponseHeader) response.readCustomHeader();

    String content = this.brokerController.getTopicConfigManager().encode();
    if (content != null && content.length() > 0) {
        try {
            response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
        } catch (UnsupportedEncodingException e) {
            log.error("", e);

            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UnsupportedEncodingException " + e);
            return response;
        }
    } else {
        log.error("No topic in this broker, client: {}", ctx.channel().remoteAddress());
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("No topic in this broker");
        return response;
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);

    return response;
}

This takes the topicConfigTable held by TopicConfigManager:

private final ConcurrentMap<String, TopicConfig> topicConfigTable =
        new ConcurrentHashMap<String, TopicConfig>(1024);

The encode method converts this map into a JSON string, which is sent back to the slave over Netty as the response body.


Back on the slave: because the send is synchronous, the slave waits for the response. Once the response arrives:

switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
    }
    default:
        break;
}

decode turns the JSON string back into a map, wrapped in a TopicConfigSerializeWrapper.


Back in the syncTopicConfig method, once the TopicConfigSerializeWrapper instance has been obtained:

if (!this.brokerController.getTopicConfigManager().getDataVersion()
    .equals(topicWrapper.getDataVersion())) {

    this.brokerController.getTopicConfigManager().getDataVersion()
        .assignNewOne(topicWrapper.getDataVersion());
    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
    this.brokerController.getTopicConfigManager().getTopicConfigTable()
        .putAll(topicWrapper.getTopicConfigTable());
    this.brokerController.getTopicConfigManager().persist();

    log.info("Update slave topic config from master, {}", masterAddrBak);
}

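It checks whether the data versions match. If they differ, the slave does three things:
- adopts the master's version;
- replaces its whole table with the master's;
- persists the result.

This keeps the slave's topic configuration in sync with the master.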

The other three kinds of information are synchronized the same way.
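The version-guarded replacement can be sketched as follows. Version and VersionedTableSketch are simplified, hypothetical stand-ins for DataVersion and TopicConfigManager.

The point of the sketch: the local table is cleared and replaced wholesale only when the versions differ. An unchanged master therefore costs nothing beyond the remote call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class VersionedTableSketch {
    // Hypothetical simplified version stamp: a timestamp plus a change counter.
    static final class Version {
        final long timestamp;
        final long counter;

        Version(long timestamp, long counter) {
            this.timestamp = timestamp;
            this.counter = counter;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Version)) {
                return false;
            }
            Version v = (Version) o;
            return timestamp == v.timestamp && counter == v.counter;
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, counter);
        }
    }

    Version version = new Version(0, 0);
    final Map<String, String> table = new HashMap<>();

    // Replaces the local table only when the master's version differs; returns true if replaced.
    boolean syncFrom(Version masterVersion, Map<String, String> masterTable) {
        if (version.equals(masterVersion)) {
            return false;              // already in sync, nothing to do
        }
        version = masterVersion;       // adopt the master's version
        table.clear();                 // full replacement, not a merge
        table.putAll(masterTable);
        return true;                   // the real code would also persist() here
    }
}
```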

 

Synchronizing message data

When the master starts, it launches an HA service thread, built on plain JDK NIO, to accept slave connections:

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();

            if (selected != null) {
                for (SelectionKey k : selected) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                        if (sc != null) {
                            HAService.log.info("HAService receive new connection, "
                                + sc.socket().getRemoteSocketAddress());

                            try {
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                conn.start();
                                HAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }

                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

This is a very typical use of JDK NIO. Once an incoming connection is accepted and a SocketChannel obtained, the channel is wrapped in an HAConnection:

public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
    this.haService = haService;
    this.socketChannel = socketChannel;
    this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
    this.socketChannel.configureBlocking(false);
    this.socketChannel.socket().setSoLinger(false, -1);
    this.socketChannel.socket().setTcpNoDelay(true);
    this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
    this.socketChannel.socket().setSendBufferSize(1024 * 64);
    this.writeSocketService = new WriteSocketService(this.socketChannel);
    this.readSocketService = new ReadSocketService(this.socketChannel);
    this.haService.getConnectionCount().incrementAndGet();
}

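The constructor configures the socketChannel: non-blocking mode, TCP_NODELAY, and 64 KB send and receive buffers. It also creates a WriteSocketService and a ReadSocketService, which are the foundation of the message synchronization that follows.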


After the HAConnection is created, its start method is called:

public void start() {
    this.readSocketService.start();
    this.writeSocketService.start();
}

This starts two threads: one reads the data sent by the slave, the other sends data to the slave.
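The accept-then-wrap pattern can be reproduced with plain JDK NIO. In this sketch, acceptOnce performs one pass of the accept loop shown earlier. Instead of creating an HAConnection and starting its two threads, it just configures the accepted channels and returns them.

The demo method is hypothetical scaffolding that exists only to exercise acceptOnce. It binds an ephemeral local port and connects one client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

class AcceptLoopSketch {
    // One pass of an accept loop: wait up to timeoutMs and accept every ready connection.
    static List<SocketChannel> acceptOnce(Selector selector, long timeoutMs) throws IOException {
        List<SocketChannel> accepted = new ArrayList<>();
        selector.select(timeoutMs);
        for (SelectionKey k : selector.selectedKeys()) {
            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                if (sc != null) {
                    sc.configureBlocking(false);       // same settings HAConnection applies
                    sc.socket().setTcpNoDelay(true);
                    accepted.add(sc);                  // HAService would wrap and start it here
                }
            }
        }
        selector.selectedKeys().clear();               // keys must be cleared by hand
        return accepted;
    }

    // Binds an ephemeral loopback port, connects one client and accepts it.
    static int demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                List<SocketChannel> conns = acceptOnce(selector, 1000);
                for (SocketChannel c : conns) {
                    c.close();
                }
                return conns.size();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```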

 


Let's set the master aside for now and look at the slave side.
When the slave starts, it starts the HAClient thread:

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            if (this.connectMaster()) {

                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }

                this.selector.select(1000);

                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }

                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}

In this while loop, connectMaster first checks whether a connection to the master exists.


The connectMaster method:

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {

            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

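If socketChannel is null, either no connection has been made yet or the connection was dropped. A new network connection must then be created from masterAddress.

Whenever a connection has to be (re)established, the slave does two things:
- it gets its local maximum offset with defaultMessageStore's getMaxPhyOffset and stores it in currentReportedOffset, to be reported to the master later;
- it records a timestamp, lastWriteTimestamp, used by the timing checks that follow.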


Once the connection to the master is established, isTimeToReportOffset checks whether it is time to report the current maximum offset to the master.

The isTimeToReportOffset method:

private boolean isTimeToReportOffset() {
    long interval =
        HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
    boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
        .getHaSendHeartbeatInterval();

    return needHeart;
}

It compares lastWriteTimestamp with the current time to decide whether the reporting interval, haSendHeartbeatInterval (5 s by default), has elapsed.
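The check is a plain elapsed-time comparison. Here is a minimal sketch; it assumes millisecond timestamps passed in by the caller instead of the store's SystemClock.

```java
class HeartbeatTimerSketch {
    static final long HEARTBEAT_INTERVAL_MS = 5_000; // default haSendHeartbeatInterval per the text

    // True when strictly more than intervalMs has passed since the last write,
    // i.e. the slave should report its offset again even if nothing changed.
    static boolean isTimeToReport(long nowMs, long lastWriteMs, long intervalMs) {
        return nowMs - lastWriteMs > intervalMs;
    }
}
```

Because the comparison is strict, exactly 5 s after the last write is not yet "time to report".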


If it has, reportSlaveMaxOffset sends the recorded maximum offset, currentReportedOffset, to the master.


The reportSlaveMaxOffset method:

private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    return !this.reportOffset.hasRemaining();
}

Here reportOffset is a ByteBuffer dedicated to holding the offset:

private final ByteBuffer reportOffset = ByteBuffer.allocate(8);

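maxOffset is stored in reportOffset and then sent to the master through socketChannel's write method.

hasRemaining checks whether the position has reached the buffer's limit. The write is retried up to three times so that all 8 bytes in reportOffset get sent.

After that, selector's select method polls via NIO for up to 1000 ms, waiting for data from the master.


From this we can see that once the slave is connected to the master, it periodically reports its current offset to the master.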
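The 8-byte report and its bounded retry can be sketched against any WritableByteChannel. The sketch assumes that a channel may accept fewer bytes than requested, as a non-blocking SocketChannel can. Channels.newChannel over a ByteArrayOutputStream stands in for the socket. clear() and flip() replace the explicit position/limit calls with the same effect.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

class ReportOffsetSketch {
    private final ByteBuffer reportOffset = ByteBuffer.allocate(8); // exactly one long

    // Encodes maxOffset as a big-endian long and tries at most 3 writes;
    // succeeds only if all 8 bytes left the buffer.
    boolean report(WritableByteChannel channel, long maxOffset) {
        reportOffset.clear();              // position = 0, limit = 8
        reportOffset.putLong(maxOffset);
        reportOffset.flip();               // position = 0, limit = 8, ready to drain
        for (int i = 0; i < 3 && reportOffset.hasRemaining(); i++) {
            try {
                channel.write(reportOffset);
            } catch (IOException e) {
                return false;              // the caller would close the connection
            }
        }
        return !reportOffset.hasRemaining();
    }

    // Convenience wrapper: captures the bytes that would go over the wire.
    static byte[] encode(long maxOffset) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        boolean ok = new ReportOffsetSketch().report(Channels.newChannel(out), maxOffset);
        return ok ? out.toByteArray() : null;
    }
}
```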

 


Now let's see how the master handles the offset it receives:

On the master, this is done by the ReadSocketService thread mentioned earlier:

public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            boolean ok = this.processReadEvent();
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }

            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    this.makeStop();

    writeSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    HAConnection.this.haService.getConnectionCount().decrementAndGet();

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

The while loop here also starts by calling selector's select method, polling via NIO with a timeout.

After polling, the rest of the processing is done by processReadEvent:

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPostion = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPostion = pos;

                    HAConnection.this.slaveAckOffset = readOffset;
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    }

                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}

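This method reads the data sent by the slave into byteBufferRead through socketChannel's read method.
Once at least 8 new bytes have arrived, it extracts the latest long offset value and stores it in HAConnection's slaveAckOffset field.

slaveRequestOffset handles the synchronization on first connection: it records the first offset the slave reports.

notifyTransferSome performs the corresponding wake-up when this is a synchronous master; an asynchronous master does not need it. This is analyzed later.


In other words, the ReadSocketService thread just keeps reading and updating the offsets sent by the slave.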
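The "latest complete long" arithmetic in processReadEvent can be isolated as shown below. AckParserSketch is a hypothetical name. Like the original, the sketch relies on two assumptions:
- the slave only ever writes whole 8-byte offsets;
- the buffer is reset to position 0 when full, and its capacity is a multiple of 8.

Together these mean that rounding the position down to a multiple of 8 always lands on a frame boundary.

```java
import java.nio.ByteBuffer;

class AckParserSketch {
    private int processPosition = 0; // how far earlier calls have consumed

    // Returns the newest complete 8-byte offset received since the last call, or -1.
    long latestAck(ByteBuffer buf) {
        if (buf.position() - processPosition < 8) {
            return -1;                                    // no new complete offset yet
        }
        int pos = buf.position() - (buf.position() % 8);  // round down to a long boundary
        long offset = buf.getLong(pos - 8);               // absolute get: position untouched
        processPosition = pos;
        return offset;
    }
}
```

If several reports arrived in one read, only the newest matters, because each one supersedes the previous ack.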

 

Next, let's see how the WriteSocketService thread sends data to the slave:

public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);

            if (-1 == HAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }

            if (-1 == this.nextTransferFromWhere) {
                if (0 == HAConnection.this.slaveRequestOffset) {
                    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    masterOffset =
                        masterOffset
                            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getMapedFileSizeCommitLog());

                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }

                    this.nextTransferFromWhere = masterOffset;
                } else {
                    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                }

                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                    + "], and slave request " + HAConnection.this.slaveRequestOffset);
            }

            if (this.lastWriteOver) {

                long interval =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaSendHeartbeatInterval()) {

                    // Build Header
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    this.byteBufferHeader.flip();

                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            } else {
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }

            SelectMappedBufferResult selectResult =
                HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                int size = selectResult.getSize();
                if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }

                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size;

                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;

                // Build Header
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();

                this.lastWriteOver = this.transferData();
            } else {

                HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {

            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }

    this.makeStop();

    readSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

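It first checks slaveRequestOffset, which is -1 only in its initial state.

In other words, until the slave has sent an offset, the WriteSocketService thread just idles, sleeping 10 ms per round.

Once the slave has sent an offset, nextTransferFromWhere is checked first. Like slaveRequestOffset, it is initialized to -1. A value of -1 means the master and slave have only just connected and no message has been synchronized yet!

In that case the now-updated slaveRequestOffset is checked.
If it equals 0, the slave has no message history. The master then takes its own max offset and computes from it: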

masterOffset =  masterOffset
                - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                .getMapedFileSizeCommitLog() /* 1G */);

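This gives the start offset of the last file.
That is, when the slave has no message history, the master only sends message data starting from the beginning of its last local CommitLog file.


If the slave already has data, sending starts from the offset the slave reported, and nextTransferFromWhere records that offset.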
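Both cases for the starting point fit into one function. This sketch assumes the 1 GB CommitLog file size mentioned above. initialTransferOffset is a hypothetical helper, not a RocketMQ method.

```java
class TransferStartSketch {
    static final long COMMITLOG_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB, as in the text

    static long initialTransferOffset(long slaveRequestOffset, long masterMaxOffset, long fileSize) {
        if (slaveRequestOffset == 0) {
            // Empty slave: round the master's max offset down to the start of its last file.
            long start = masterMaxOffset - (masterMaxOffset % fileSize);
            return Math.max(start, 0);
        }
        // Slave already has data: continue exactly from the offset it reported.
        return slaveRequestOffset;
    }
}
```

Take a master whose max offset is 2,500,000,000 and an empty slave. The transfer starts at 2,147,483,648, the beginning of the third 1 GB file, so the older files are never copied to that slave.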


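Next, lastWriteOver is checked. This state flag records whether the previous transfer completed, and it is initialized to true.


If it is true, a time check is made. lastWriteTimestamp records the time of the last write, and the check asks whether the interval haSendHeartbeatInterval (5 s by default) has been exceeded. In other words, has the master sent nothing at all to the slave for at least 5 s?
If so, a heartbeat packet is sent.

byteBufferHeader is a 12-byte ByteBuffer: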

private final int headerSize = 8 + 4;
private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);

A heartbeat packet is simply built here from the current nextTransferFromWhere and a size of 0. The transferData method then sends it.
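The 12-byte header is just an 8-byte offset followed by a 4-byte size; a heartbeat is the same header with size 0. Here is a minimal sketch of refilling a reusable header buffer (buildHeader is a hypothetical name):

```java
import java.nio.ByteBuffer;

class HaHeaderSketch {
    static final int HEADER_SIZE = 8 + 4; // 8-byte physical offset + 4-byte body size

    // Refills a reusable header; bodySize == 0 makes it a heartbeat.
    static ByteBuffer buildHeader(ByteBuffer header, long offset, int bodySize) {
        header.position(0);
        header.limit(HEADER_SIZE);
        header.putLong(offset);
        header.putInt(bodySize);
        header.flip();              // position 0, limit 12: ready for channel.write(header)
        return header;
    }
}
```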


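If lastWriteOver is false, the previous data has not been fully sent. transferData is called to send the rest, and as long as it is not finished, the loop simply repeats until everything has gone out.


Continuing on, the next part sends the actual message data.
First, it passes nextTransferFromWhere (the offset saved above) to DefaultMessageStore's getCommitLogData method, which actually calls CommitLog's getData method. This method was covered in detail in the section on message dispatching (ReputMessageService) in

[Source code analysis of Broker startup in RocketMQ (Part 2)]


getData finds the CommitLog file the offset belongs to and reads all readable data in that file from the offset onward into a ByteBuffer, wrapped in a SelectMappedBufferResult.


If the master has already synchronized all of its local data to the slave, the SelectMappedBufferResult is null, and the thread calls: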

HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);

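This blocks the thread with a 100 ms timeout. It either waits until the timeout expires or is woken up earlier by a synchronous master, in the synchronous double-write flow discussed later.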

 

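After the SelectMappedBufferResult is obtained, the size of the data read is checked. If it exceeds haTransferBatchSize (32 KB by default), size is reduced to 32 KB. This simply limits the transfer size: anything larger is split, and at most 32 KB is sent per round.


Then the bookkeeping is updated:
- thisOffset records nextTransferFromWhere, i.e. the current offset;
- nextTransferFromWhere is advanced by size, so the next round starts at the right place;
- the data read, selectResult, is saved in selectMappedBufferResult.

Then the message header is built in the same format as the heartbeat: the first 8 bytes hold the offset, the following 4 bytes hold the data size.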
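The interaction between the 32K cap and the cursor update can be shown by planning the headers for a run of available data. The sketch uses a hypothetical plan helper. It also assumes all bytes lie in one CommitLog file; the real getCommitLogData never returns data past the end of the current file.

```java
import java.util.ArrayList;
import java.util.List;

class BatchPlanSketch {
    static final int HA_TRANSFER_BATCH_SIZE = 1024 * 32; // 32K default, per the text

    // Returns {offset, size} pairs, i.e. the header values the write loop would send.
    static List<long[]> plan(long nextTransferFromWhere, long available, int batchSize) {
        List<long[]> chunks = new ArrayList<>();
        long end = nextTransferFromWhere + available;
        while (nextTransferFromWhere < end) {
            long size = Math.min(end - nextTransferFromWhere, batchSize); // cap at batchSize
            long thisOffset = nextTransferFromWhere;                      // goes into the header
            nextTransferFromWhere += size;                                // cursor for next round
            chunks.add(new long[] {thisOffset, size});
        }
        return chunks;
    }
}
```

With 70 KB available from offset 0, this yields three rounds of 32 KB, 32 KB and 6 KB, starting at offsets 0, 32768 and 65536.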


Finally, transferData is called to send it:

private boolean transferData() throws Exception {
    int writeSizeZeroTimes = 0;
    // Write Header
    while (this.byteBufferHeader.hasRemaining()) {
        int writeSize = this.socketChannel.write(this.byteBufferHeader);
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        } else if (writeSize == 0) {
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        } else {
            throw new Exception("ha master write header error < 0");
        }
    }

    if (null == this.selectMappedBufferResult) {
        return !this.byteBufferHeader.hasRemaining();
    }

    writeSizeZeroTimes = 0;

    // Write Body
    if (!this.byteBufferHeader.hasRemaining()) {
        while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            } else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                throw new Exception("ha master write body error < 0");
            }
        }
    }

    boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

    if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
        this.selectMappedBufferResult.release();
        this.selectMappedBufferResult = null;
    }

    return result;
}

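It first sends the 12-byte header in byteBufferHeader through socketChannel's write method, then sends the message data in selectMappedBufferResult's ByteBuffer.

If write returns 0 three times in a row (the socket send buffer is full), it stops and returns false. lastWriteOver then stays false, and the rest is sent in a later round.

If selectMappedBufferResult is null, this is a heartbeat and only the header is sent.
Whatever is sent, the time is recorded in lastWriteTimestamp for the later heartbeat check.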
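The write pattern in transferData can be sketched against a throttled fake channel. The pattern is:
- drain the header first, and only then the body;
- give up after three consecutive zero-byte writes.

ThrottledChannel is a hypothetical test double. It accepts a few bytes per call and, once its capacity is used up, returns 0, imitating a full socket send buffer. The sketch leaves out timestamp updates and the release of the mapped buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class TransferSketch {
    // Writes as much of buf as the channel takes; stops after 3 zero-byte writes in a row.
    static boolean drain(WritableByteChannel ch, ByteBuffer buf) {
        int zeroWrites = 0;
        while (buf.hasRemaining()) {
            int n;
            try {
                n = ch.write(buf);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (n > 0) {
                zeroWrites = 0;                 // progress resets the counter
            } else if (n == 0) {
                if (++zeroWrites >= 3) {
                    break;                      // send buffer full: retry in a later round
                }
            } else {
                throw new IllegalStateException("write returned < 0");
            }
        }
        return !buf.hasRemaining();
    }

    // Header first, body only after the whole header is out; body == null is a heartbeat.
    static boolean transfer(WritableByteChannel ch, ByteBuffer header, ByteBuffer body) {
        if (!drain(ch, header)) {
            return false;
        }
        return body == null || drain(ch, body);
    }
}

// Hypothetical test double: at most perCall bytes per write, capacity bytes in total.
class ThrottledChannel implements WritableByteChannel {
    private final int perCall;
    private int capacity;
    int written = 0;

    ThrottledChannel(int perCall, int capacity) {
        this.perCall = perCall;
        this.capacity = capacity;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(Math.min(perCall, capacity), src.remaining());
        src.position(src.position() + n);      // pretend n bytes went to the network
        capacity -= n;
        written += n;
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}
```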


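At this point it becomes clear how the WriteSocketService thread behaves. Once it is running and the slave has sent its first offset, it keeps sending the contents of its local CommitLog, from the corresponding position, to the slave. Only when the two are fully in sync does it slow down: it blocks for 100 ms at a time and sends a heartbeat every 5 s.

But as soon as a Producer sends messages to the master and they are written into the master's CommitLog, the WriteSocketService thread gets busy again. This is also where synchronous double-write and asynchronous replication come into play.

Before getting to that, let's see how the slave handles the messages it receives.


This happens in the processReadEvent method of the HAClient thread: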

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                readSizeZeroTimes = 0;
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}

After socketChannel's read method has read the data sent by the master into the byteBufferRead buffer, dispatchReadRequest processes it further.


The dispatchReadRequest method:

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size
    int readSocketPos = this.byteBufferRead.position();

    while (true) {
        int diff = this.byteBufferRead.position() - this.dispatchPostion;
        if (diff >= msgHeaderSize) {
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);

            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            if (slavePhyOffset != 0) {
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                        + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }

            if (diff >= (msgHeaderSize + bodySize)) {
                byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
                this.byteBufferRead.get(bodyData);

                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                this.byteBufferRead.position(readSocketPos);
                this.dispatchPostion += msgHeaderSize + bodySize;

                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}

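It first reads the 12-byte message header:
masterPhyOffset: the 8-byte offset; bodySize: the 4-byte message size.
The masterPhyOffset sent by the master is checked against the slave's local slavePhyOffset; the check is skipped while the slave is still empty. If they differ, the method returns false and the connection to the master is closed, which keeps the backup consistent. If the body has not fully arrived yet, the frame is left for the next read.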
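The framing logic of dispatchReadRequest can be reproduced on a plain ByteBuffer. This sketch is hypothetical:
- a list stands in for appendToCommitLog and a counter for getMaxPhyOffset;
- like the original, it stops at a partial frame and waits for more bytes;
- it leaves out reallocateByteBuffer, which compacts the buffer when it fills up.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class FrameDispatchSketch {
    static final int HEADER_SIZE = 8 + 4;              // phyOffset + bodySize

    private final ByteBuffer buf;                      // socket reads append here
    private int dispatchPosition = 0;                  // start of the first unconsumed frame
    long localMaxOffset;                               // stand-in for getMaxPhyOffset()
    final List<byte[]> appended = new ArrayList<>();   // stand-in for appendToCommitLog

    FrameDispatchSketch(int capacity, long localMaxOffset) {
        this.buf = ByteBuffer.allocate(capacity);
        this.localMaxOffset = localMaxOffset;
    }

    ByteBuffer buffer() {
        return buf;
    }

    // Consumes every complete [offset][size][body] frame; false on an offset mismatch.
    boolean dispatch() {
        int readPos = buf.position();
        while (readPos - dispatchPosition >= HEADER_SIZE) {
            long masterOffset = buf.getLong(dispatchPosition);
            int bodySize = buf.getInt(dispatchPosition + 8);
            if (localMaxOffset != 0 && localMaxOffset != masterOffset) {
                return false;                          // gap or overlap: refuse the data
            }
            if (readPos - dispatchPosition < HEADER_SIZE + bodySize) {
                break;                                 // body not fully received yet
            }
            byte[] body = new byte[bodySize];
            buf.position(dispatchPosition + HEADER_SIZE);
            buf.get(body);
            buf.position(readPos);                     // restore the read cursor
            appended.add(body);                        // a heartbeat appends an empty array
            localMaxOffset = masterOffset + bodySize;
            dispatchPosition += HEADER_SIZE + bodySize;
        }
        return true;
    }
}
```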


It then takes the message data that follows the header out of byteBufferRead and calls appendToCommitLog to persist it into the slave's own CommitLog:

public boolean appendToCommitLog(long startOffset, byte[] data) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
        return false;
    }

    boolean result = this.commitLog.appendData(startOffset, data);
    if (result) {
        this.reputMessageService.wakeup();
    } else {
        log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
    }

    return result;
}

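This actually calls commitLog's appendData method to write the data into the CommitLog. I covered this method in an earlier post:

[Source code analysis of Broker flushing in RocketMQ]


Once the write is done, reputMessageService (message dispatching) must be woken up so that Consumers can consume the messages.
For message dispatching, see [Source code analysis of Broker startup in RocketMQ (Part 2)]


As mentioned earlier, the master also sends heartbeat messages, yet they get no special treatment here. A heartbeat simply results in appendToCommitLog being called with a zero-length byte array, which seems somewhat unreasonable, and I can't work out why.


After that, reportSlaveMaxOffsetPlus is called: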

private boolean reportSlaveMaxOffsetPlus() {
    boolean result = true;
    long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    if (currentPhyOffset > this.currentReportedOffset) {
        this.currentReportedOffset = currentPhyOffset;
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
            this.closeMaster();
            log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
        }
    }

    return result;
}

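Because new data has just been written, the offset obtained here is larger than the one stored in currentReportedOffset. The slave therefore calls reportSlaveMaxOffset again to report its current offset to the master.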
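The "report only when the offset has grown" logic of reportSlaveMaxOffsetPlus can be sketched as follows. The sketch assumes the report sent to the master is a single 8-byte long, which is my understanding of reportSlaveMaxOffset. The class is hypothetical, and so are its two collaborators: a LongSupplier stands in for getMaxPhyOffset and a Predicate stands in for the socket write.

```java
import java.nio.ByteBuffer;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

// Hypothetical sketch of reportSlaveMaxOffsetPlus-style reporting (not RocketMQ code).
final class OffsetReporter {
    private final LongSupplier maxPhyOffset;   // stands in for defaultMessageStore.getMaxPhyOffset()
    private final Predicate<ByteBuffer> send;  // stands in for the socket write; false on failure
    private long currentReportedOffset = 0;

    OffsetReporter(LongSupplier maxPhyOffset, Predicate<ByteBuffer> send) {
        this.maxPhyOffset = maxPhyOffset;
        this.send = send;
    }

    /** Assumed wire format: one 8-byte big-endian offset. */
    static ByteBuffer encode(long offset) {
        ByteBuffer report = ByteBuffer.allocate(8);
        report.putLong(offset);
        report.flip();
        return report;
    }

    /** Reports only if the local offset grew; returns false only when a send fails. */
    boolean reportIfAdvanced() {
        long current = maxPhyOffset.getAsLong();
        if (current > currentReportedOffset) {
            currentReportedOffset = current;
            return send.test(encode(current));
        }
        return true; // nothing new to report
    }
}
```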


At this point the asynchronous replication process of an asynchronous master (ASYNC_MASTER) is complete.


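Now let's look at how synchronous double-write is implemented.
Like disk flushing, it runs after the Producer has sent a message and the Broker has finished storing it: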

【RocketMQ Broker Message Storage Source Code Analysis】


This happens in CommitLog's handleHA method:

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }

}

The method first checks the Broker's role. As you can see, it only does anything for a SYNC_MASTER (synchronous master).

The process is actually quite similar to synchronous disk flushing:

【RocketMQ Broker Disk-Flush Source Code Analysis】


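A GroupCommitRequest record is created from WroteOffset + WroteBytes (the offset the master's log reaches after this message) and added to a List.
Then getWaitNotifyObject().wakeupAll() is called to wake up all blocked WriteSocketService threads.
One master can have several slaves, so there can be several slave connections and therefore several WriteSocketService threads. This ensures the data is pushed to every slave.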

After waking up the WriteSocketService threads, handleHA calls request.waitForFlush to block itself. This is where synchronous replication really begins.
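The handshake works like this: handleHA blocks in waitForFlush, and the replication side later releases it through wakeupCustomer. This can be modeled with a CountDownLatch. The following is a minimal sketch assuming latch-based blocking. TransferRequest is a hypothetical stand-in for GroupCommitRequest, and the real class may differ between RocketMQ versions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for GroupCommitRequest (not RocketMQ code).
final class TransferRequest {
    private final long nextOffset;                        // wroteOffset + wroteBytes
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean transferOK = false;

    TransferRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    /** Replication side: record the outcome, then release the waiting caller. */
    void wakeupCustomer(boolean ok) {
        this.transferOK = ok;
        latch.countDown();
    }

    /** Caller side: block up to timeoutMillis; false on timeout, failure or interrupt. */
    boolean waitForFlush(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) && transferOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this model, a false return corresponds to the branch in handleHA that sets FLUSH_SLAVE_TIMEOUT.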


When HAService starts, it also starts a GroupTransferService thread:

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doWaitTransfer();
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

It works in basically the same way as GroupCommitService for synchronous flushing, so I won't go through the similar parts in detail.


GroupTransferService likewise keeps two Lists:

private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();

The two Lists are used much like the copying algorithm of the JVM's young generation.
In handleHA, the newly created GroupCommitRequest record is added to the requestsWrite List.
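The two-list ("copying") scheme can be sketched generically. Callers only ever append to requestsWrite under a short lock, while the service thread swaps the two references and then works through requestsRead on its own. DoubleBufferQueue and drain are hypothetical names. The sketch shows the idea rather than GroupTransferService itself, and it assumes drain is only ever called from a single service thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the write/read list swap (not RocketMQ code).
// put() may be called from many threads; drain() from one service thread only.
final class DoubleBufferQueue<T> {
    private List<T> requestsWrite = new ArrayList<>();
    private List<T> requestsRead = new ArrayList<>();

    synchronized void put(T request) {
        requestsWrite.add(request);
    }

    private synchronized void swap() {
        List<T> tmp = requestsWrite;
        requestsWrite = requestsRead; // the (empty) read list becomes the new write list
        requestsRead = tmp;
    }

    /** Swaps the lists, then handles and clears everything queued before the swap. */
    void drain(Consumer<? super T> handler) {
        swap();
        for (T request : requestsRead) {
            handler.accept(request); // may block (e.g. waiting for slaves) without blocking put()
        }
        requestsRead.clear();
    }
}
```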


The doWaitTransfer method:

private void doWaitTransfer() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                for (int i = 0; !transferOK && i < 5; i++) {
                    this.notifyTransferObject.waitForRunning(1000);
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }

                if (!transferOK) {
                    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                }

                req.wakeupCustomer(transferOK);
            }

            this.requestsRead.clear();
        }
    }
}

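As with flushing, requestsWrite and requestsRead are swapped using this copying scheme. By the time doWaitTransfer runs, requestsRead therefore holds the records that were just added.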


First, each record's NextOffset is compared with push2SlaveMaxOffset.

The value of push2SlaveMaxOffset comes from the slaves. It is updated in the ReadSocketService thread discussed earlier, by this call:

HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);

The notifyTransferSome method:

public void notifyTransferSome(final long offset) {
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            value = this.push2SlaveMaxOffset.get();
        }
    }
}

Even with multiple slave connections, push2SlaveMaxOffset always holds the largest offset reported by any of them.
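The CAS loop in notifyTransferSome is the standard lock-free "monotonic maximum" pattern. The sketch below isolates it. MaxAckOffset is hypothetical, and its onAdvance callback stands in for groupTransferService.notifyTransferSome(). A smaller or equal offset never wins the compareAndSet, so a lagging slave cannot move the value backwards. That is why the field ends up holding the largest offset acknowledged by any slave.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the notifyTransferSome CAS loop (not RocketMQ code).
final class MaxAckOffset {
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    private final Runnable onAdvance; // stands in for waking GroupTransferService

    MaxAckOffset(Runnable onAdvance) {
        this.onAdvance = onAdvance;
    }

    /** Raises the stored value to offset if larger; wakes waiters only when it actually grew. */
    void notifyTransferSome(long offset) {
        for (long value = push2SlaveMaxOffset.get(); offset > value; ) {
            if (push2SlaveMaxOffset.compareAndSet(value, offset)) {
                onAdvance.run();
                break;
            }
            value = push2SlaveMaxOffset.get(); // lost a race: re-read and retry if still larger
        }
    }

    long get() {
        return push2SlaveMaxOffset.get();
    }
}
```

One consequence follows from the comparison in doWaitTransfer: a request is satisfied as soon as the fastest slave has acknowledged its offset, not when every slave has.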


So doWaitTransfer makes its decision based on the current NextOffset (the master's local offset after the write).

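The key here is how the WriteSocketService thread works: whenever the local file is updated, it sends the new data to the slave. Because HA replication happens after flushing, a slave may already have replicated the data before doWaitTransfer even runs. It may also have reported its offset to the master, updating push2SlaveMaxOffset.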

In that case, the check

boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

evaluates to true, meaning a slave already holds a copy, so the method directly calls

req.wakeupCustomer(transferOK);

which wakes up the thread that was blocked in handleHA.


If the check is false, no slave has finished replicating yet, so the method has to wait:

for (int i = 0; !transferOK && i < 5; i++) {
    this.notifyTransferObject.waitForRunning(1000);
    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}

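waitForRunning blocks with a 1000 ms timeout, at most five times. If the check still fails, wakeupCustomer(false) is called. handleHA then sets the status to FLUSH_SLAVE_TIMEOUT, which is returned to the Producer.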
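The check-then-wait loop in doWaitTransfer can be sketched with a plain monitor. TransferWaiter, ack and waitTransfer are hypothetical names. An Object.wait/notifyAll pair stands in for notifyTransferObject.waitForRunning and groupTransferService.notifyTransferSome(). The round count and interval are parameters here instead of the fixed 5 and 1000 ms. Because the offset is re-checked while holding the monitor, an acknowledgement that lands between the check and the wait is not lost.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of doWaitTransfer's bounded wait (not RocketMQ code).
final class TransferWaiter {
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    private final Object monitor = new Object();

    /** Called when a slave reports its offset: keep the max, then wake any waiter. */
    void ack(long offset) {
        push2SlaveMaxOffset.accumulateAndGet(offset, Math::max);
        synchronized (monitor) {
            monitor.notifyAll();
        }
    }

    /** Checks first; otherwise waits up to `rounds` times, re-checking after each wake-up or timeout. */
    boolean waitTransfer(long nextOffset, int rounds, long intervalMillis) {
        boolean transferOK = push2SlaveMaxOffset.get() >= nextOffset;
        for (int i = 0; !transferOK && i < rounds; i++) {
            synchronized (monitor) {
                if (push2SlaveMaxOffset.get() < nextOffset) { // re-check under the lock
                    try {
                        monitor.wait(intervalMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
            transferOK = push2SlaveMaxOffset.get() >= nextOffset;
        }
        return transferOK;
    }
}
```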


Suppose a slave finishes replicating within the timeout and sends its offset to the master. notifyTransferSome then runs:

public void notifyTransferSome(final long offset) {
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            value = this.push2SlaveMaxOffset.get();
        }
    }
}

It updates push2SlaveMaxOffset, and groupTransferService.notifyTransferSome() wakes up the wait described above.

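push2SlaveMaxOffset is then compared with getNextOffset() again.
On success, the thread blocked in handleHA is woken up, and master-slave replication for the synchronous master is finished.
The synchronous master's flush happens before replication. Synchronous double-write therefore means the message has been persisted on both the master and a slave.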


This concludes the analysis of the Broker HA strategy in RocketMQ.