1. 程式人生 > >深入淺出高性能服務發現、配置框架Nacos系列 3: 服務發現:Nacos客戶端初始化流程

深入淺出高性能服務發現、配置框架Nacos系列 3: 服務發現:Nacos客戶端初始化流程

tor trim 如何 try 文件的 client 註冊 rgs erro

上一章節,我們從全局了解了一下Nacos項目的模塊架構,做到了心中有數,現在,我們去逐步去挖掘裏面的代碼細節,很多人在學習開源的時候,無從下手,代碼那麽多,從哪個地方開始看呢?我們可以從一個接口開始入手,這個接口是你使用過的,知道它大概做什麽事,有體感的,大家還記得第一章時,我們寫的HelloWorld嗎,對,就從裏面的接口開始剝洋蔥。

這個是Nacos的github代碼地址,開始之前先start關註一下,加上watch,後續Nacos的郵件列表也會通知到你,可以關註到Nacos的最新實時消息,及各大牛之間的精彩討論。

下面這段代碼,是第一章節發布一個服務的代碼:

public static void main(String[] args) throws NacosException, InterruptedException {
    //發布的服務名
    String serviceName = "helloworld.services";
    //構造一個Nacos實例,入參是Nacos server的ip和服務端口
    NamingService naming = NacosFactory.createNamingService("100.81.0.34:8080");
    //發布一個服務,該服務對外提供的ip為127.0.0.1,端口為8888
    naming.registerInstance(serviceName, "100.81.0.35", 8080);
    Thread.sleep(Integer.MAX_VALUE);
}

其中,第一步,是構造一個Nacos服務實例,構造實例的入參,是一個String,值的規範為ip:port,這個ip,就是我們任意一臺Nacos server的地址,我們點進去看這個方法:

public static NamingService createNamingService(String serverAddr) throws NacosException {
        return NamingFactory.createNamingService(serverAddr);
    }

同時我們看下創建配置服務實例的代碼:

public static ConfigService createConfigService(String serverAddr) throws NacosException {
   return ConfigFactory.createConfigService(serverAddr);
}

我們可以看到,NacosFactory實際上是一個服務發現和配置管理接口的統一入口,再由它不通的方法,創建不同服務的實例,我們可以直接使用NamingFactory,或者ConfigFactory直接創建Nacos的服務實例,也能work

技術分享圖片

接下來,看一下,是如何構造出這個Nacos naming實例的:

public static NamingService createNamingService(String serverList) throws NacosException {
   try {
      Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
      Constructor constructor = driverImplClass.getConstructor(String.class);
      NamingService vendorImpl = (NamingService) constructor.newInstance(serverList);
      return vendorImpl;
   } catch (Throwable e) {
      throw new NacosException(-400, e.getMessage());
   }
}

通過反射實例化出了一個NamingService的實例NacosNamingService,構造器是一個帶String入參的,我們順著往下看,構造函數裏面做了哪些事情:

public NacosNamingService(String serverList) {

    this.serverList = serverList;
    init();
    eventDispatcher = new EventDispatcher();
    serverProxy = new NamingProxy(namespace, endpoint, serverList);
    beatReactor = new BeatReactor(serverProxy);
    hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir);
}

入參serverList就是我們剛才傳入的服務端地址,值賦給了實例的serverList字段,接下來調用了一個init方法,這個方法裏面如下:

private void init() {
    namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
    if (StringUtils.isEmpty(namespace)) {
        namespace = UtilAndComs.DEFAULT_NAMESPACE_ID;
    }
    logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
    if (StringUtils.isEmpty(logName)) {
        logName = "naming.log";
    }
    cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
    if (StringUtils.isEmpty(cacheDir)) {
        cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
    }
}

這面做了3件事,給namespace,logName,cacheDir賦值,namespace我們麽有傳入,默認是default,namespace在Nacos裏面的作用,是用來進行本地緩存隔離的,一臺機器上,啟動一個Nacos的客戶端進程,默認的本地緩存路徑是default,如果再啟動一個,需要重新設置一個namespace,否則就會復用之前的緩存,造成沖突;logName和cacheDir,這2個字段就不解釋了,字面理解。這裏多說一句,這些值的設置,可以在java啟動時,通過系統參數的形式傳入,並且是第一優先級的。

init方法執行完之後,接下來是實例化一些框架組件,EventDispatcher,這個是一個經典的事件分發組件,它的工作模式如下:

技術分享圖片

