1. 程式人生 > >【一起學原始碼-微服務】Nexflix Eureka 原始碼八:EurekaClient登錄檔抓取 精妙設計分析!

【一起學原始碼-微服務】Nexflix Eureka 原始碼八:EurekaClient登錄檔抓取 精妙設計分析!

前言

前情回顧

上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()

本講目錄

回頭看了下之前的部落格,沒有一個總目錄說明,每篇都是直接原始碼分析了。從此篇文章開始都會加上目錄,以及文章最後會加上總結及讀此篇原始碼的感受。希望這個部落格系列的文章會越來越好。

目錄如下:

  1. client端第一次註冊全量抓取登錄檔的邏輯
  2. server端返回登錄檔資訊集合的多級快取機制
  3. server端登錄檔多級快取過期機制:主動+定時+被動
  4. client端增量抓取登錄檔邏輯

技術亮點:

  1. 登錄檔抓取的多級快取機制
  2. 增量抓取返回的全量資料hashCode,和本地資料hashCode對比,保證資料一致性

這裡再囉嗦一點,之前一直吐槽EurekaClient註冊的邏輯,今天看了EurekaClient登錄檔抓取的邏輯後,不由的感嘆設計的精妙之處,這裡說的精妙是指EurekaServer端對於登錄檔讀取邏輯的設計,快取邏輯以及增量獲取時Hash一致性的判斷,真的很妙,感覺又學到了不少東西。讀完這段程式碼 一大早就很興奮,哈哈哈,一起看看吧。

說明

原創不易,如若轉載 請標明來源:一枝花算不算浪漫

EurekaClient全量抓取登錄檔邏輯

一直在想著怎麼才能把自己看完程式碼後的理解用文字表達出來,這裡採用一種新模式吧,先畫圖,然後原始碼,然後解讀。

圖片看起來很簡單,Client傳送Http請求給Server端,Server端返回全量的登錄檔資訊給Client端。接下來就是跟進程式碼一步步分析,這裡先有個大概印象

原始碼解析

  1. Client端傳送獲取全量登錄檔請求
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {

    // 省略很多無關程式碼

    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

}

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // 刪減掉一些程式碼

    // registry was fetched successfully, so return true
    return true;
}

private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

這裡就不再贅述Client端是如何一步步跟進到發請求的程式碼的,因為之前通過單元測試程式碼已經搞清楚了Server端接受請求的類是ApplicationsResource.java, Client端主要核心的程式碼也在 DiscoveryClient.java中。

程式碼還是之前看了好多遍的祖傳程式碼,只是省略了很多內容,只展示我們需要分析的地方。
clientConfig.shouldFetchRegistry() 這個配置預設是true,然後fetchRegistry方法中getAndStoreFullRegistry(),因為第一次都是獲取全量登錄檔資訊,繼續往後。

getAndStoreFullRegistry 方法中可以看到就是傳送Http請求給Server端,然後等待Server端返回全量登錄檔資訊。

這裡獲取全量請求執行的是eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())

然後再一路往下,跟蹤到 AbstractJersey2EurekaHttpClient.java中,getApplicationsInternal方法,發下傳送的是GET請求,於是到Server端ApplicationsResource.java中的GET方法getContainers中檢視邏輯

server端返回登錄檔資訊集合的多級快取機制

上面已經看了Client端 傳送抓取全量登錄檔的邏輯,到了Server端檢視ApplicationsResource.java中的GET方法getContainers,接著看看這部分的原始碼

private final ResponseCache responseCache;

@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {

    // 省略部分程式碼

    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    }
    CurrentRequestVersion.remove();
    return response;
}

這裡接收到Client端的請求後,會去responseCache 中去拿去全量的資料資訊。
從屬性名字就可以看出來,這個是從快取中獲取資料。

ResponseCacheImpl.java

String get(final Key key, boolean useReadOnlyCache) {
    Value payload = getValue(key, useReadOnlyCache);
    if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
        return null;
    } else {
        return payload.getPayload();
    }
}

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}

這裡主要關注getValue方法,這裡主要有兩個map,一個是readOnlyCacheMap 另一個是readWriteCacheMap, 這裡我們光看名字就可以知道一個是隻讀快取,一個是讀寫快取,這裡用了兩層的快取結構,如果只讀快取不為空 則直接返回,如果為空查詢可讀快取。

