【一起學原始碼-微服務】Nexflix Eureka 原始碼八:EurekaClient登錄檔抓取 精妙設計分析!
前言
前情回顧
上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
本講目錄
回頭看了下之前的部落格,沒有一個總目錄說明,每篇都是直接原始碼分析了。從此篇文章開始都會加上目錄,以及文章最後會加上總結及讀此篇原始碼的感受。希望這個部落格系列的文章會越來越好。
目錄如下:
- client端第一次註冊全量抓取登錄檔的邏輯
- server端返回登錄檔資訊集合的多級快取機制
- server端登錄檔多級快取過期機制:主動+定時+被動
- client端增量抓取登錄檔邏輯
技術亮點:
- 登錄檔抓取的多級快取機制
- 增量抓取返回的全量資料hashCode,和本地資料hashCode對比,保證資料一致性
這裡再囉嗦一點,之前一直吐槽EurekaClient註冊的邏輯,今天看了EurekaClient登錄檔抓取的邏輯後,不由的感嘆設計的精妙之處,這裡說的精妙是指EurekaServer端對於登錄檔讀取邏輯的設計,快取邏輯以及增量獲取時Hash一致性的判斷,真的很妙,感覺又學到了不少東西。讀完這段程式碼 一大早就很興奮,哈哈哈,一起看看吧。
說明
原創不易,如若轉載 請標明來源:一枝花算不算浪漫
EurekaClient全量抓取登錄檔邏輯
一直在想著怎麼才能把自己看完程式碼後的理解用文字表達出來,這裡採用一種新模式吧,先畫圖,然後原始碼,然後解讀。
圖片看起來很簡單,Client傳送Http請求給Server端,Server端返回全量的登錄檔資訊給Client端。接下來就是跟進程式碼一步步分析,這裡先有個大概印象
原始碼解析
- Client端傳送獲取全量登錄檔請求
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { // 省略很多無關程式碼 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } } private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // 刪減掉一些程式碼 // registry was fetched successfully, so return true return true; } private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
這裡就不再贅述Client端是如何一步步跟進到發請求的程式碼的,因為之前通過單元測試程式碼已經搞清楚了Server端接受請求的類是ApplicationsResource.java
, Client端主要核心的程式碼也在 DiscoveryClient.java
中。
程式碼還是之前看了好多遍的祖傳程式碼,只是省略了很多內容,只展示我們需要分析的地方。
clientConfig.shouldFetchRegistry()
這個配置預設是true,然後fetchRegistry
方法中getAndStoreFullRegistry()
,因為第一次都是獲取全量登錄檔資訊,繼續往後。
getAndStoreFullRegistry
方法中可以看到就是傳送Http請求給Server端,然後等待Server端返回全量登錄檔資訊。
這裡獲取全量請求執行的是eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
然後再一路往下,跟蹤到 AbstractJersey2EurekaHttpClient.java
中,getApplicationsInternal
方法,發下傳送的是GET
請求,於是到Server端ApplicationsResource.java
中的GET
方法getContainers
中檢視邏輯
server端返回登錄檔資訊集合的多級快取機制
上面已經看了Client端 傳送抓取全量登錄檔的邏輯,到了Server端檢視ApplicationsResource.java
中的GET
方法getContainers
,接著看看這部分的原始碼
private final ResponseCache responseCache;
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
// 省略部分程式碼
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
CurrentRequestVersion.remove();
return response;
}
這裡接收到Client端的請求後,會去responseCache
中去拿去全量的資料資訊。
從屬性名字就可以看出來,這個是從快取中獲取資料。
ResponseCacheImpl.java
String get(final Key key, boolean useReadOnlyCache) {
Value payload = getValue(key, useReadOnlyCache);
if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
return null;
} else {
return payload.getPayload();
}
}
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
這裡主要關注getValue
方法,這裡主要有兩個map,一個是readOnlyCacheMap
另一個是readWriteCacheMap
, 這裡我們光看名字就可以知道一個是隻讀快取,一個是讀寫快取,這裡用了兩層的快取結構,如果只讀快取不為空 則直接返回,如果為空查詢可讀快取。
關於快取的講解 我們繼續往下看。
server端登錄檔多級快取過期機制:主動+定時+被動
繼續看快取相關,用到了多級快取這裡可能就會存在一些疑問:
- 兩級快取資料如何儲存同步?
- 快取資料如何過期?
帶著疑問我們來繼續看原始碼
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value> readWriteCacheMap;
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
// 省略部分程式碼
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
// 省略部分程式碼
}
readOnlyCacheMap
用的是ConcurrentHashMap,執行緒安全的。
readWriteCacheMap
用的是GuavaCache,不懂的小夥伴可以自己閱讀以下,我之前的部落格也有講解這個,這個是谷歌開源的Guava專案基於記憶體的快取,其內部也是實現的Map結構。主要重點我們來看下GuavaCache,這裡初始化大小是
serverConfig.getInitialCapacityOfResponseCache()
預設是1000,也是Map的初始大小。
expireAfterWrite
重新整理時間是serverConfig.getResponseCacheAutoExpirationInSeconds()
預設時間是180s。
接著是build方法,這裡獲取登錄檔資訊就是用的generatePayload
方法,如果查詢readWriteCacheMap中登錄檔資訊為空,這會執行build方法。
繼續跟進generatePayload
方法:
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
}
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
這個程式碼刪減了一部分,到時增量抓取登錄檔也會走這個邏輯,ALL_APPS
就是全量抓取,ALL_APPS_DELTA
就是增量抓取的意思,這裡先插個眼,一會增量抓取登錄檔的邏輯再回頭看。
上面的邏輯我們只需要關注registry.getApplicationsFromMultipleRegions
即可,這個是獲取登錄檔的邏輯。接著繼續往下跟程式碼:
AbstractInstanceRegistry.java
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
Applications apps = new Applications();
apps.setVersion(1L);
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
apps.addApplication(app);
}
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
這裡再看到 registry.entrySet()
是不是會特別親切?Map<String, Map<String, Lease<InstanceInfo>>
我們上一篇講Client註冊的時候 就是將註冊資訊放入到registry對應這個資料結構中的,果不其然,這裡拿到所有的註冊資訊,然後封裝到Applications
物件中的。
這裡最後apps.setAppsHashCode()
邏輯,先插個眼 後面講增量同步有類似的邏輯,後面再回頭看。接著再回頭看 返回資料後 readWriteCacheMap
的操作邏輯。
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
這裡是起了一個排程任務,會去定時比較一級和二級快取是否一致,如果不一致 就會用二級快取覆蓋一級快取。這就回答了上面的第一個問題,兩級快取一致性的問題,預設30s執行一次。所以這裡仍會有問題,可能快取在30s內會存在不一致的情況,這裡用的是最終一致的思想。
緊接著 讀寫快取獲取到資料後再去回寫只讀快取,這是上面ResponseCacheImpl.java
的邏輯,到了這裡 全量抓取登錄檔的程式碼都已經看完了,這裡主要的亮點是使用了兩級快取策略來返回對應的資料。
接著整理下過期的幾個機制,也是迴應上面丟擲的第二個問題。
用一張圖作為總結:
主動過期
readWriteCacheMap,讀寫快取有新的服務例項發生註冊、下線、故障的時候,就會去重新整理readWriteCacheMap(在Client註冊的時候,AbstractInstanceRegistry中register方法最後會有一個invalidateCache()方法)
比如說現在有一個服務A,ServiceA,有一個新的服務例項,Instance010來註冊了,註冊完了之後,其實必須是得重新整理這個快取的,然後就會呼叫ResponseCache.invalidate(),將之前快取好的ALL_APPS這個key對應的快取,給他過期掉
將readWriteCacheMap中的ALL_APPS快取key,對應的快取給過期掉
定時過期
readWriteCacheMap在構建的時候,指定了一個自動過期的時間,預設值就是180秒,所以你往readWriteCacheMap中放入一個數據過後,自動會等180秒過後,就將這個資料給他過期了
被動過期
readOnlyCacheMap怎麼過期呢?
預設是每隔30秒,執行一個定時排程的執行緒任務,TimerTask,有一個邏輯,會每隔30秒,對readOnlyCacheMap和readWriteCacheMap中的資料進行一個比對,如果兩塊資料是不一致的,那麼就將readWriteCacheMap中的資料放到readOnlyCacheMap中來。比如說readWriteCacheMap中,ALL_APPS這個key對應的快取沒了,那麼最多30秒過後,就會同步到readOnelyCacheMap中去。
client端增量抓取登錄檔邏輯
上面抓取全量登錄檔的程式碼已經說了,這裡來講一下增量抓取,入口還是在DiscoverClient.java
中,當初始化完DiscoverClient.java
後會執行一個初始化定時任務的方法initScheduledTasks()
, 其中這個裡面就會每隔30s 增量抓取一次登錄檔資訊。
這裡就不跟著這裡的邏輯一步步看了,看過上面的程式碼後 應該會對這裡比較清晰了,這裡我們直接看Server端程式碼了。
還記的我們上面插過的眼,獲取全量用的是ALL_APPS
增量用的是ALL_APPS_DELTA
, 所以我們這裡只看增量的邏輯就行了。
else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
}
上面只是截取了部分程式碼,這裡直接看主要的邏輯registry.getApplicationDeltasFromMultipleRegions
即可,這個和全量的方法名只有一個Deltas的區別。
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
if (null == remoteRegions) {
remoteRegions = allKnownRemoteRegions; // null means all remote regions.
}
boolean includeRemoteRegion = remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
} else {
GET_ALL_CACHE_MISS_DELTA.increment();
}
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
try {
write.lock();
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
while (iter.hasNext()) {
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
logger.debug("The instance id {} is found with status {} and actiontype {}",
instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
Application app = applicationInstancesMap.get(instanceInfo.getAppName());
if (app == null) {
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
apps.addApplication(app);
}
app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
if (null != remoteAppsDelta) {
for (Application application : remoteAppsDelta.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
Application appInstanceTillNow =
apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
}
}
}
}
}
}
}
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
這裡程式碼還是比較多的,我們只需要抓住重點即可:
- 從
recentlyChangedQueue
中獲取註冊資訊,從名字可以看出來 這是最近改變的client註冊資訊的佇列 - 使用writeLock,因為這裡是獲取增量註冊資訊,是從佇列中獲取,如果不加寫鎖,那麼獲取的時候又有新資料加入佇列中,新資料會獲取不到的
基於上面第一點,我們來看看這個佇列怎麼做的:
- 資料結構:
ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue
AbstractInstanceRegistry.java
初始化的時候會啟動一個定時任務,預設30s中執行一次。如果註冊時間小於當前時間的180s,就會放到這個佇列中
AbstractInstanceRegistry.java
具體程式碼如下:
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
這裡就能看明白了,也就是說增量抓取會獲取EurekaServer端3分鐘內儲存的變動的Client資訊。
最後還有一個亮點,我們上面說過,無論是全量抓取還是增量抓取,最後都會返回一個全量登錄檔的hash值,程式碼是apps.setAppsHashCode(allApps.getReconcileHashCode());
, 其中apps就是返回的Applications
中的屬性,最後我們再看看這個hashCode的用法。
回到DiscoveryClient.java
, 找到refreshRegistry
方法,然後一路跟蹤到getAndUpdateDelta
方法,這裡具體程式碼我就不貼了,流程如下:
- 獲取delta增量資料
- 根據增量資料和本地登錄檔資料進行合併
- 計算中本地登錄檔資訊的hashCode值
- 如果本地hashCode值和server端返回的hashCode值不一致則再全量獲取一次登錄檔資訊
最後一張圖總結增量登錄檔抓取邏輯:
總結&感悟
這篇文章寫得有點長了,確實自己也很用心去寫了,我感覺這裡多級快取機制+增量資料Hash一致性的對比方案做的很優秀,如果要我做一個數據全量+增量同步 我也會借鑑這種方案。
看原始碼 能夠學到的就是別人的設計思想。總結的部分可以看上面的一些圖,登錄檔抓取的原始碼學習就到這了,後面 還準備看下心跳機制、保護機制、叢集等等一些的原始碼。
這裡讀完原始碼之後會發下一個問題:
假設有服務例項註冊、下線、故障,要呼叫這個服務的其他服務,可能會過30秒之後才能感知倒,為什麼呢?因為這裡再獲取服務登錄檔的時候,有一個多級快取的機制,最多是30秒後才會去更新一級快取。
申明
本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫
相關推薦
【一起學原始碼-微服務】Nexflix Eureka 原始碼八:EurekaClient登錄檔抓取 精妙設計分析!
前言 前情回顧 上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceI
【一起學原始碼-微服務】Nexflix Eureka 原始碼二:EurekaServer啟動之配置檔案載入以及面向介面的配置項讀取
前言 上篇文章已經介紹了 為何要讀netflix eureka原始碼了,這裡就不再概述,下面開始正式原始碼解讀的內容。 如若轉載 請標明來源:一枝花算不算浪漫 程式碼總覽 還記得上文中,我們通過web.xml找到了eureka server入口的類EurekaBootStrap,這裡我們就先來簡單地看下: /
【一起學原始碼-微服務】Nexflix Eureka 原始碼三:EurekaServer啟動之EurekaServer上下文EurekaClient建立
前言 上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些程式碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於介面對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Cli
【一起學原始碼-微服務】Nexflix Eureka 原始碼六:在眼花繚亂的程式碼中,EurekaClient是如何註冊的?
前言 上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。 這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於Eureka
【一起學原始碼-微服務】Nexflix Eureka 原始碼七:通過單元測試來Debug Eureka註冊過程
前言 上一講eureka client是如何註冊的,一直跟到原始碼傳送http請求為止,當時看eureka client註冊時如此費盡,光是找一個regiter的地方就找了半天,那麼client端傳送了http請求給server端,server端是如何處理的呢? 帶著這麼一個疑問 就開始今天原始碼的解讀了。
【一起學原始碼-微服務】Nexflix Eureka 原始碼九:服務續約原始碼分析
前言 前情回顧 上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。 本講目錄 今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著
【一起學原始碼-微服務】Nexflix Eureka 原始碼十:服務下線及例項摘除,一個client下線到底多久才會被其他例項感知?
前言 前情回顧 上一講我們講了 client端向server端傳送心跳檢查,也是預設每30鍾傳送一次,server端接收後會更新登錄檔的一個時間戳屬性,然後一次心跳(續約)也就完成了。 本講目錄 這一篇有兩個知識點及一個疑問,這個疑問是在工作中真真實實遇到過的。 例如我有服務A、服務B,A、B都註冊在同一個註
【一起學原始碼-微服務】Nexflix Eureka 原始碼十一:EurekaServer自我保護機制竟然有這麼多Bug?
前言 前情回顧 上一講主要講了服務下線,已經註冊中心自動感知宕機的服務。 其實上一講已經包含了很多EurekaServer自我保護的程式碼,其中還發現了1.7.x(1.9.x)包含的一些bug,但這些問題在master分支都已修復了。 服務下線會將服務例項從登錄檔中刪除,然後放入到recentQueue中,下
【一起學原始碼-微服務】Nexflix Eureka 原始碼十二:EurekaServer叢集模式原始碼分析
前言 前情回顧 上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。 哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。 今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收
【一起學原始碼-微服務】Nexflix Eureka 原始碼十三:Eureka原始碼解讀完結撒花篇~!
前言 想說的話 【一起學原始碼-微服務-Netflix Eureka】專欄到這裡就已經全部結束了。 實話實說,從最開始Eureka Server和Eureka Client初始化的流程還是一臉悶逼,到現在Eureka各種操作都瞭然於心了。 本專欄從12.17開始寫,一直到今天12.30(文章在平臺是延後釋出的
【一起學原始碼-微服務】Ribbon 原始碼一:Ribbon概念理解及Demo除錯
前言 前情回顧 前面文章已經梳理清楚了Eureka相關的概念及原始碼,接下來開始研究下Ribbon的實現原理。 我們都知道Ribbon在spring cloud中擔當負載均衡的角色, 當兩個Eureka Client互相呼叫的時候,Ribbon能夠做到呼叫時的負載,保證多節點的客戶端均勻接收請求。(這個有點類
【一起學原始碼-微服務】Ribbon 原始碼二:通過Debug找出Ribbon初始化流程及ILoadBalancer原理分析
前言 前情回顧 上一講講了Ribbon的基礎知識,通過一個簡單的demo看了下Ribbon的負載均衡,我們在RestTemplate上加了@LoadBalanced註解後,就能夠自動的負載均衡了。 本講目錄 這一講主要是繼續深入RibbonLoadBalancerClient和Ribbon+Eureka整合的
【一起學原始碼-微服務】Ribbon 原始碼三:Ribbon與Eureka整合原理分析
前言 前情回顧 上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration 到RibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。 本講目錄 這一講我們會進一步往下
【一起學原始碼-微服務】Ribbon 原始碼四:進一步探究Ribbon的IRule和IPing
前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進
【一起學原始碼-微服務】Ribbon原始碼五:Ribbon原始碼解讀彙總篇~
前言 想說的話 【一起學原始碼-微服務-Ribbon】專欄到這裡就已經全部結束了,共更新四篇文章。 Ribbon比較小巧,這裡是直接 讀的spring cloud 內嵌封裝的版本,裡面的各種configuration確實有點繞,不過看看第三講Ribbon初始化的過程總結圖就會清晰很多。 緊接著會繼續整理學習F
【一起學原始碼-微服務】Feign 原始碼一:原始碼初探,通過Demo Debug Feign原始碼
前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進
【一起學原始碼-微服務】Feign 原始碼二:Feign動態代理構造過程
前言 前情回顧 上一講主要看了@EnableFeignClients中的registerBeanDefinitions()方法,這裡面主要是 將EnableFeignClients註解對應的配置屬性注入,將FeignClient註解對應的屬性注入。 最後是生成FeignClient對應的bean,注入到Spr
【一起學原始碼-微服務】Feign 原始碼三:Feign結合Ribbon實現負載均衡的原理分析
前言 前情回顧 上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。 動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裡面是serviceName等資訊)和d
【一起學原始碼-微服務】Hystrix 原始碼一:Hystrix基礎原理與Demo搭建
說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一個系列文章講解了Feign的原始碼,主要是Feign動態代理實現的原理,及配合Ribbon實現負載均衡的機制。 這裡我們講解一個新的元件Hystrix,也是和Fe
【一起學原始碼-微服務】Hystrix 原始碼二:Hystrix核心流程:Hystix非降級邏輯流程梳理
說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的Hystr