Eureka Source Code Analysis: How the Eureka Client Fetches Instance Information

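The flowchart below shows the overall process by which the Eureka Client fetches instance information from the Eureka Server; each step is described in detail afterwards.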

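When the Eureka Client starts, it does one full fetch of the registry from the Eureka Server and, at the same time, sets up the scheduled task that refreshes the client's local instance cache. The task runs every 30 seconds by default (registryFetchIntervalSeconds = 30).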

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    // ... irrelevant code omitted ...

    // The client instance is being constructed here, i.e. the client is just starting: fetch the instance information once and store it locally
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        // Key method: fetchRegistry(boolean forceFullRegistryFetch)
        // The fetch failed (fetchRegistry returned false), so load the registry from the backup
        // To supply instance information when the fetch fails, implement the BackupRegistry interface
        fetchRegistryFromBackup();
    }
    // ... irrelevant code omitted ...
    // finally, init the schedule tasks
    // Initialize the scheduled tasks: local cache refresh, heartbeat, and instance status replication
    initScheduledTasks();

    // ... irrelevant code omitted ...
}
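The startup sequence above (fetch once, fall back to a backup on failure, then refresh periodically) can be sketched in plain Java. This is only an illustration under stated assumptions, not the Eureka code: `StartupFetchSketch`, the `Supplier`-based fetchers and the `List<String>` cache are hypothetical stand-ins for `fetchRegistry`, `fetchRegistryFromBackup` and `initScheduledTasks`.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for the client startup flow; not the real DiscoveryClient.
class StartupFetchSketch {
    private volatile List<String> localCache = Collections.emptyList();
    private final Supplier<List<String>> remoteFetch;

    StartupFetchSketch(Supplier<List<String>> remoteFetch, Supplier<List<String>> backupFetch) {
        this.remoteFetch = remoteFetch;
        // One fetch at construction time; use the backup source only if it fails.
        if (!fetchRegistry()) {
            localCache = backupFetch.get();
        }
    }

    // Returns true when the remote fetch succeeded and the cache was replaced.
    // Any failure is swallowed and reported as false, like the catch block in fetchRegistry.
    boolean fetchRegistry() {
        try {
            List<String> fetched = remoteFetch.get();
            if (fetched == null) {
                return false;
            }
            localCache = fetched;
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    // Periodic refresh; pass 30 to mirror the default interval mentioned above.
    ScheduledExecutorService startRefreshTask(long intervalSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::fetchRegistry,
                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    List<String> getLocalCache() {
        return localCache;
    }
}
```

The caller owns the returned scheduler and should shut it down when the client stops.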

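The key method is fetchRegistry(boolean forceFullRegistryFetch). The forceFullRegistryFetch parameter forces a full fetch of the instance information. Whether delta fetching is used at all is controlled separately by the eureka.client.disable-delta setting (read through clientConfig.shouldDisableDelta()), which defaults to false, so the client fetches deltas by default. The difference between delta and full fetches, and why delta is the default, is covered later.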

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    try {
        // If the delta is disabled or if it is the first time, get all applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // ... logging omitted ...
            // Full fetch; store the result in the local cache
            getAndStoreFullRegistry();
        } else {
            // Delta fetch; update the local cache. If the delta fetch fails, do one full fetch
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();
    return true;
}
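The condition that chooses between a full and a delta fetch can be restated as a small pure function. This is a simplified sketch: `FetchModeSketch` and its parameters are hypothetical names, and the local `Applications` object is reduced to an application count (with `null` modelling "no local cache yet") and a version number.

```java
// Simplified, hypothetical restatement of the condition in fetchRegistry.
class FetchModeSketch {
    static boolean needsFullFetch(boolean disableDelta,
                                  String singleVipAddress,
                                  boolean forceFullRegistryFetch,
                                  Integer cachedAppCount,   // null models "applications == null"
                                  long cachedVersion) {
        return disableDelta
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFullRegistryFetch
                || cachedAppCount == null
                || cachedAppCount == 0
                || cachedVersion == -1;
    }
}
```

This also shows why the constructor can call fetchRegistry(false) and still get a full fetch at startup: the local cache has no registered applications yet, so the size check alone selects the full path.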

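Let's focus on the implementation of the delta fetch, getAndUpdateDelta(Applications applications). "Delta" refers to the Greek letter Δ, which in mathematics denotes a change or increment.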

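Note the delta == null check below. When the delta fetch finds no changed instances, the result is a new Applications() rather than null; delta stays null only when the request did not succeed (the response status is not 200 OK, or the response carries no entity). In that case the client falls back to one full fetch. A full fetch is also triggered further down when the hash code computed from the updated local cache does not match the one sent with the delta.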

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    // Delta fetch; the getDelta implementation is shown later
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    // If the delta fetch failed, do one full fetch
    // Note: this covers a failed fetch only. A successful fetch with no changes yields a new Applications(), not null
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                updateDelta(delta);
                // Build a hash code from the information of all instances
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        // Compare the delta's AppsHashCode with the hash code of the updated cached applications; if they differ, do one full fetch.
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}
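The apply-then-reconcile step can be illustrated with a toy model. Everything here is a sketch under assumptions: the local cache is reduced to a map from instance id to status string, `DeltaReconcileSketch` and its members are hypothetical names, and the hash format (instance counts per status joined as `STATUS_count_`) reflects my understanding of Applications.getReconcileHashCode rather than something shown in the excerpts above.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of "apply delta, then compare hash codes"; not the Eureka classes.
class DeltaReconcileSketch {
    enum Action { ADDED, MODIFIED, DELETED }

    static final class Change {
        final String instanceId;
        final String status;
        final Action action;

        Change(String instanceId, String status, Action action) {
            this.instanceId = instanceId;
            this.status = status;
            this.action = action;
        }
    }

    // Apply every change in the delta to the local cache (instanceId -> status).
    static void updateDelta(Map<String, String> cache, Iterable<Change> delta) {
        for (Change c : delta) {
            if (c.action == Action.DELETED) {
                cache.remove(c.instanceId);
            } else {
                cache.put(c.instanceId, c.status);
            }
        }
    }

    // Assumed format: count instances per status, emit STATUS_count_ in sorted order.
    static String reconcileHashCode(Map<String, String> cache) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : cache.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }

    // Returns true when the cache agrees with the server's hash after the delta;
    // false is the case where the client would fall back to a full fetch.
    static boolean applyAndCheck(Map<String, String> cache, Iterable<Change> delta, String serverHash) {
        updateDelta(cache, delta);
        return reconcileHashCode(cache).equals(serverHash);
    }
}
```

Because the hash only summarizes counts per status, it is a cheap consistency check rather than a proof that both sides hold identical data.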

 

// The HTTP client for read operations: full fetch and delta fetch
private EurekaHttpClient queryClient;

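EurekaHttpClient has many implementations; the class that finally issues the HTTP call is AbstractJerseyEurekaHttpClient. Its two main methods for fetching instance information are shown below.

Note the try block in getApplicationsInternal: it has no catch (I did not remove one) and does not even log. An exception thrown while fetching instance information is therefore not handled in this method and propagates to the caller. When the response status is not 200 OK, or the response has no entity, the returned response simply carries a null Applications.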

@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
    return getApplicationsInternal("apps/", regions);
}

@Override
public EurekaHttpResponse<Applications> getDelta(String... regions) {
    return getApplicationsInternal("apps/delta", regions);
}
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            applications = response.getEntity(Applications.class);
        }
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        // ... omitted ...
    }
}
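The behaviour of a try block with only a finally clause is worth a tiny demonstration, since it decides who sees a failure. The sketch below is generic Java, unrelated to the Jersey client itself; `TryFinallySketch` and its methods are hypothetical names.

```java
// Generic demonstration of try/finally without catch; not Eureka code.
class TryFinallySketch {
    static int cleanups = 0;

    // No catch here: on failure the finally block runs, then the exception
    // continues up to whoever called fetch().
    static String fetch(boolean fail) {
        try {
            if (fail) {
                throw new IllegalStateException("connection refused");
            }
            return "payload";
        } finally {
            cleanups++;
        }
    }

    // The caller decides what a failure means, here by reporting false,
    // similar in spirit to the catch (Throwable e) block in fetchRegistry.
    static boolean callerHandles(boolean fail) {
        try {
            fetch(fail);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```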

The REST endpoints exposed by the Eureka Server can be found in the resources package of the eureka-core project. The endpoint that serves getDelta is in ApplicationsResource.

@Path("delta")
@GET
public Response getContainerDifferential(
        @PathParam("version") String version,
        @HeaderParam(HEADER_ACCEPT) String acceptHeader,
        @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
        @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
        @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

    // ... some code omitted ...
    // The cache key contains the entity type, full/delta fetch or the application name, the region information, and the key type (JSON, XML)
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    if (acceptEncoding != null
            && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        return Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        // responseCache is a ResponseCacheImpl object, the Eureka Server's cache
        // It has two levels: the first level is a Google Guava cache, the second a ConcurrentMap<Key, Value>
        return Response.ok(responseCache.get(cacheKey))
                .build();
    }
}

Next, look at responseCache.get(cacheKey); the get method is in the ResponseCacheImpl class.

public String get(final Key key) {
    return get(key, shouldUseReadOnlyResponseCache);
}
String get(final Key key, boolean useReadOnlyCache) {
    Value payload = getValue(key, useReadOnlyCache);
    if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
        return null;
    } else {
        return payload.getPayload();
    }
}
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            // The second-level cache is enabled
            // Read from the second-level cache, readOnlyCacheMap
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                // On a miss, read from the first-level cache, then store the value in the second-level cache
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            // The second-level cache is disabled: read directly from the first-level cache, readWriteCacheMap
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}
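The read path through the two cache levels can be modelled with two maps and a loader. This is a minimal sketch, not ResponseCacheImpl: `TwoLevelCacheSketch`, its fields and `invalidate` are hypothetical, and a plain ConcurrentHashMap with computeIfAbsent stands in for the Guava loading cache.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Minimal model of a read-only map in front of a loading read-write map.
class TwoLevelCacheSketch<K, V> {
    private final ConcurrentMap<K, V> readOnly = new ConcurrentHashMap<>();
    private final ConcurrentMap<K, V> readWrite = new ConcurrentHashMap<>();
    private final Function<K, V> loader;      // plays the role of generatePayload
    private final boolean useReadOnlyCache;

    TwoLevelCacheSketch(Function<K, V> loader, boolean useReadOnlyCache) {
        this.loader = loader;
        this.useReadOnlyCache = useReadOnlyCache;
    }

    V get(K key) {
        if (!useReadOnlyCache) {
            // Read-write level only: load on a miss.
            return readWrite.computeIfAbsent(key, loader);
        }
        V value = readOnly.get(key);
        if (value == null) {
            // Miss in the read-only level: fall through, then copy the value up.
            value = readWrite.computeIfAbsent(key, loader);
            if (value != null) {
                readOnly.put(key, value);
            }
        }
        return value;
    }

    // Invalidation only touches the read-write level in this sketch.
    void invalidate(K key) {
        readWrite.remove(key);
    }
}
```

One consequence visible in the sketch: after invalidate, a reader that goes through the read-only level still sees the old value until that level is refreshed. In this model nothing refreshes it; as far as I know the real server reconciles the two maps with a periodic task, but that code is not shown here.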

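By default the read goes to the second-level cache first, and on a miss it falls through to the first-level cache. What if the first-level cache misses as well? Look at readWriteCacheMap: it is a Google Guava cache, which lets you register a custom callback for removals and, when get finds no value, calls load(Key key), returns the loaded value and stores it.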

// Google Guava cache with automatic expiration; the default here is 3 minutes
this.readWriteCacheMap =
    CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
            .removalListener(new RemovalListener<Key, Value>() {
                @Override
                public void onRemoval(RemovalNotification<Key, Value> notification) {
                    Key removedKey = notification.getKey();
                    if (removedKey.hasRegions()) {
                        Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                        regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                    }
                }
            })
            .build(new CacheLoader<Key, Value>() {
                @Override
                public Value load(Key key) throws Exception {
                    if (key.hasRegions()) {
                        Key cloneWithNoRegions = key.cloneWithoutRegions();
                        regionSpecificKeys.put(cloneWithNoRegions, key);
                    }
                    // Read the instance information from the Eureka Server's underlying two-level map
                    Value value = generatePayload(key);
                    return value;
                }
            });

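The difference between a full fetch and a delta fetch lies in two methods, getApplicationsFromMultipleRegions and getApplicationDeltasFromMultipleRegions. Both are in AbstractInstanceRegistry, which can be regarded as the class implementing most of the Eureka Server's functionality.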

private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();
                // Full fetch
                if (ALL_APPS.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } // Delta fetch
                else if (ALL_APPS_DELTA.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } // Fetch by appName
                else {
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            case VIP:
            case SVIP:
                tracer = serializeViptimer.start();
                payload = getPayLoad(key, getApplicationsForVip(key, registry));
                break;
            default:
                logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                payload = "";
                break;
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}

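Let's see how the delta method getApplicationDeltasFromMultipleRegions is implemented. It relies on a recentlyChangedQueue: it first reads the local recently-changed queue, then the recently-changed queues of the remote regions.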

public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    if (null == remoteRegions) {
        // allKnownRemoteRegions comes from the Eureka Server configuration
        remoteRegions = allKnownRemoteRegions; // null means all remote regions.
    }

    boolean includeRemoteRegion = remoteRegions.length != 0;

    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    try {
        write.lock();
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
        // Read the recently changed instances from the recently-changed queue
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            logger.debug("The instance id {} is found with status {} and actiontype {}",
                    instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
        }
        // Iterate over all remoteRegions
        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                // Get the instances registered in the other region
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                    if (null != remoteAppsDelta) {
                        for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                            if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                Application appInstanceTillNow =
                                        apps.getRegisteredApplications(application.getName());
                                if (appInstanceTillNow == null) {
                                    appInstanceTillNow = new Application(application.getName());
                                    apps.addApplication(appInstanceTillNow);
                                }
                                for (InstanceInfo instanceInfo : application.getInstances()) {
                                    appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                                }
                            }
                        }
                    }
                }
            }
        }

        Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}

So what kind of data does recentlyChangedQueue hold, or rather, when is data added to it?

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // ... some code omitted ...
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        // ... some code omitted ...
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // A new registration happened: invalidate the affected entries of readWriteCacheMap, not the whole cache
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());       
    } finally {
        read.unlock();
    }
}

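Only the register method is listed here. Calls to internalCancel, statusUpdate and deleteStatusOverride also add entries to this queue. When a client sends a delta request, the server reads the entries currently in this queue; reading does not remove them.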

On reflection, register, internalCancel, statusUpdate and deleteStatusOverride are not called often in a stable production environment, where instances rarely change status or go up and down. In other words, this queue stays empty most of the time.

Back to the original question: why does the Eureka Client enable delta fetching by default?

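The reason is that in a normally running environment instance status rarely changes, so the client seldom has to walk through and overwrite its cache: what it holds is already current and unchanged. By default the client asks the Eureka Server for instance information every 30s, and the chance that an instance changed status within those 30s is small. Even with a large number of instances, a status change will not happen in every 30s window.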

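And of course, with a large number of instances (say 1000), disabling delta fetching means the client pulls a response containing all 1000 instances from the server every 30 seconds, which costs a lot of bandwidth.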

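In addition, recentlyChangedQueue is pruned periodically. A timer task runs every 30s by default and removes the entries that were added longer ago than serverConfig.getRetentionTimeInMSInDeltaQueue() (3 minutes by default). So what a client reads from this queue are changes that happened at most one retention window plus one timer interval ago.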

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
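                // Keep only the items added within the retention window (getRetentionTimeInMSInDeltaQueue(), 3 minutes by default); remove everything older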
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}
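The life cycle of an entry in the recently-changed queue (added on a change, read by every delta request, removed only by the retention task) can be sketched as follows. This is a simplified model with hypothetical names (`DeltaQueueSketch`, `recordChange`, `readDelta`, `evictExpired`); timestamps are passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified model of a recently-changed queue with time-based retention.
class DeltaQueueSketch {
    static final class Item {
        final String instanceId;
        final long lastUpdateTime;

        Item(String instanceId, long lastUpdateTime) {
            this.instanceId = instanceId;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    private final ConcurrentLinkedQueue<Item> queue = new ConcurrentLinkedQueue<>();
    private final long retentionMs;

    DeltaQueueSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Called whenever an instance registers, cancels or changes status.
    void recordChange(String instanceId, long nowMs) {
        queue.add(new Item(instanceId, nowMs));
    }

    // Serving a delta only iterates the queue; nothing is removed here.
    List<String> readDelta() {
        List<String> ids = new ArrayList<>();
        for (Item item : queue) {
            ids.add(item.instanceId);
        }
        return ids;
    }

    // Retention task: items are in insertion order, so stop at the first
    // item that is still inside the retention window.
    void evictExpired(long nowMs) {
        Iterator<Item> it = queue.iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime < nowMs - retentionMs) {
                it.remove();
            } else {
                break;
            }
        }
    }
}
```

Because reads do not drain the queue, several clients polling at different moments all see the same recent changes until the retention task drops them.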

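This article only covers the general flow and leaves some parts without much detail. If you are interested, read the source alongside the flowchart at the top, or step through it in a debugger. Questions and corrections are welcome.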