關於快取的講解 我們繼續往下看。

server端登錄檔多級快取過期機制:主動+定時+被動

繼續看快取相關,用到了多級快取這裡可能就會存在一些疑問:

  1. 兩級快取資料如何儲存同步?
  2. 快取資料如何過期?

帶著疑問我們來繼續看原始碼

private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value> readWriteCacheMap;

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    // 省略部分程式碼

    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });

    // 省略部分程式碼

}
  1. readOnlyCacheMap用的是ConcurrentHashMap,執行緒安全的。
    readWriteCacheMap用的是GuavaCache,不懂的小夥伴可以自己閱讀以下,我之前的部落格也有講解這個,這個是谷歌開源的Guava專案基於記憶體的快取,其內部也是實現的Map結構。

  2. 主要重點我們來看下GuavaCache,這裡初始化大小是serverConfig.getInitialCapacityOfResponseCache() 預設是1000,也是Map的初始大小。
    expireAfterWrite 重新整理時間是serverConfig.getResponseCacheAutoExpirationInSeconds()預設時間是180s。
    接著是build方法,這裡獲取登錄檔資訊就是用的generatePayload方法,如果查詢readWriteCacheMap中登錄檔資訊為空,這會執行build方法。

繼續跟進generatePayload方法:

private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                }
                break;
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}

這個程式碼刪減了一部分,到時增量抓取登錄檔也會走這個邏輯,ALL_APPS就是全量抓取,ALL_APPS_DELTA就是增量抓取的意思,這裡先插個眼,一會增量抓取登錄檔的邏輯再回頭看。

上面的邏輯我們只需要關注registry.getApplicationsFromMultipleRegions 即可,這個是獲取登錄檔的邏輯。接著繼續往下跟程式碼:

AbstractInstanceRegistry.java

public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {

    Applications apps = new Applications();
    apps.setVersion(1L);
    for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
        Application app = null;

        if (entry.getValue() != null) {
            for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                if (app == null) {
                    app = new Application(lease.getHolder().getAppName());
                }
                app.addInstance(decorateInstanceInfo(lease));
            }
        }
        if (app != null) {
            apps.addApplication(app);
        }
    }
    if (includeRemoteRegion) {
        for (String remoteRegion : remoteRegions) {
            RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
            if (null != remoteRegistry) {
                Applications remoteApps = remoteRegistry.getApplications();
                for (Application application : remoteApps.getRegisteredApplications()) {
                    if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                        logger.info("Application {}  fetched from the remote region {}",
                                application.getName(), remoteRegion);

                        Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                        if (appInstanceTillNow == null) {
                            appInstanceTillNow = new Application(application.getName());
                            apps.addApplication(appInstanceTillNow);
                        }
                        for (InstanceInfo instanceInfo : application.getInstances()) {
                            appInstanceTillNow.addInstance(instanceInfo);
                        }
                    } else {
                        logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                        + "whitelist and this app is not in the whitelist.",
                                application.getName(), remoteRegion);
                    }
                }
            } else {
                logger.warn("No remote registry available for the remote region {}", remoteRegion);
            }
        }
    }
    apps.setAppsHashCode(apps.getReconcileHashCode());
    return apps;
}

這裡再看到 registry.entrySet()是不是會特別親切?Map<String, Map<String, Lease<InstanceInfo>> 我們上一篇講Client註冊的時候 就是將註冊資訊放入到registry對應這個資料結構中的,果不其然,這裡拿到所有的註冊資訊,然後封裝到Applications 物件中的。

這裡最後apps.setAppsHashCode()邏輯,先插個眼 後面講增量同步有類似的邏輯,後面再回頭看。接著再回頭看 返回資料後 readWriteCacheMap 的操作邏輯。

if (shouldUseReadOnlyResponseCache) {
    timer.schedule(getCacheUpdateTask(),
            new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                    + responseCacheUpdateIntervalMs),
            responseCacheUpdateIntervalMs);
}

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}

這裡是起了一個排程任務,會去定時比較一級和二級快取是否一致,如果不一致 就會用二級快取覆蓋一級快取。這就回答了上面的第一個問題,兩級快取一致性的問題,預設30s執行一次。所以這裡仍會有問題,可能快取在30s內會存在不一致的情況,這裡用的是最終一致的思想。

