RocketMQ NameServer 原理分析
概述
NameServer 是RocketMQ 訊息佇列的狀態伺服器(服務發現功能),叢集中的各個服務都需要通過 NameServer 來了解叢集中各個服務的狀態。相當於 SpringCloud 中的 Eureka 的功能。

NameServer 中維護著 Producer 叢集、Broker 叢集、 Consumer 叢集的服務狀態。通過定時傳送心跳資料包進行維護更新各個服務的狀態。
當有新的Producer 加入叢集時,通過上報自身的服務資訊,及獲取各個 Broker Master的資訊(Broker 地址、Topic、Queue 等資訊),這樣就可以決定把對應的Topic訊息儲存到那個Broker、哪個Queue 上。Consumer 同理。
NameServer 可以部署多個,多個NameServer互相獨立,不會交換訊息。Producer、Broker、Consumer 啟動的時候都需要指定多個 NameServer,各個服務的資訊會同時註冊到多個 NameServer 上,從而能到達高可用。
NameServer 模組結構

可以看出 NameServer 中的類比較少,8個類。分析起來也比較輕鬆。
NameServer 啟動
org.apache.rocketmq.namesrv.NamesrvStartup 是 NameServer 的啟動類。

通過 createNamesrvController 方法建立 NamesrvController 。

NameServer 啟動時首先判斷是否傳入了命令列引數。
命令列引數有兩個,-p 和 -c
-c 可以指定 NameServer 的配置檔案,如果不指定,則使用預設值。
-p 列印 NameServer 的配置引數資訊。列印完引數後退出程序。
下面是列印 NameServer 預設的配置引數資訊。

如果想修改這些預設的引數,則可以使用 -c 引數,指定配置檔案,進行更改。
初始化 NamesrvController

1、呼叫NamesrvController.initialize() 初始化 NamesrvController,然後呼叫 NamesrvController.start() 方法來開啟 NameServer 服務。
2、註冊 ShutdownHookThread 服務。在 JVM 退出之前,呼叫 ShutdownHookThread 來進行關閉服務,釋放資源。
注意:使用 kill -9 強制殺程序是不會執行 ShutdownHook 的。
NamesrvController.initialize()
public boolean initialize() { //從 /namesrv/kvConfig.json 中載入 NameServer 的配置 this.kvConfigManager.load(); //建立 Netty Server this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 建立 Netty Server 執行的執行緒池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //註冊 NameServer 服務接受請求的處理類 this.registerProcessor(); //定時清理超時的Broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //定時列印 NameServer 的配置資訊 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); ... }
- 1、KVConfigManager類預設是從 /namesrv/kvConfig.json 配置檔案中載入NameServer的配置引數.將配置引數載入儲存到
HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();
變數中。
kvConfig.json 檔案的預設路徑為:
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
-
2、建立並初始化 NettyRemotingServer 。
NettyRomotingServer 是 NameServer 對外提供服務功能的。
-
3、建立 Netty Server 執行使用的執行緒池。
-
4、註冊預設的處理類DefaultRequestProcessor,所有的請求均由該處理類的processRequest方法來處理。
-
5、建立一個定時清理超時的 Broker 定時任務。
每隔10秒檢查一遍所有Broker的狀態的定時任務,判斷每一個Broker 最近兩分鐘是否更新過。如果沒有更新則把該 Broker 的 channel 關閉(關閉該Broker
的長連線),並清除相關資料。
-
6、建立一個列印 NameServer 配置的定時任務。
每隔10分鐘列印一次NameServer的配置引數。即KVConfigManager.configTable變數的內容。
NamesrvController.registerProcessor()
註冊接收請求的處理類。 private void registerProcessor() { if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor); } else { this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } }
預設註冊的是 DefaultRequestProcessor 處理器。
如果設定了 NamesrvConfig.clusterTest = true,則會註冊 ClusterTestRequestProcessor 處理器。
ClusterTestRequestProcessor繼承DefaultRequestProcessor。
ClusterTestRequestProcessor.getRouteInfoByTopic 方法

