RocketMQ原始碼解讀系列——1、namesrv原始碼
阿新 • • 發佈:2018-12-22
我看的原始碼是RocketMQ的3.2.6版本,該版本是RocketMQ被apache組織孵化之前的版本了,但是不影響原始碼閱讀,閱讀原始碼主要是想學一下大牛們的開發思想
namesrv的專案結構:
NamesrvStartup類作為namesrv的啟動入口,主要作用是載入配置檔案,環境檢查,呼叫NamesrvController類啟動server服務
NamesrvController作為namesrv服務控制類,作為namesrv服務的控制核心,其主要方法如下:
public boolean initialize() { // 載入KV配置 this.kvConfigManager.load(); // 初始化通訊層 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 初始化執行緒池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); // 增加定時任務--掃描過期的broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //列印所有的config this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // // @Override // public void run() { // NamesrvController.this.routeInfoManager.printAllPeriodically(); // } // }, 1, 5, TimeUnit.MINUTES); return true; }
/**
* 啟動namesrv服務端
* @throws Exception
* @author: yangcheng
*/
public void start() throws Exception {
this.remotingServer.start();
}
RouteInfoManager.java類,原註釋是“執行過程中的路由資訊,資料只在記憶體中因此宕機後資料消失;但是Broker會定期推送最新資料”,其主要屬性如下(類中其他方法主要作用就是對這些屬性的增刪查改):
/** * topic與該topic相關連的所有queue--queue中包含了brokerName值 */ private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; /** * broker的資訊快取 */ private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; /** * 叢集名稱與叢集中所有brokername的關係快取 */ private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; /** * 啟用狀態的Broker */ private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;