緊接著 讀寫快取獲取到資料後再去回寫只讀快取,這是上面ResponseCacheImpl.java 的邏輯,到了這裡 全量抓取登錄檔的程式碼都已經看完了,這裡主要的亮點是使用了兩級快取策略來返回對應的資料。

接著整理下過期的幾個機制,也是迴應上面丟擲的第二個問題。

用一張圖作為總結:

  1. 主動過期
    readWriteCacheMap,讀寫快取

    有新的服務例項發生註冊、下線、故障的時候,就會去重新整理readWriteCacheMap(在Client註冊的時候,AbstractInstanceRegistry中register方法最後會有一個invalidateCache()方法)

    比如說現在有一個服務A,ServiceA,有一個新的服務例項,Instance010來註冊了,註冊完了之後,其實必須是得重新整理這個快取的,然後就會呼叫ResponseCache.invalidate(),將之前快取好的ALL_APPS這個key對應的快取,給他過期掉

    將readWriteCacheMap中的ALL_APPS快取key,對應的快取給過期掉

  2. 定時過期

    readWriteCacheMap在構建的時候,指定了一個自動過期的時間,預設值就是180秒,所以你往readWriteCacheMap中放入一個數據過後,自動會等180秒過後,就將這個資料給他過期了

  3. 被動過期

    readOnlyCacheMap怎麼過期呢?
    預設是每隔30秒,執行一個定時排程的執行緒任務,TimerTask,有一個邏輯,會每隔30秒,對readOnlyCacheMap和readWriteCacheMap中的資料進行一個比對,如果兩塊資料是不一致的,那麼就將readWriteCacheMap中的資料放到readOnlyCacheMap中來。

    比如說readWriteCacheMap中,ALL_APPS這個key對應的快取沒了,那麼最多30秒過後,就會同步到readOnelyCacheMap中去。

client端增量抓取登錄檔邏輯

上面抓取全量登錄檔的程式碼已經說了,這裡來講一下增量抓取,入口還是在DiscoverClient.java
中,當初始化完DiscoverClient.java 後會執行一個初始化定時任務的方法initScheduledTasks(), 其中這個裡面就會每隔30s 增量抓取一次登錄檔資訊。

這裡就不跟著這裡的邏輯一步步看了,看過上面的程式碼後 應該會對這裡比較清晰了,這裡我們直接看Server端程式碼了。

還記的我們上面插過的眼,獲取全量用的是ALL_APPS 增量用的是ALL_APPS_DELTA, 所以我們這裡只看增量的邏輯就行了。

else if (ALL_APPS_DELTA.equals(key.getName())) {
    if (isRemoteRegionRequested) {
        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
        versionDeltaWithRegions.incrementAndGet();
        versionDeltaWithRegionsLegacy.incrementAndGet();
        payload = getPayLoad(key,
                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
    } else {
        tracer = serializeDeltaAppsTimer.start();
        versionDelta.incrementAndGet();
        versionDeltaLegacy.incrementAndGet();
        payload = getPayLoad(key, registry.getApplicationDeltas());
    }
}

上面只是截取了部分程式碼,這裡直接看主要的邏輯registry.getApplicationDeltasFromMultipleRegions即可,這個和全量的方法名只有一個Deltas的區別。

public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    if (null == remoteRegions) {
        remoteRegions = allKnownRemoteRegions; // null means all remote regions.
    }

    boolean includeRemoteRegion = remoteRegions.length != 0;

    if (includeRemoteRegion) {
        GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
    } else {
        GET_ALL_CACHE_MISS_DELTA.increment();
    }

    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    try {
        write.lock();
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            logger.debug("The instance id {} is found with status {} and actiontype {}",
                    instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
        }

        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                    if (null != remoteAppsDelta) {
                        for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                            if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                Application appInstanceTillNow =
                                        apps.getRegisteredApplications(application.getName());
                                if (appInstanceTillNow == null) {
                                    appInstanceTillNow = new Application(application.getName());
                                    apps.addApplication(appInstanceTillNow);
                                }
                                for (InstanceInfo instanceInfo : application.getInstances()) {
                                    appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                                }
                            }
                        }
                    }
                }
            }
        }

        Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}