會有一個單獨線程從blockQueue中獲取事件,這個事件在Nacos這裏, 就是服務端推送下來的數據,listener在我們訂閱一條數據時,會從生成一個listener實例,在事件到了隊列中,找到對應的listener,去執行裏面listener的回調函數onEvent。如果對這個模式不熟悉的同學,可以再翻看下EventDispatcher的代碼,這個屬於基礎知識了,和業務沒有關系,這裏就不過多詳細講解,篇幅太長。

接下來,實例化了一個NameProxy的組件,這個東西是幹嘛的呢?我們看下裏面代碼:

public NamingProxy(String namespace, String endpoint, String serverList) {
    this.namespace = namespace;
    this.endpoint = endpoint;
    if (StringUtils.isNotEmpty(serverList)) {
        this.serverList = Arrays.asList(serverList.split(","));
        if (this.serverList.size() == 1) {
            this.nacosDomain = serverList;
        }
    }
    executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.taobao.vipserver.serverlist.updater");
            t.setDaemon(true);
            return t;
        }
    });
    executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            refreshSrvIfNeed();
        }
    }, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
    refreshSrvIfNeed();
}

這裏面邏輯有些多,我總結下,主要是啟動了一個線程,每隔30s,去執行refreshSrvIfNeed()這個方法,
refreshSrvIfNeed()這個方法裏面,做的事情,是通過一個http請求,去Nacos server獲取一串Nacos server集群的地址列表,具體代碼如下:

  private void refreshSrvIfNeed() {
        try {
            if (!CollectionUtils.isEmpty(serverList)) {
                LogUtils.LOG.info("server list provided by user: " + serverList);
                return;
            }
            if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) {
                return;
            }
            List<String> list = getServerListFromEndpoint();
            if (list.isEmpty()) {
                throw new Exception("Can not acquire vipserver list");
            }
            if (!CollectionUtils.isEqualCollection(list, serversFromEndpoint)) {
                LogUtils.LOG.info("SERVER-LIST", "server list is updated: " + list);
            }
            serversFromEndpoint = list;
            lastSrvRefTime = System.currentTimeMillis();
        } catch (Throwable e) {
            LogUtils.LOG.warn("failed to update server list", e);
        }
    }

獲取完地址列表後,賦值給serversFromEndpoint,並且記錄當前更新時間,在下一次更新時,小於30s,就不更新,避免頻繁更新,總的來說,NameProxy的目的就是定時在客戶端維護Nacos服務端的最新地址列表。

我們繼續往下看,接下來初始化了BeatReactor這個組件,從名字可以猜測,應該是和心跳相關的事情,它初始化的代碼如下:

public BeatReactor(NamingProxy serverProxy) {
    this.serverProxy = serverProxy;
    executorService.scheduleAtFixedRate(new BeatProcessor(), 0, clientBeatInterval, TimeUnit.MILLISECONDS);
}

起了一個定時間隔為10s的任務,去執行BeatProcessor裏面的邏輯,BeatProcessor的代碼裏面,是循環的去取當前客戶端註冊好的實例,然後向服務端發送一個http的心跳通知請求,告訴客戶端,這個服務的健康狀態,具體代碼如下:

class BeatTask implements Runnable {
    BeatInfo beatInfo;
    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }
    @Override
    public void run() {
        Map<String, String> params = new HashMap<String, String>(2);
        params.put("beat", JSON.toJSONString(beatInfo));
        params.put("dom", beatInfo.getDom());
        try {
            String result = serverProxy.callAllServers(UtilAndComs.NACOS_URL_BASE + "/api/clientBeat", params);
            JSONObject jsonObject = JSON.parseObject(result);

            if (jsonObject != null) {
                clientBeatInterval = jsonObject.getLong("clientBeatInterval");
            }
        } catch (Exception e) {
            LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(beatInfo), e);
        }
    }
}

這裏就是naocs的客戶端主動上報服務健康狀況的邏輯了,是服務發現功能,比較重要的一個概念,服務健康檢查機制,常用的還有服務端主動去探測客戶端的接口返回。

最後一步,就是初始化了一個叫HostReactor的實例,我們來看下,它幹了些啥:

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) {
    this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushRecver = new PushRecver(this);
}

第五行,是從緩存文件中加載數據到serviceInfoMap的內存map中,接下來,初始化了一個FailoverReactor的組件,這個是Nacos客戶端緩存容災相關的,它裏面的初始化代碼如下:

