1. 程式人生 > >深入理解Eureka獲取註冊資訊(七)

深入理解Eureka獲取註冊資訊(七)

Eureka-Client獲取資訊

啟動獲取

在客戶端應用啟動時,初始化DiscoverClient的時候,會主動去獲取一次註冊資訊

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    // ...省略N多程式碼
    // 如果fetch-registry = true , 則去Eureka Server拉取註冊資訊
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 如果所有的Eureka Server都不可用,那麼從備用的服務裡面去取資料   fetchRegistryFromBackup(); } // ...省略N多程式碼 // 設定定時器 initScheduledTasks(); }
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig
config, AbstractDiscoveryClientOptionalArgs args,               Provider<BackupRegistry> backupRegistryProvider) { // ...省略N多程式碼 // 如果fetch-registry = true , 則去Eureka Server拉取註冊資訊 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 如果所有的Eureka Server都不可用,那麼從備用的服務裡面去取資料
  fetchRegistryFromBackup(); } // ...省略N多程式碼 // 設定定時器 initScheduledTasks(); }

shouldFetchRegistry : 預設truefetchRegistry : 獲取註冊資訊,此處傳入的是false, 表面上看是不需要全量獲取,但是應用第一次啟動的時候,本地快取為空,所以還是會全量獲取的。

PS: 啟動時獲取註冊資訊為全量。

定時器獲取


private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // ...省略N多程式碼
}private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // ...省略N多程式碼
}

registryFetchIntervalSeconds : 預設值為30秒 ,每30秒重新整理一次、

定時器初始化,,直接看CacheRefreshThread()

class CacheRefreshThread implements Runnable {
    public void run() {
        // 重新整理註冊資訊
        refreshRegistry();
    }
}
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
​
        boolean remoteRegionsModified = false;
        
        // 判斷是否需要全量獲取 , remoteRegionsModified  這個值來決定
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }
        // 獲取註冊資訊
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }
        // 日誌輸出 , 省略。。
        
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }        
} CacheRefreshThread implements Runnable {
    public void run() {
        // 重新整理註冊資訊
        refreshRegistry();
    }
}
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
​
        boolean remoteRegionsModified = false;
        
        // 判斷是否需要全量獲取 , remoteRegionsModified  這個值來決定
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }
        // 獲取註冊資訊
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }
        // 日誌輸出 , 省略。。
        
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }        
}

由上可以看到,系統在啟動的時候,初始化了一個定時器,每30秒一次,用來重新整理本地快取資訊。

獲取註冊資訊


private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
​
    try {
        // 獲取本地的快取資訊 , 也就是客戶端註冊資訊
        Applications applications = getApplications();
        
        // 判斷是否需要全量獲取
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            // 增量獲取
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
​
    // 釋出快取重新整理事件。
    onCacheRefreshed();
​
    // 更新本地應用的狀態
    updateInstanceRemoteStatus();
​
    // registry was fetched successfully, so return true
    return true;
}private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
​
    try {
        // 獲取本地的快取資訊 , 也就是客戶端註冊資訊
        Applications applications = getApplications();
        
        // 判斷是否需要全量獲取
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            // 增量獲取
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
​
    // 釋出快取重新整理事件。
    onCacheRefreshed();
​
    // 更新本地應用的狀態
    updateInstanceRemoteStatus();
​
    // registry was fetched successfully, so return true
    return true;
}

clientConfig.shouldDisableDelta() : 是否禁用增量獲取, 預設為false , 如果禁用了的話,那就只能是全量獲取了,總要獲取一下不是。clientConfig.getRegistryRefreshSingleVipAddress() : 當這個屬性不為空的時候,則全量獲取。具體作用不是很清楚(苦笑)forceFullRegistryFetch : 傳入的引數,表示是否需要全量獲取applications : 本地註冊資訊的快取,如果本地快取為空,或者裡面的版本號為-1,那麼就需要全量獲取,表示首次載入時。onCacheRefreshed() : 釋出快取重新整理的事件,使用者可以自定義是否監聽這個事件,比如需要將註冊資訊的變化落庫。

全量獲取


private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
​
    logger.info("Getting all instance registry info from the eureka server");
​
    Applications apps = null;
    // 傳送HTTP請求,去服務端獲取註冊資訊
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());
​
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // 設定到本地快取裡面去
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
​
    logger.info("Getting all instance registry info from the eureka server");
​
    Applications apps = null;
    // 傳送HTTP請求,去服務端獲取註冊資訊
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());
​
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // 設定到本地快取裡面去
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

在傳送HTTP請求去服務端獲取註冊資訊之前,做了一個判斷, 判斷registryRefreshSingleVipAddress是否為空, 這個欄位

表示的意思是 “此客戶端只對一個單一的VIP登錄檔的資訊感興趣”,預設為null , 也就是說如果客戶端只對其中一個VIP 感興趣那麼就只獲取這一個, 否則全部獲取

this.filterAndShuffle(apps) : 是否需要過濾客戶端資訊的狀態,如果設定了eureka.shouldFilterOnlyUpInstances = true 這個屬性的話,

客戶端獲取到註冊資訊之後,會剔除非UP狀態的客戶端資訊。

localRegionApps.set(this.filterAndShuffle(apps)) : 將註冊資訊設定到本地記憶體裡面去,使用AtomicReference型別做儲存、


private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

增量獲取


private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
​
    Applications delta = null;
    // 增量獲取資訊
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
​
    if (delta == null) {
        // 增量獲取為空,則全量返回
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        //這裡設定原子鎖的原因是怕某次排程網路請求時間過長,導致同一時間有多執行緒拉取到增量資訊併發修改
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 將獲取到的增量資訊和本地快取資訊合併。 
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                // 釋放鎖
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // ( HashCode 不一致|| 列印增量和全量的差異 )= true 重新去全量獲取
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
​
    Applications delta = null;
    // 增量獲取資訊
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
​
    if (delta == null) {
        // 增量獲取為空,則全量返回
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        //這裡設定原子鎖的原因是怕某次排程網路請求時間過長,導致同一時間有多執行緒拉取到增量資訊併發修改
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 將獲取到的增量資訊和本地快取資訊合併。 
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                // 釋放鎖
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // ( HashCode 不一致|| 列印增量和全量的差異 )= true 重新去全量獲取
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

步驟說明:

1.發起http請求,將服務端的客戶端變化的資訊拉取過來,如: register, cancle, modify 有過這些操作的資料

2.上鎖,防止某次排程網路請求時間過長,導致同一時間有多執行緒拉取到增量資訊併發修改

3.將請求過來的增量資料和本地的資料做合併

4.計算hashCode

5.如果hashCode不一致,或者clientConfig.shouldLogDeltaDiff() = true 的話,則又會去服務端發起一次全量獲取

合併資料


private void updateDelta(Applications delta) {
    int deltaCount = 0;
    // 迴圈拉取過來的應用列表
    for (Application app : delta.getRegisteredApplications()) {
        // 迴圈這個應用裡面的例項(有多個例項代表是叢集的。)
        for (InstanceInfo instance : app.getInstances()) {
            // 獲取本地的註冊應用列表
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }
            
            ++deltaCount;
            if (ActionType.ADDED.equals(instance.getActionType())) {// 新增事件
                //根據AppName 獲取本地的資料,看這個應用是否存在
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    // 不存在,則加到本地的應用裡面去
                    applications.addApplication(app);
                }
                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                // 為本地這個應用新增這個例項
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 修改事件
                //根據AppName 獲取本地的資料,看這個應用是否存在
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    // 不存在,則加到本地的應用裡面去