這裡程式碼還是比較多的,我們只需要抓住重點即可:

  1. recentlyChangedQueue中獲取註冊資訊,從名字可以看出來 這是最近改變的client註冊資訊的佇列
  2. 使用writeLock,因為這裡是獲取增量註冊資訊,是從佇列中獲取,如果不加寫鎖,那麼獲取的時候又有新資料加入佇列中,新資料會獲取不到的

基於上面第一點,我們來看看這個佇列怎麼做的:

  1. 資料結構:ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue
  2. AbstractInstanceRegistry.java初始化的時候會啟動一個定時任務,預設30s中執行一次。如果註冊時間小於當前時間的180s,就會放到這個佇列中

AbstractInstanceRegistry.java具體程式碼如下:

protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    this.serverConfig = serverConfig;
    this.clientConfig = clientConfig;
    this.serverCodecs = serverCodecs;
    this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
    this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

    this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            serverConfig.getDeltaRetentionTimerIntervalInMs(),
            serverConfig.getDeltaRetentionTimerIntervalInMs());
}

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}

這裡就能看明白了,也就是說增量抓取會獲取EurekaServer端3分鐘內儲存的變動的Client資訊。
最後還有一個亮點,我們上面說過,無論是全量抓取還是增量抓取,最後都會返回一個全量登錄檔的hash值,程式碼是apps.setAppsHashCode(allApps.getReconcileHashCode());, 其中apps就是返回的Applications中的屬性,最後我們再看看這個hashCode的用法。

回到DiscoveryClient.java, 找到refreshRegistry 方法,然後一路跟蹤到getAndUpdateDelta方法,這裡具體程式碼我就不貼了,流程如下:

  1. 獲取delta增量資料
  2. 根據增量資料和本地登錄檔資料進行合併
  3. 計算中本地登錄檔資訊的hashCode值
  4. 如果本地hashCode值和server端返回的hashCode值不一致則再全量獲取一次登錄檔資訊

最後一張圖總結增量登錄檔抓取邏輯:

總結&感悟

這篇文章寫得有點長了,確實自己也很用心去寫了,我感覺這裡多級快取機制+增量資料Hash一致性的對比方案做的很優秀,如果要我做一個數據全量+增量同步 我也會借鑑這種方案。

看原始碼 能夠學到的就是別人的設計思想。總結的部分可以看上面的一些圖,登錄檔抓取的原始碼學習就到這了,後面 還準備看下心跳機制、保護機制、叢集等等一些的原始碼。

這裡讀完原始碼之後會發下一個問題:

假設有服務例項註冊、下線、故障,要呼叫這個服務的其他服務,可能會過30秒之後才能感知倒,為什麼呢?因為這裡再獲取服務登錄檔的時候,有一個多級快取的機制,最多是30秒後才會去更新一級快取。

申明

本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫

相關推薦

一起原始碼-服務Nexflix Eureka 原始碼EurekaClient 精妙設計分析

前言 前情回顧 上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceI

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之配置檔案載入以及面向介面的配置項讀取

前言 上篇文章已經介紹了 為何要讀netflix eureka原始碼了,這裡就不再概述,下面開始正式原始碼解讀的內容。 如若轉載 請標明來源:一枝花算不算浪漫 程式碼總覽 還記得上文中,我們通過web.xml找到了eureka server入口的類EurekaBootStrap,這裡我們就先來簡單地看下: /

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之EurekaServer上下文EurekaClient建立

前言 上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些程式碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於介面對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Cli

一起原始碼-服務Nexflix Eureka 原始碼在眼花繚亂的程式碼中,EurekaClient是如何註冊的?

前言 上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。 這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於Eureka

一起原始碼-服務Nexflix Eureka 原始碼通過單元測試來Debug Eureka註冊過程

前言 上一講eureka client是如何註冊的,一直跟到原始碼傳送http請求為止,當時看eureka client註冊時如此費盡,光是找一個regiter的地方就找了半天,那麼client端傳送了http請求給server端,server端是如何處理的呢? 帶著這麼一個疑問 就開始今天原始碼的解讀了。

一起原始碼-服務Nexflix Eureka 原始碼服務續約原始碼分析

前言 前情回顧 上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。 本講目錄 今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著

一起原始碼-服務Nexflix Eureka 原始碼服務下線及例項摘除,一個client下線到底多久才會被其他例項感知?