public void init() {
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
    // backup file on startup if failover directory is empty.
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);

                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }

                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                LogUtils.LOG.error("NA", "failed to backup file on startup.", e);
            }

        }
    }, 10000L, TimeUnit.MILLISECONDS);
}

初始化了3個定時任務,第一個任務的代碼如下:

class SwitchRefresher implements Runnable {
    long lastModifiedMillis = 0L;
    @Override
    public void run() {
        try {
            File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
            if (!switchFile.exists()) {
                switchParams.put("failover-mode", "false");
                LogUtils.LOG.debug("failover switch is not found, " + switchFile.getName());
                return;
            }
            long modified = switchFile.lastModified();
            if (lastModifiedMillis < modified) {
                lastModifiedMillis = modified;
                String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH, Charset.defaultCharset().toString());
                if (!StringUtils.isEmpty(failover)) {
                    List<String> lines = Arrays.asList(failover.split(DiskCache.getLineSeperator()));
                    for (String line : lines) {
                        String line1 = line.trim();
                        if ("1".equals(line1)) {
                            switchParams.put("failover-mode", "true");
                            LogUtils.LOG.info("failover-mode is on");
                            new FailoverFileReader().run();
                        } else if ("0".equals(line1)) {
                            switchParams.put("failover-mode", "false");
                            LogUtils.LOG.info("failover-mode is off");
                        }
                    }
                } else {
                    switchParams.put("failover-mode", "false");
                }
            }
        } catch (Throwable e) {
            LogUtils.LOG.error("NA", "failed to read failover switch.", e);
        }
    }
}

首先判定下容災開關是否有,容災開關是一個磁盤文件的形式存在,通過容災開關文件名字,判定容災開關是否打開,1表示打開,0為關閉,讀取到容災開關後,將值更新到內存中,後續解析地址列表時,首先會判定一下容災開關是否打開,如果打開了,就讀緩存的數據,否則從服務端獲取最新數據。

第二個定時任務,做的事情如下:

class DiskFileWriter extends TimerTask {
    public void run() {
        Map<String, ServiceInfo> map = hostReactor.getServiceInfoMap();
        for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
            ServiceInfo serviceInfo = entry.getValue();
            if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY)
                    || StringUtils.equals(serviceInfo.getName(), "00-00---000-ENV_CONFIGS-000---00-00")
                    || StringUtils.equals(serviceInfo.getName(), "vipclient.properties")
                    || StringUtils.equals(serviceInfo.getName(), "00-00---000-ALL_HOSTS-000---00-00")) {
                continue;
            }
            DiskCache.write(serviceInfo, failoverDir);
        }
    }
}

每隔24小時,把內存中所有的服務數據,寫一遍到磁盤中,其中需要過濾掉一些非域名數據的特殊數據,具體可看代碼中的描述。最後一個定時任務,是每隔10s,是檢查緩存目錄是否存在,同時如果緩存裏面值沒有的話,主動觸發一次緩存寫磁盤的操作。

以上就是客戶端構造一個Nacos實例的初始化全部流程,大部分都是在初始化多個線程池或者定時任務,各司其職,這個也是我們寫後端程序的一些基本套路,提高系統的並發能力,同時在對任務的分發和執行,引入一些常用的異步編程模型如隊列模型的事件分發,這些都是異步和並發的很好學習素材,這2點也是寫高性能程序的基本要求。

總結

這一章節,我們通過Nacos的NacosFactory構造一個nacos服務實例作為切入點,把客戶端的初始化流程給串了一遍,概述下客戶端初始化流程做的幾件事:

  • 初始化事件分發組件,用於處理服務端主動通知下來的變更數據
  • 初始化Nacos服務集群地址列表更新組件,用於客戶端維護Nacos服務端的最新地址列表
  • 初始化服務健康檢查模塊,主動給服務端上報服務的健康情況
  • 初始化客戶端的緩存,10s檢查一次,如果沒有,則創建
  • 24小時備份一次客戶端的緩存文件
  • 5s檢查一次容災開關,更新到內存中,容災模式情況下,服務地址讀的都是緩存

以上就是Nacos客戶端實例初始化的整體流程,是不是感覺做的事情挺多的,還有一些代碼的細節點,大家自己多精讀一下,如果有什麽不明白的,可以留言,或者在社區找@超哥幫你解答,如果能發現bug或者其他建議,可以在社區提issue。

轉載請聯系:微信(zjjxg2018)

深入淺出高性能服務發現、配置框架Nacos系列 3: 服務發現:Nacos客戶端初始化流程