1. 程式人生 > >RocketMQ 源碼分析 RouteInfoManager(四)

RocketMQ 源碼分析 RouteInfoManager(四)

uwa iyu 函數 配置 rfi 查看 時間戳 actor rocket

  在上一章分析了NamesrvController的構造函數時,會生成一個RouteInfoManager對象,該對象存放著整個消息集群的相關消息,所以這裏單獨拿出來分析。其實試想一下namesrv的功能不就是一個提供了通信功能的一個隊列嘛,而RouteInfoManager保留了所有信息的路由。所以要想弄明白RocketMQ,RouteInfoManager必須要攻下。
  
  RouteInfoManager的構造函數
  
  主要提供了topic、broker、cluster、liveBroker的路由信息。
  
  public class RouteInfoManager {
  
  private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
  
  private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
  
  // 讀寫鎖
  
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  
  // Topic,以及對應的隊列信息
  
  private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
  
  // 以Broker Name為單位的Broker集合
  
  private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
  
  // 集群以及屬於該集群的Broker列表
  
  private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
  
  // 存活的Broker地址列表
  
  private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
  
  // Broker對應的Filter Server列表
  
  private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  
  public RouteInfoManager() {
  
  this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
  
  this.brokerAddrTable = new HashMap<String, BrokerData>(128);
  
  this.clusterAddrTable = new HashMap<String, Set<String>>(32);
  
  this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
  
  this.filterServerTable = new HashMap<String, List<String>>(256);
  
  }
  
  ..........省略
  
  topicQueueTable
  
  private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
  
  1
  
  存放了Topic的一個hashMap,在RocketMQ的插件中RocketMQ控制臺,可以看到Topic的列表,這個就是topicQueueTable的一個可視化:
  
  查看QueueData的類信息:
  
  public class QueueData implements Comparable<QueueData> {
  
  // 隊列所屬的Broker名稱
  
  private String brokerName;
  
  // 讀隊列數量
  
  private int readQueueNums;
  
  // 寫隊列數量
  
  private int writeQueueNums;
  
  // Topic的讀寫權限(2是寫 4是讀 6是讀寫)
  
  private int perm;
  
  private int topicSynFlag;
  
  點開一個一個topic的配置可以看到和上面類對應的變量:
  
  clusterAddrTable
  
  private final HashMap<String/* clusterName www.quwanyule157.com*/, Set<String/* brokerName */>> clusterAddrTable;
  
  1
  
  根據一個集群名,獲得對應的一組BrokerName的列表,在可視化界面就是如下:
  
  雖然只得到了BrokerName,只要調用對應的接口就能得到Broker對應的屬性值。
  
  brokerAddrTable
  
  private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
  
  1
  
  以Broker Name為Key,BrokerData為Value:
  
  public class BrokerData implements Comparable<BrokerData> {
  
  private String cluster;
  
  private String brokerName;
  
  //同一個brokerName下可以有一個Master和多個Slave,所以brokerAddrs是一個集合
  
  private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
  
  private final Random random = new Random();
  
  Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId來定 義,BrokerId為0 表示Master,非0 表示Slave。
  
  brokerLiveTable
  
  private final HashMap<String/*www.yongshiyule178.com brokerAddr */, BrokerLiveInfo> brokerLiveTable;
  
  1
  
  用於存放存活的Broker信息,BrokerLiveInfo:
  
  class BrokerLiveInfo {
  
  // 最後一次更新時間
  
  private long lastUpdateTimestamp;
  
  // 版本號
  
  private DataVersion dataVersion;
  
  // Netty的Channel
  
  private Channel channel;
  
  /**
  
  * HA Broker的地址
  
  * 是Slave從Master拉取數據時鏈接的地址,由brokerIp2+HA端口構成 */
  
  private String haServerAddr;
  
  registerBroker
  
  上面的類的解釋可能還存在很多遺憾,下面以registerBroker方法,來實際操作一下上面提到類,看看註冊一個Broker發生了什麽:
  
  public RegisterBrokerResult registerBroker(
  
  final String clusterName,
  
  final String brokerAddr,
  
  final String brokerName,
  
  final long brokerId,
  
  final String haServerAddr,
  
  //TopicConfigSerializeWrapper比較復雜的數據結構,主要包含了broker上所有的topic信息
  
  final TopicConfigSerializeWrapper topicConfigWrapper,
  
  final List<String> filterServerList,
  
  final Channel channel) {
  
  RegisterBrokerResult result = new RegisterBrokerResult();
  
  try {
  
  try {
  
  this.lock.writeLock(www.feifanyule.cn/).lockInterruptibly(www.haom178.com);
  
  // 更新集群信息(根據集群名字,獲取當前集群下面的所有brokerName)
  
  Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
  
  // 如果當前集群下面brokerNames為空,則將當前請求broker加入到clusterAddrTable中
  
  if (null == brokerNames) {
  
  brokerNames = new HashSet<String>();
  
  this.clusterAddrTable.put(clusterName, brokerNames);
  
  }
  
  brokerNames.add(brokerName);
  
  boolean registerFirst = false;
  
  //獲取所有的名稱為brokerNamed 的brokerData,brokerData保留著了。也就是說同一個BrokerName,有可能有多條的brokerId和broker address與之對應
  
  // HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs
  
  BrokerData brokerData = this.brokerAddrTable.get(brokerName);
  
  if (null == brokerData) {
  
  registerFirst = true;
  
  brokerData = new BrokerData(www.michenggw.com clusterName, brokerName, new HashMap<Long, String>());
  
  this.brokerAddrTable.put(brokerName, brokerData);
  
  }
  
  String oldAddr = brokerData.getBrokerAddrs(www.wanmeiyuele.cn).put(brokerId, brokerAddr);
  
  registerFirst = registerFirst || (null == oldAddr);
  
  // 更新Topic信息
  
  //如果topicConfigWrapper不為空,且當前brokerId == 0,即為當前broker為master
  
  //這裏會判斷只有master才會創建QueueData,因為只有master才包含了讀寫隊列的信息
  
  if (null != topicConfigWrapper
  
  && MixAll.MASTER_ID == brokerId) {
  
  // 如果Topic配置信息發生變更或者該broker為第一次註冊
  
  if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
  
  || registerFirst)www.furggw.com {
  
  // 獲取所有topic信息
  
  ConcurrentMap<String, TopicConfig> tcTable =
  
  topicConfigWrapper.getTopicConfigTable();
  
  if (tcTable != null) {
  
  // 遍歷所有Topic
  
  for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
  
  // 根據brokername及topicconfig(read、write queue數量等)新增或者更新到topicQueueTable中
  
  this.createAndUpdateQueueData(brokerName, entry.getValue());
  
  }
  
  }
  
  }
  
  }
  
  // 更新最後變更時間(將brokerLiveTable中保存的對應的broker的更新時間戳,設置為當前時間)
  
  BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
  
  new BrokerLiveInfo(
  
  System.currentTimeMillis(),
  
  topicConfigWrapper.getDataVersion(),
  
  channel,
  
  haServerAddr));
  
  if (null == prevBrokerLiveInfo) {
  
  log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
  
  }
  
  if (filterServerList != null) {
  
  if (filterServerList.isEmpty()) {
  
  this.filterServerTable.remove(brokerAddr);
  
  } else {
  
  this.filterServerTable.put(brokerAddr, filterServerList);
  
  }
  
  }
  
  // 返回值(如果當前broker為slave節點)則將haServerAddr、masterAddr等信息設置到result返回值中
  
  if (MixAll.MASTER_ID != brokerId) {
  
  // 通過brokename的brokedate獲取當前slave節點的master節點addr
  
  String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
  
  if (masterAddr != null) {
  
  BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
  
  if (brokerLiveInfo != null) {
  
  result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
  
  result.setMasterAddr(masterAddr);
  
  }
  
  }
  
  }
  
  } finally {
  
  this.lock.writeLock().unlock();
  
  }
  
  } catch (Exception e) {
  
  log.error("registerBroker Exception", e);
  
  }
  
  return result;
  
  插入過程可以看如下這張圖:
  
  疑惑
  
  topicQueueTable有一點不明白,為什麽一個topic對應的是一個List,難道不是一個topic對應一個QueueData嗎?難道可以存在相同的topic?
  
  因為每一個topic可以存在不同的broker中,不同的broker就有不同的QueueData。
  
  brokerAddrTable根據一個brokername獲得一個BrokerData,可是為什麽一個BrokerData中有一個hashMap的brokerAddrs呢?我能理解的是一個brokername,會有多個address和id,但是cluster為什麽又是同一個呢?
  
  這是因為你理解錯了,以為BrokerName是唯一的,實際上是指定多個broker為一個name,只不過指定了不同的id和addres來區分是master還是slave。

RocketMQ 源碼分析 RouteInfoManager(四)