1. 程式人生 > >Spring Cloud原始碼分析之Eureka篇第五章:更新服務列表

Spring Cloud原始碼分析之Eureka篇第五章:更新服務列表

在上一章《Spring Cloud原始碼分析之Eureka篇第四章:服務註冊是如何發起的 》,我們知道了作為Eureka Client的應用啟動時,在com.netflix.discovery.DiscoveryClient類的initScheduledTasks方法中,會做以下幾件事:

  1. 週期性更新服務列表;
  2. 週期性服務續約;
  3. 服務註冊邏輯;

概覽

以下圖片來自Netflix官方,圖中顯示Eureka Client會向註冊中心發起Get Registry請求來獲取服務列表,接下來就去看下對應的程式碼實現;
在這裡插入圖片描述

結論提前知曉

看原始碼易犯困,又難保持注意力集中,因此先拋結論吧,這樣不看原始碼也有收穫:

  1. Eureka client從註冊中心更新服務列表,然後自身會做快取;
  2. 作為服務消費者,就是從這些快取資訊中獲取的服務提供者的資訊;
  3. 增量更新的服務以30秒為週期迴圈呼叫;
  4. 增量更新資料在服務端儲存時間為3分鐘,因此Eureka client取得的資料雖然被稱為"增量更新",仍然可能和30秒前取的資料一樣,所以Eureka client要自己來處理重複資訊;
  5. 由3、4兩點可以推斷出,Eureka client的增量更新,其實獲取的是Eureka server最近三分鐘內的變更,因此,如果Eureka client有超過三分鐘沒有做增量更新的話(例如網路問題),那麼再呼叫增量更新介面時,那三分鐘內Eureka server的變更就可能獲取不到了,這就造成了Eureka server和Eureka client之間的資料不一致,需要有個方案來及時發現這個問題;
  6. 正常情況下,Eureka client多次增量更新後,最終的服務列表資料應該Eureka server保持一致,但如果期間發生異常,可能導致和Eureka server的資料不一致,為了暴露這個問題,Eureka server每次返回的增量更新資料中,會帶有一致性雜湊碼,Eureka client用本地服務列表資料算出的一致性雜湊碼應該和Eureka server返回的一致,若不一致就證明增量更新出了問題導致Eureka client和Eureka server上的服務列表資訊不一致了,此時需要全量更新;
  7. Eureka server上的服務列表資訊對外提供JSON/XML兩種格式下載;
  8. Eureka client使用jersey的SDK,去下載JSON格式的服務列表資訊;

關於原始碼版本

本次分析的Spring Cloud版本為Edgware.RELEASE,對應的eureka-client版本為1.7.0;

如何做到週期性執行

更新服務列表和服務續約都是週期性迴圈執行的,這是如何實現的呢,來看initScheduledTasks方法的原始碼:
紅框部分其實是一次性任務

如上圖兩個紅框中所示,scheduler.schedule方法其實啟動的是一個延時執行的一次性任務,不過TimedSupervisorTask內有乾坤,會在每次執行完任務後再啟動一個同樣的任務,這樣就能實現週期性執行任務了,並且TimedSupervisorTask的功能還不止如此,它還負責任務超時、動態調節週期性間隔、執行緒池滿、未知異常等各種情況的處理,推薦您參考《Eureka的TimedSupervisorTask類(自動調節間隔的週期性任務)》瞭解更多細節;

來自官方文件的指導資訊

學習原始碼之前先看文件可以確定大方向,不會因為陷入原始碼細節導致偏離學習目標,如下圖所示:
在這裡插入圖片描述

對上文,我的理解:

  1. Eureka client從註冊中心更新服務列表,然後自身會做快取;
  2. 作為服務消費者,就是從這些快取資訊中獲取的服務提供者的資訊;
  3. 增量更新的服務以30秒為週期迴圈呼叫;
  4. 增量更新資料在服務端儲存時間為3分鐘,因此Eureka client取得的資料雖然被稱為"增量更新",仍然可能和30秒前取的資料一樣,所以Eureka client要自己來處理重複資訊;
  5. 由3、4兩點可以推斷出,Eureka client的增量更新,其實獲取的是Eureka server最近三分鐘內的變更,因此,如果Eureka client有超過三分鐘沒有做增量更新的話(例如網路問題),那麼再呼叫增量更新介面時,那三分鐘內Eureka server的變更就可能獲取不到了,這就造成了Eureka server和Eureka client之間的資料不一致,需要有個方案來及時發現這個問題;
  6. 正常情況下,Eureka client多次增量更新後,最終的服務列表資料應該Eureka server保持一致,但如果期間發生異常,可能導致和Eureka server的資料不一致,為了暴露這個問題,Eureka server每次返回的增量更新資料中,會帶有一致性雜湊碼,Eureka client用本地服務列表資料算出的一致性雜湊碼應該和Eureka server返回的一致,若不一致就證明增量更新出了問題導致Eureka client和Eureka server上的服務列表資訊不一致了,此時需要全量更新;
  7. Eureka server上的服務列表資訊對外提供JSON/XML兩種格式下載;
  8. Eureka client使用jersey的SDK,去下載JSON格式的服務列表資訊;
    準備工作就到此,接下來學習原始碼,整個過程應圍繞上述點八進行,不要過早陷入某些程式碼細節中;

