1. 程式人生 > >分散式訊息佇列RocketMQ原始碼分析之2 -- Broker與NameServer心跳機制

分散式訊息佇列RocketMQ原始碼分析之2 -- Broker與NameServer心跳機制

我們知道,Kafka是通過ZK的臨時節點來監測Broker的死亡的。當一個Broker掛了之後,ZK上面對應的臨時節點被刪除,同時其他Broker收到通知。

那麼在RocketMQ中,對應的NameServer是如何判斷一個Broker的死亡呢?

有興趣朋友可以關注公眾號“架構之道與術”, 獲取最新文章。
或掃描如下二維碼:
這裡寫圖片描述

NameSrv監測Broker的死亡

機制之一:監測連線斷掉

當Broker和NameSrv之間的長連線斷掉之後,下面的ChannelEventListener裡面的函式就會被回撥,從而觸發NameServer的路由資訊更新。

這裡的ChannelEventListener是RocketMQ封裝Netty向外暴露的一個介面層。

//NameSrv裡面的BrokerHouseKeepingService
public class BrokerHousekeepingService implements ChannelEventListener {
。。。

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
public void onChannelException(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } @Override public void onChannelIdle(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } }

機制之二:心跳

首先,每個Broker會每隔30s向NameSrv更新自身topic資訊

//BrokerController.start()
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

NameServer收到RegisterBroker資訊,更新自己的brokerLiveTable結構

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

然後NameServer會每10s,掃描一次這個結構。如果發現上次更新時間距離當前時間超過了BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2(2分鐘),則認為此broker死亡。

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {  //上1次更新時間距離當前時間小於2分鐘
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

Producer/Consumer如何得知Broker死亡

當某個Broker死亡之後,NameSrv並不會主動通知Producer和Consumer。

而是Producer/Consumer週期性的去NameSrv取。

//MQClientInstance的startScheduledTask()函式程式碼

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);

這裡的pollNameServerInteval預設是30s。這也就是意味著,預設情況下,當某個Broker掛了之後,Client需要30s的延遲才會得知此訊息。

 private int pollNameServerInteval = 1000 * 30;