RocketMQ 源碼分析 RouteInfoManager(四)
在上一章分析了NamesrvController的構造函數時,會生成一個RouteInfoManager對象,該對象存放著整個消息集群的相關消息,所以這裏單獨拿出來分析。其實試想一下namesrv的功能不就是一個提供了通信功能的一個隊列嘛,而RouteInfoManager保留了所有信息的路由。所以要想弄明白RocketMQ,RouteInfoManager必須要攻下。
RouteInfoManager的構造函數
主要提供了topic、broker、cluster、liveBroker的路由信息。
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// 讀寫鎖
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// Topic,以及對應的隊列信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// 以Broker Name為單位的Broker集合
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 集群以及屬於該集群的Broker列表
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 存活的Broker地址列表
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker對應的Filter Server列表
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
..........省略
topicQueueTable
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
1
存放了Topic的一個hashMap,在RocketMQ的插件中RocketMQ控制臺,可以看到Topic的列表,這個就是topicQueueTable的一個可視化:
查看QueueData的類信息:
public class QueueData implements Comparable<QueueData> {
// 隊列所屬的Broker名稱
private String brokerName;
// 讀隊列數量
private int readQueueNums;
// 寫隊列數量
private int writeQueueNums;
// Topic的讀寫權限(2是寫 4是讀 6是讀寫)
private int perm;
private int topicSynFlag;
點開一個一個topic的配置可以看到和上面類對應的變量:
clusterAddrTable
private final HashMap<String/* clusterName www.quwanyule157.com*/, Set<String/* brokerName */>> clusterAddrTable;
1
根據一個集群名,獲得對應的一組BrokerName的列表,在可視化界面就是如下:
雖然只得到了BrokerName,只要調用對應的接口就能得到Broker對應的屬性值。
brokerAddrTable
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
1
以Broker Name為Key,BrokerData為Value:
public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
//同一個brokerName下可以有一個Master和多個Slave,所以brokerAddrs是一個集合
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
private final Random random = new Random();
Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId來定 義,BrokerId為0 表示Master,非0 表示Slave。
brokerLiveTable
private final HashMap<String/*www.yongshiyule178.com brokerAddr */, BrokerLiveInfo> brokerLiveTable;
1
用於存放存活的Broker信息,BrokerLiveInfo:
class BrokerLiveInfo {
// 最後一次更新時間
private long lastUpdateTimestamp;
// 版本號
private DataVersion dataVersion;
// Netty的Channel
private Channel channel;
/**
* HA Broker的地址
* 是Slave從Master拉取數據時鏈接的地址,由brokerIp2+HA端口構成 */
private String haServerAddr;
registerBroker
上面的類的解釋可能還存在很多遺憾,下面以registerBroker方法,來實際操作一下上面提到類,看看註冊一個Broker發生了什麽:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
//TopicConfigSerializeWrapper比較復雜的數據結構,主要包含了broker上所有的topic信息
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock(www.feifanyule.cn/).lockInterruptibly(www.haom178.com);
// 更新集群信息(根據集群名字,獲取當前集群下面的所有brokerName)
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
// 如果當前集群下面brokerNames為空,則將當前請求broker加入到clusterAddrTable中
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//獲取所有的名稱為brokerNamed 的brokerData,brokerData保留著了。也就是說同一個BrokerName,有可能有多條的brokerId和broker address與之對應
// HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(www.michenggw.com clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs(www.wanmeiyuele.cn).put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// 更新Topic信息
//如果topicConfigWrapper不為空,且當前brokerId == 0,即為當前broker為master
//這裏會判斷只有master才會創建QueueData,因為只有master才包含了讀寫隊列的信息
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
// 如果Topic配置信息發生變更或者該broker為第一次註冊
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst)www.furggw.com {
// 獲取所有topic信息
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
// 遍歷所有Topic
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
// 根據brokername及topicconfig(read、write queue數量等)新增或者更新到topicQueueTable中
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 更新最後變更時間(將brokerLiveTable中保存的對應的broker的更新時間戳,設置為當前時間)
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 返回值(如果當前broker為slave節點)則將haServerAddr、masterAddr等信息設置到result返回值中
if (MixAll.MASTER_ID != brokerId) {
// 通過brokename的brokedate獲取當前slave節點的master節點addr
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
插入過程可以看如下這張圖:
疑惑
topicQueueTable有一點不明白,為什麽一個topic對應的是一個List,難道不是一個topic對應一個QueueData嗎?難道可以存在相同的topic?
因為每一個topic可以存在不同的broker中,不同的broker就有不同的QueueData。
brokerAddrTable根據一個brokername獲得一個BrokerData,可是為什麽一個BrokerData中有一個hashMap的brokerAddrs呢?我能理解的是一個brokername,會有多個address和id,但是cluster為什麽又是同一個呢?
這是因為你理解錯了,以為BrokerName是唯一的,實際上是指定多個broker為一個name,只不過指定了不同的id和addres來區分是master還是slave。
RocketMQ 源碼分析 RouteInfoManager(四)