ClusterTestRequestProcessor僅重寫了 getRouteInfoByTopic()方法。
判斷如果獲取不到 topicRouteData資料,則會去其它的NameServer 上查詢該資料並返回。
DefaultRequestProcessor
通過 processRequest 方法來處理客戶端發過來的請求。
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null; }
所有請求的操作說明如下:
requectcode | 說明 |
---|---|
PUT_KV_CONFIG | 向Namesrv追加KV配置 |
GET_KV_CONFIG | 從Namesrv獲取KV配置 |
DELETE_KV_CONFIG | 從Namesrv獲取KV配置 |
QUERY_DATA_VERSION | 獲取版本資訊 |
REGISTER_BROKER | 註冊一個Broker,資料都是持久化的,如果存在則覆蓋配置 |
UNREGISTER_BROKER | 解除安裝一個Broker,資料都是持久化的 |
GET_ROUTEINTO_BY_TOPIC | 根據Topic獲取Broker Name、topic配置資訊 |
GET_BROKER_CLUSTER_INFO | 獲取註冊到Name Server的所有Broker叢集資訊 |
WIPE_WRITE_PERM_OF_BROKER | 去掉BrokerName的寫許可權 |
GET_ALL_TOPIC_LIST_FROM_NAMESERVER | 從Name Server獲取完整Topic列表 |
DELETE_TOPIC_IN_NAMESRV | 從Namesrv刪除Topic配置 |
GET_KVLIST_BY_NAMESPACE | 通過NameSpace獲取所有的KV List |
GET_TOPICS_BY_CLUSTER | 獲取指定叢集下的所有 topic |
GET_SYSTEM_TOPIC_LIST_FROM_NS | 獲取所有系統內建 Topic 列表 |
GET_UNIT_TOPIC_LIST | 單元化相關 topic |
GET_HAS_UNIT_SUB_TOPIC_LIST | 獲取含有單元化訂閱組的 Topic 列表 |
GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST | 獲取含有單元化訂閱組的非單元化 |
UPDATE_NAMESRV_CONFIG | 更新Name Server配置 |
根據 processRequest 方法分析原始碼,發現接收到的所有請求操作的資料都儲存在 RouteInfoManager 類中,所有的操作都是對RouteInfoManager 類的操作。
RouteInfoManager
public class RouteInfoManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
1、topicQueueTable
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
Map 中 key儲存的是 Topic 的名稱, value 儲存的是 QueueData 的集合。
QueueData 的集合 size 等於 Topic 對應的 Broker Master 的個數。
QueueData 的資料結構如下:
public class QueueData implements Comparable<QueueData> { private String brokerName;//broker 名字 private int readQueueNums;//可讀 queue 數 private int writeQueueNums;//可寫 queue 數 private int perm;//讀寫許可權 private int topicSynFlag;//同步標識
2、brokerAddrTable
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
Map 中 key 儲存的是 Broker Name, value 儲存的是 BrokerData 資料(Broker 的相關資訊)。
BrokerData 的資料結構如下:
public class BrokerData implements Comparable<BrokerData> { private String cluster;// 叢集名稱 private String brokerName;// Broker Name // 儲存的是該 Broker Name 對應的多個 Broker 地址資訊。 private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
因為相同的名稱的 BrokerName 可以多有個。一個 Master 和多個 Slave。所有使用 brokerAddrs 來儲存相同 BrokerName 下所有的 Broker 資訊(判斷Master 和 Slave 的關係是通過 Master 和 Slave 名稱是否相同,brokerId 為 0 的是Master, 大於0 的是 Slave)。
3、clusterAddrTable
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
Map 中 key儲存的是 clusterName 的名稱, value 儲存的是 brokerName 的集合。
4、brokerLiveTable
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
Map 中 key 儲存的是 brokerAddr 資訊,value 儲存的是 BrokerLiveInfo 資訊,BrokerLiveInfo 中儲存了 Broker 的實時狀態。
class BrokerLiveInfo { // 最後更新時間 private long lastUpdateTimestamp; private DataVersion dataVersion; private Channel channel; private String haServerAddr;
上面介紹的 NamesrvController.initialize() 中有一個schedule定時任務,每個10秒鐘定時呼叫 scanNotActiveBroker() 方法進行掃描不活動的 Broker,並把不活動的 Broker 刪除掉,就是判斷的 這個 lastUpdateTimestamp 這個資料。
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
超過 2分鐘沒有更新這個值,就認為 Broker 不可用了。
5、filterServerTable
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
Map 中 key 儲存的是 brokerAddr 資訊,value 儲存的是 Filter Server 資訊。
Filter Server 是訊息的過濾伺服器,一個 Broker 可以對應多個 Filter Server。