前言 前情回顧 上一講我們講了 client端向server端傳送心跳檢查,也是預設每30鍾傳送一次,server端接收後會更新登錄檔的一個時間戳屬性,然後一次心跳(續約)也就完成了。 本講目錄 這一篇有兩個知識點及一個疑問,這個疑問是在工作中真真實實遇到過的。 例如我有服務A、服務B,A、B都註冊在同一個註

一起原始碼-服務Nexflix Eureka 原始碼十一EurekaServer自我保護機制竟然有這麼多Bug?

前言 前情回顧 上一講主要講了服務下線,已經註冊中心自動感知宕機的服務。 其實上一講已經包含了很多EurekaServer自我保護的程式碼,其中還發現了1.7.x(1.9.x)包含的一些bug,但這些問題在master分支都已修復了。 服務下線會將服務例項從登錄檔中刪除,然後放入到recentQueue中,下

一起原始碼-服務Nexflix Eureka 原始碼十二EurekaServer叢集模式原始碼分析

前言 前情回顧 上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。 哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。 今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收

一起原始碼-服務Nexflix Eureka 原始碼十三Eureka原始碼解讀完結撒花篇~

前言 想說的話 【一起學原始碼-微服務-Netflix Eureka】專欄到這裡就已經全部結束了。 實話實說,從最開始Eureka Server和Eureka Client初始化的流程還是一臉悶逼,到現在Eureka各種操作都瞭然於心了。 本專欄從12.17開始寫,一直到今天12.30(文章在平臺是延後釋出的

一起原始碼-服務Ribbon 原始碼Ribbon概念理解及Demo除錯

前言 前情回顧 前面文章已經梳理清楚了Eureka相關的概念及原始碼,接下來開始研究下Ribbon的實現原理。 我們都知道Ribbon在spring cloud中擔當負載均衡的角色, 當兩個Eureka Client互相呼叫的時候,Ribbon能夠做到呼叫時的負載,保證多節點的客戶端均勻接收請求。(這個有點類

一起原始碼-服務Ribbon 原始碼通過Debug找出Ribbon初始化流程及ILoadBalancer原理分析

前言 前情回顧 上一講講了Ribbon的基礎知識,通過一個簡單的demo看了下Ribbon的負載均衡,我們在RestTemplate上加了@LoadBalanced註解後,就能夠自動的負載均衡了。 本講目錄 這一講主要是繼續深入RibbonLoadBalancerClient和Ribbon+Eureka整合的

一起原始碼-服務Ribbon 原始碼Ribbon與Eureka整合原理分析

前言 前情回顧 上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration 到RibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。 本講目錄 這一講我們會進一步往下

一起原始碼-服務Ribbon 原始碼進一步探究Ribbon的IRule和IPing

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Ribbon原始碼Ribbon原始碼解讀彙總篇~

前言 想說的話 【一起學原始碼-微服務-Ribbon】專欄到這裡就已經全部結束了,共更新四篇文章。 Ribbon比較小巧,這裡是直接 讀的spring cloud 內嵌封裝的版本,裡面的各種configuration確實有點繞,不過看看第三講Ribbon初始化的過程總結圖就會清晰很多。 緊接著會繼續整理學習F

一起原始碼-服務Feign 原始碼原始碼初探,通過Demo Debug Feign原始碼

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Feign 原始碼Feign動態代理構造過程

前言 前情回顧 上一講主要看了@EnableFeignClients中的registerBeanDefinitions()方法,這裡面主要是 將EnableFeignClients註解對應的配置屬性注入,將FeignClient註解對應的屬性注入。 最後是生成FeignClient對應的bean,注入到Spr

一起原始碼-服務Feign 原始碼Feign結合Ribbon實現負載均衡的原理分析

前言 前情回顧 上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。 動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裡面是serviceName等資訊)和d

一起原始碼-服務Hystrix 原始碼Hystrix基礎原理與Demo搭建

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一個系列文章講解了Feign的原始碼,主要是Feign動態代理實現的原理,及配合Ribbon實現負載均衡的機制。 這裡我們講解一個新的元件Hystrix,也是和Fe

一起原始碼-服務Hystrix 原始碼Hystrix核心流程Hystix非降級邏輯流程梳理

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的Hystr