原始碼分析

  1. 如下圖紅框所示,更新服務列表的邏輯已經封裝在CacheRefreshThread類中:
    在這裡插入圖片描述

  2. CacheRefreshThread類中又是呼叫refreshRegistry方法來實現服務列表更新的,refreshRegistry方法如下:
    在這裡插入圖片描述

如上圖所示,本文假設應用部署在非AWS環境,所以Eureka client不做region和zone相關的配置,因此上圖綠框中的程式碼不會執行,我們聚焦紅框中的程式碼,先看fetchRegistry方法;

  1. fetchRegistry方法原始碼如下,請注意中文註釋:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        //用Stopwatch做耗時分析
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // 取出本地快取的,之氣獲取的服務列表資訊
            Applications applications = getApplications();

            //判斷多個條件,確定是否觸發全量更新,如下任一個滿足都會全量更新:
            //1. 是否禁用增量更新;
            //2. 是否對某個region特別關注;
            //3. 外部呼叫時是否通過入參指定全量更新;
            //4. 本地還未快取有效的服務列表資訊;
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
            	//這些詳細的日誌可以看出觸發全量更新的原因
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                //全量更新
                getAndStoreFullRegistry();
            } else {
                //增量更新
                getAndUpdateDelta(applications);
            }
            //重新計算和設定一致性hash碼
            applications.setAppsHashCode(applications.getReconcileHashCode());
            //日誌列印所有應用的所有例項數之和
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        //將本地快取更新的事件廣播給所有已註冊的監聽器,注意該方法已被CloudEurekaClient類重寫
        onCacheRefreshed();

        //檢查剛剛更新的快取中,有來自Eureka server的服務列表,其中包含了當前應用的狀態,
        //當前例項的成員變數lastRemoteInstanceStatus,記錄的是最後一次更新的當前應用狀態,
        //上述兩種狀態在updateInstanceRemoteStatus方法中作比較 ,如果不一致,就更新lastRemoteInstanceStatus,並且廣播對應的事件
        updateInstanceRemoteStatus();

        return true;
    }

上述程式碼中已有註釋詳細說明,就不另外贅述了,接下來細看getAndStoreFullRegistry和getAndUpdateDelta這兩個方法,瞭解全量增量更新的細節;

全量更新本地快取的服務列表

  1. getAndStoreFullRegistry方法負責全量更新,程式碼如下所示,非常簡單的邏輯:
private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        //由於並沒有配置特別關注的region資訊,因此會呼叫eurekaTransport.queryClient.getApplications方法從服務端獲取服務列表
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            //返回物件就是服務列表
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } 
	//考慮到多執行緒同步,只有CAS成功的執行緒,才會把自己從Eureka server獲取的資料來替換本地快取
        else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            //localRegionApps就是本地快取,是個AtomicReference例項
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
  1. getAndStoreFullRegistry方法中並無複雜邏輯,只有eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())這段需要展開細看,和Eureka server互動的邏輯都在這裡面,方法getApplications的具體實現是在EurekaHttpClientDecorator類:
@Override
    public EurekaHttpResponse<Applications> getApplications(final String... regions) {
        return execute(new RequestExecutor<Applications>() {
            @Override
            public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
                return delegate.getApplications(regions);
            }

            @Override
            public RequestType getRequestType() {
            	//本次向Eureka server請求的型別:獲取服務列表
                return RequestType.GetApplications;
            }
        });
    }

EurekaHttpClientDecorator類從名字看是個裝飾者模式的實現,看它的其他程式碼,發現各類遠端服務都在此被封裝成API了,例如註冊的:

@Override
    public EurekaHttpResponse<Void> register(final InstanceInfo info) {
        return execute(new RequestExecutor<Void>() {
            @Override
            public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
                return delegate.register(info);
            }

            @Override
            public RequestType getRequestType() {
                return RequestType.Register;
            }
        });
    }

還有續租的:

@Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,
                                                          final String id,
                                                          final InstanceInfo info,
                                                          final InstanceStatus overriddenStatus) {
        return execute(new RequestExecutor<InstanceInfo>() {
            @Override
            public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
                return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
            }

            @Override
            public RequestType getRequestType() {
                return RequestType.SendHeartBeat;
            }
        });
    }
  1. 再繼續追蹤 delegate.register(info),進入了AbstractJerseyEurekaHttpClient類,這裡面是各種網路請求的具體實現,EurekaHttpClientDecorator類中的getApplications、register、sendHeartBeat等方法對應的網路請求響應邏輯在AbstractJerseyEurekaHttpClient中都有具體實現,篇幅所限我們只關注getApplications:
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
	//取全量資料的path是""apps"
	return getApplicationsInternal("apps/", regions);
}

@Override
public EurekaHttpResponse<Applications> getDelta(String... regions) {
	//取增量資料的path是""apps/delta"
	return getApplicationsInternal("apps/delta", regions);
}

