分散式訊息佇列RocketMQ原始碼分析之2 -- Broker與NameServer心跳機制
阿新 • • 發佈:2019-02-03
我們知道,Kafka是通過ZK的臨時節點來監測Broker的死亡的。當一個Broker掛了之後,ZK上面對應的臨時節點被刪除,同時其他Broker收到通知。
那麼在RocketMQ中,對應的NameServer是如何判斷一個Broker的死亡呢?
有興趣朋友可以關注公眾號“架構之道與術”, 獲取最新文章。
或掃描如下二維碼:
NameSrv監測Broker的死亡
機制之一:監測連線斷掉
當Broker和NameSrv之間的長連線斷掉之後,下面的ChannelEventListener裡面的函式就會被回撥,從而觸發NameServer的路由資訊更新。
這裡的ChannelEventListener是RocketMQ封裝Netty向外暴露的一個介面層。
//NameSrv裡面的BrokerHouseKeepingService
public class BrokerHousekeepingService implements ChannelEventListener {
。。。
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
機制之二:心跳
首先,每個Broker會每隔30s向NameSrv更新自身topic資訊
//BrokerController.start()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false);
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
NameServer收到RegisterBroker資訊,更新自己的brokerLiveTable結構
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
然後NameServer會每10s,掃描一次這個結構。如果發現上次更新時間距離當前時間超過了BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2(2分鐘),則認為此broker死亡。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { //上1次更新時間距離當前時間小於2分鐘
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
Producer/Consumer如何得知Broker死亡
當某個Broker死亡之後,NameSrv並不會主動通知Producer和Consumer。
而是Producer/Consumer週期性的去NameSrv取。
//MQClientInstance的startScheduledTask()函式程式碼
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
這裡的pollNameServerInteval預設是30s。這也就是意味著,預設情況下,當某個Broker掛了之後,Client需要30s的延遲才會得知此訊息。
private int pollNameServerInteval = 1000 * 30;