1. 程式人生 > >RocketMQ原始碼解讀系列——1、namesrv原始碼

RocketMQ原始碼解讀系列——1、namesrv原始碼

我看的原始碼是RocketMQ的3.2.6版本,該版本是RocketMQ被apache組織孵化之前的版本了,但是不影響原始碼閱讀,閱讀原始碼主要是想學一下大牛們的開發思想

namesrv的專案結構:

NamesrvStartup類作為namesrv的啟動入口,主要作用是載入配置檔案,環境檢查,呼叫NamesrvController類啟動server服務

NamesrvController作為namesrv服務控制類,作為namesrv服務的控制核心,其主要方法如下:

    public boolean initialize() {
        // 載入KV配置
        this.kvConfigManager.load();

        // 初始化通訊層
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // 初始化執行緒池
        this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
                    new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        // 增加定時任務--掃描過期的broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        //列印所有的config
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        //
        // @Override
        // public void run() {
        // NamesrvController.this.routeInfoManager.printAllPeriodically();
        // }
        // }, 1, 5, TimeUnit.MINUTES);

        return true;
    }
    /**
     * 啟動namesrv服務端
     * @throws Exception
     * @author: yangcheng
     */
    public void start() throws Exception {
        this.remotingServer.start();
    }

RouteInfoManager.java類,原註釋是“執行過程中的路由資訊,資料只在記憶體中因此宕機後資料消失;但是Broker會定期推送最新資料”,其主要屬性如下(類中其他方法主要作用就是對這些屬性的增刪查改):

    /**
     * topic與該topic相關連的所有queue--queue中包含了brokerName值
     */
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    /**
     * broker的資訊快取
     */
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    /**
     * 叢集名稱與叢集中所有brokername的關係快取
     */
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    /**
     * 啟用狀態的Broker
     */
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;