//具體的請求響應處理都在此方法中
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
        ClientResponse response = null;
        String regionsParamValue = null;
        try {
            //jersey、resource這些關鍵詞都預示著這是個restful請求	
            WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
            if (regions != null && regions.length > 0) {
                regionsParamValue = StringUtil.join(regions);
                webResource = webResource.queryParam("regions", regionsParamValue);
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            //發起網路請求,將響應封裝成ClientResponse例項
            response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

            Applications applications = null;
            if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            	//取得全部應用資訊
                applications = response.getEntity(Applications.class);
            }
            return anEurekaHttpResponse(response.getStatus(), Applications.class)
                    .headers(headersOf(response))
                    .entity(applications)
                    .build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                        serviceUrl, urlPath,
                        regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                        response == null ? "N/A" : response.getStatus()
                );
            }
            if (response != null) {
                response.close();
            }
        }
    }

上述程式碼中,利用jersey-client庫的API向Eureka server發起restful請求,並將響應資料封裝到EurekaHttpResponse例項中返回;

小結:獲取全量資料,是通過jersey-client庫的API向Eureka server發起restful請求實現的,並將響應的服務列表資料放在一個成員變數中作為本地快取;

獲取服務列表資訊的增量更新

獲取服務列表資訊的增量更新是通過getAndUpdateDelta方法完成的,具體分析請看下面的中文註釋:

private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        //增量資訊是通過eurekaTransport.queryClient.getDelta方法完成的
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            //delta中儲存了Eureka server返回的增量更新
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            //如果增量資訊為空,就直接發起一次全量更新
            getAndStoreFullRegistry();
        } 
        //考慮到多執行緒同步問題,這裡通過CAS來確保請求發起到現在是執行緒安全的,
        //如果這期間fetchRegistryGeneration變了,就表示其他執行緒也做了類似操作,因此放棄本次響應的資料
        else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    //用Eureka返回的增量資料和本地資料做合併操作,這個方法稍後會細說
                    updateDelta(delta);
                    //用合併了增量資料之後的本地資料來生成一致性雜湊碼
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            //Eureka server在返回增量更新資料時,也會返回服務端的一致性雜湊碼,
            //理論上每次本地快取資料經歷了多次增量更新後,計算出的一致性雜湊碼應該是和服務端一致的,
            //如果發現不一致,就證明本地快取的服務列表資訊和Eureka server不一致了,需要做一次全量更新
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                //一致性雜湊碼不同,就在reconcileAndLogDifference方法中做全量更新
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }

上述程式碼中有幾處需要注意:
a. 獲取增量更新資料使用的方法是:eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
b. 將增量更新的資料和本地快取合併的方法是: updateDelta(delta);
c. 通過檢查一致性雜湊碼可以確定歷經每一次增量更新後,本地的服務列表資訊和Eureka server上的是否還保持一致,若不一致就要做一次全量更新,通過呼叫reconcileAndLogDifference方法來完成;

上述a、b、c三點,接下來依次展開:

  1. 向Eureka server發起網路請求的邏輯和前面全量更新的差不多,也是EurekaHttpClientDecorator和AbstractJerseyEurekaHttpClient這兩個類合作實現的,先看EurekaHttpClientDecorator部分:
@Override
    public EurekaHttpResponse<Applications> getDelta(final String... regions) {
        return execute(new RequestExecutor<Applications>() {
            @Override
            public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
                return delegate.getDelta(regions);
            }

            @Override
            public RequestType getRequestType() {
                return RequestType.GetDelta;
            }
        });
    }
  1. 再看AbstractJerseyEurekaHttpClient類中的getDelta方法,居然和全量獲取服務列表資料呼叫了相同的方法getApplicationsInternal,只是ur引數不一樣而已;
    @Override
    public EurekaHttpResponse<Applications> getDelta(String... regions) {
        return getApplicationsInternal("apps/delta", regions);
    }

由上述程式碼可見,從Eureka server的獲取增量更新,和一些常見的方式略有區別:
a. 一般的增量更新是在請求中增加一個時間戳或者上次更新的tag號等引數,由服務端根據引數來判斷哪些資料是客戶端沒有的;
b. 而這裡的Eureka client卻沒有這類引數,聯想到前面官方文件中提到的“Eureka會把更新資料保留三分鐘”,就可以理解了:Eureka把最近的變更資料保留三分鐘,這三分鐘內每個Eureka client來請求增量更新是,server都返回同樣的快取資料,只要client能保證三分鐘之內有一次請求,就能保證自己的資料和Eureka server端的保持一致;
c. 那麼如果client有問題,導致超過三分鐘才來獲取增量更新資料,那就有可能client和server資料不一致了,此時就要有一種方式來判斷是否不一致,如果不一致,client就會做一次全量更新,這種判斷就是一致性雜湊碼;

  1. Eureka client獲取到增量更新後,通過updateDelta方法將增量更新資料和本地資料做合併:
private void updateDelta(Applications delta) {
        int deltaCount = 0;
        //遍歷所有服務
        for (Application app : delta.get