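Eureka Source Code Analysis: How Eureka Client Fetches Instance Information
The flow diagram below shows the overall process by which Eureka Client fetches instance information from Eureka Server; each step is covered in detail afterwards.
When Eureka Client starts, it fetches the full registry from Eureka Server once and also initializes a scheduled task that refreshes the client's local instance cache. By default the task runs every 30 seconds (registryFetchIntervalSeconds = 30).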
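@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    // ... irrelevant code omitted ...

    // The client object is being constructed here, i.e. the client is starting up.
    // The local cache is still empty, so this call ends up fetching the full registry
    // once and storing it locally.
    // Key method: fetchRegistry(boolean forceFullRegistryFetch)
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        // The fetch failed (fetchRegistry returned false), so read from the backup registry.
        // Implement the BackupRegistry interface to give the client some other
        // source of instance information when the fetch fails.
        fetchRegistryFromBackup();
    }

    // ... irrelevant code omitted ...

    // finally, init the schedule tasks
    // Scheduled tasks: local cache refresh, heartbeat, instance info replication
    initScheduledTasks();

    // ... irrelevant code omitted ...
}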
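The start-up behaviour described above (one synchronous fetch, then a periodic refresh) can be sketched as follows. This is a minimal illustration, not Eureka's implementation: the class name RegistryRefresherSketch, the fetch Runnable and the use of scheduleWithFixedDelay are all assumptions made for the example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the client: fetch once at construction, then refresh periodically. */
final class RegistryRefresherSketch implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "cache-refresh-sketch");
                t.setDaemon(true); // do not keep the JVM alive for the demo
                return t;
            });
    private final AtomicInteger fetchCount = new AtomicInteger();

    RegistryRefresherSketch(Runnable fetch, long intervalSeconds) {
        Runnable counted = () -> {
            fetch.run();
            fetchCount.incrementAndGet();
        };
        counted.run(); // initial fetch happens synchronously during start-up
        // later refreshes run in the background, one interval apart
        scheduler.scheduleWithFixedDelay(counted, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    int fetchCount() {
        return fetchCount.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Constructing it with an interval of 30 mirrors the default registryFetchIntervalSeconds; the first fetch has already completed by the time the constructor returns.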
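The key method is fetchRegistry(boolean forceFullRegistryFetch). The parameter forceFullRegistryFetch says whether a full fetch should be forced. Independently of that parameter, delta fetching can be turned off with the configuration eureka.client.disable-delta (read through clientConfig.shouldDisableDelta()). It defaults to false, so the client uses delta mode by default. The difference between delta and full fetch, and the reason delta is the default, are explained later.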
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    try {
        // If the delta is disabled or if it is the first time, get all applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // ... logging omitted ...
            // Full fetch, stored into the local cache
            getAndStoreFullRegistry();
        } else {
            // Delta fetch, applied to the local cache; if the delta fetch fails, do a full fetch
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    return true;
}
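To make the branch condition easier to reason about, here is a small sketch that restates it as a pure function. The class FetchModeSketch and its parameter names are hypothetical; a null localAppCount models the applications == null case.

```java
/** Illustrative restatement of the full-vs-delta decision; not part of Eureka. */
final class FetchModeSketch {
    /** Returns true when a full fetch is required, false when a delta fetch is enough. */
    static boolean shouldFetchFull(boolean disableDelta,
                                   String singleVipAddress,
                                   boolean forceFullRegistryFetch,
                                   Integer localAppCount, // null models applications == null
                                   long localVersion) {
        return disableDelta
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFullRegistryFetch
                || localAppCount == null
                || localAppCount == 0
                || localVersion == -1;
    }
}
```

With an empty local cache, as at start-up, the function returns true even though forceFullRegistryFetch is false, which is why the first fetch is always a full one.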
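Now look at the implementation of the delta fetch method, getAndUpdateDelta(Applications applications). Delta (Δ) is a Greek letter used in mathematics to denote a change, that is, an increment.
Note the delta == null check below. If a delta fetch finds no changed instances, the server returns an empty new Applications(), not null. delta stays null only when the request did not succeed (the status code is not 200), and in that case the client falls back to a full fetch. A full fetch also happens further down when the hash codes do not match after the delta has been applied.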
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    // Delta fetch; see the implementation of getDelta
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }

    // If the delta fetch failed, do a full fetch.
    // Note that this is the failure case: when nothing has changed, the server
    // returns an empty new Applications(), not null.
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                updateDelta(delta);
                // Build a hash code from the information of all instances
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        // Compare the delta's appsHashCode with the hash code of the updated local cache;
        // if they differ, do a full fetch.
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}
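The apply-then-verify idea in getAndUpdateDelta can be sketched with plain collections. This is a simplified model under stated assumptions: the Change class, the flat instanceId-to-status map and the "STATUS_count_" hash format are illustrative choices for this example, and the real updateDelta and getReconcileHashCode work on Eureka's own Applications type.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class DeltaApplySketch {
    enum Action { ADDED, MODIFIED, DELETED }

    /** Hypothetical, minimal stand-in for one changed instance in a delta response. */
    static final class Change {
        final String instanceId;
        final String status;
        final Action action;

        Change(String instanceId, String status, Action action) {
            this.instanceId = instanceId;
            this.status = status;
            this.action = action;
        }
    }

    /** Applies each change to the local cache (instanceId -> status). */
    static void applyDelta(Map<String, String> localCache, List<Change> delta) {
        for (Change c : delta) {
            if (c.action == Action.DELETED) {
                localCache.remove(c.instanceId);
            } else {
                localCache.put(c.instanceId, c.status); // ADDED and MODIFIED both upsert
            }
        }
    }

    /** Counts instances per status and joins them in sorted order, e.g. "DOWN_1_UP_2_". */
    static String reconcileHashCode(Map<String, String> localCache) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : localCache.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        counts.forEach((status, n) -> sb.append(status).append('_').append(n).append('_'));
        return sb.toString();
    }

    /** Returns true if the cache matches the server's hash; false means a full refetch is needed. */
    static boolean applyAndVerify(Map<String, String> localCache, List<Change> delta, String serverHash) {
        applyDelta(localCache, delta);
        return reconcileHashCode(localCache).equals(serverHash);
    }
}
```

The hash is cheap to compute and only summarizes instance counts per status, so it detects a client that has drifted from the server without sending the whole registry.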
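// HTTP client responsible for read operations: full fetch and delta fetch
private EurekaHttpClient queryClient;
EurekaHttpClient has many implementations; the class that finally issues the HTTP call is AbstractJerseyEurekaHttpClient. Its two main methods for fetching instance information are shown below.
Note the try block in getApplicationsInternal: it has no catch (I did not remove one) and does not even log. In other words, this method does not handle exceptions itself, so anything thrown here propagates to the caller. When the response status is not 200 or the response has no entity, the Applications in the returned response is null.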
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
    return getApplicationsInternal("apps/", regions);
}

@Override
public EurekaHttpResponse<Applications> getDelta(String... regions) {
    return getApplicationsInternal("apps/delta", regions);
}

private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            applications = response.getEntity(Applications.class);
        }
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        // omitted...
    }
}
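For readers who want to see the shape of the request outside Jersey, the sketch below builds the same kind of GET with the JDK's java.net.http API (Java 11+). It is an illustration only: DeltaRequestSketch is a hypothetical class, the service URL passed in is assumed to end with a slash, and joining regions with a comma is an assumption about what StringUtil.join produces.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

final class DeltaRequestSketch {
    /** Builds a GET for {serviceUrl}apps/delta, adding ?regions=... when regions are given. */
    static HttpRequest deltaRequest(String serviceUrl, String... regions) {
        StringBuilder url = new StringBuilder(serviceUrl).append("apps/delta");
        if (regions != null && regions.length > 0) {
            String joined = String.join(",", regions); // assumed separator
            url.append("?regions=").append(URLEncoder.encode(joined, StandardCharsets.UTF_8));
        }
        return HttpRequest.newBuilder(URI.create(url.toString()))
                .header("Accept", "application/json") // same media type the Jersey client asks for
                .GET()
                .build();
    }
}
```

The request object is only built here, not sent; sending it would need an HttpClient and a running server.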
The REST endpoints exposed by Eureka Server can be found in the resources package of the eureka-core project. The endpoint behind getDelta is in ApplicationsResource:
@Path("delta")
@GET
public Response getContainerDifferential(
        @PathParam("version") String version,
        @HeaderParam(HEADER_ACCEPT) String acceptHeader,
        @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
        @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
        @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
    // ... some code omitted ...
    // The cache key is made up of the entity type, the name (full fetch, delta fetch or an
    // application name), the region information and the key type (JSON, XML)
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );
    if (acceptEncoding != null
            && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        return Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        // responseCache is a ResponseCacheImpl object, the Eureka Server cache.
        // It has two levels: level 1 is a Google Guava Cache, level 2 is a ConcurrentMap<Key, Value>
        return Response.ok(responseCache.get(cacheKey))
                .build();
    }
}
Next, look at responseCache.get(cacheKey). Go straight to the get method in the ResponseCacheImpl class:
public String get(final Key key) {
    return get(key, shouldUseReadOnlyResponseCache);
}

String get(final Key key, boolean useReadOnlyCache) {
    Value payload = getValue(key, useReadOnlyCache);
    if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
        return null;
    } else {
        return payload.getPayload();
    }
}

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            // The level-2 cache is enabled:
            // read from the level-2 cache readOnlyCacheMap first
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                // On a miss, read from the level-1 cache and store the value in the level-2 cache
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            // The level-2 cache is disabled: read directly from the level-1 cache readWriteCacheMap
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}
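By default the read goes to the level-2 cache first, and on a miss it goes on to the level-1 cache. What if the level-1 cache misses as well? Look at readWriteCacheMap: it is a Google Guava Cache, which supports a custom callback on removal and, when get finds no value, calls load(Key key) to produce the value, store it and return it.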
// Google Guava cache with automatic expiration; the default here is 3 minutes
this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Key, Value>() {
                    @Override
                    public void onRemoval(RemovalNotification<Key, Value> notification) {
                        Key removedKey = notification.getKey();
                        if (removedKey.hasRegions()) {
                            Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                            regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                        }
                    }
                })
                .build(new CacheLoader<Key, Value>() {
                    @Override
                    public Value load(Key key) throws Exception {
                        if (key.hasRegions()) {
                            Key cloneWithNoRegions = key.cloneWithoutRegions();
                            regionSpecificKeys.put(cloneWithNoRegions, key);
                        }
                        // Fetch the instance information from Eureka Server's underlying two-level map
                        Value value = generatePayload(key);
                        return value;
                    }
                });
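The read path through the two cache levels can be sketched with JDK collections only. This is a simplified model, not ResponseCacheImpl: TwoLevelCacheSketch is a hypothetical class, and ConcurrentHashMap.computeIfAbsent stands in for the Guava LoadingCache (so expiration and removal listeners are not modelled).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class TwoLevelCacheSketch<K, V> {
    private final Map<K, V> readOnlyCacheMap = new ConcurrentHashMap<>();
    private final Map<K, V> readWriteCacheMap = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // plays the role of CacheLoader.load; must not return null
    private final boolean useReadOnlyCache;

    TwoLevelCacheSketch(Function<K, V> loader, boolean useReadOnlyCache) {
        this.loader = loader;
        this.useReadOnlyCache = useReadOnlyCache;
    }

    V get(K key) {
        if (!useReadOnlyCache) {
            // read-only level disabled: go straight to the loading cache
            return readWriteCacheMap.computeIfAbsent(key, loader);
        }
        V cached = readOnlyCacheMap.get(key);
        if (cached != null) {
            return cached; // hit in the read-only level
        }
        // miss: load through the read-write level, then copy into the read-only level
        V loaded = readWriteCacheMap.computeIfAbsent(key, loader);
        readOnlyCacheMap.put(key, loaded);
        return loaded;
    }
}
```

Repeated reads of the same key invoke the loader once; every later read is served from a map lookup.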
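The difference between a full fetch and a delta fetch lies in two methods, getApplicationsFromMultipleRegions and getApplicationDeltasFromMultipleRegions. Both are in the AbstractInstanceRegistry class, which can be regarded as the class that implements most of Eureka Server's functionality. generatePayload decides which one is called: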
private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();
                // Full fetch
                if (ALL_APPS.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } // Delta fetch
                else if (ALL_APPS_DELTA.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } // Fetch by appName
                else {
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            case VIP:
            case SVIP:
                tracer = serializeViptimer.start();
                payload = getPayLoad(key, getApplicationsForVip(key, registry));
                break;
            default:
                logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                payload = "";
                break;
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}
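Now look at how the delta method getApplicationDeltasFromMultipleRegions is implemented. It relies on recentlyChangedQueue: it first reads the local recently-changed queue, then the recently-changed data of the remote regions.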
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    if (null == remoteRegions) {
        // allKnownRemoteRegions comes from the Eureka Server configuration file
        remoteRegions = allKnownRemoteRegions; // null means all remote regions.
    }
    boolean includeRemoteRegion = remoteRegions.length != 0;
    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    try {
        write.lock();
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
        // Read the recently changed instances from the recently-changed queue
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            logger.debug("The instance id {} is found with status {} and actiontype {}",
                    instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
        }
        // Iterate over all remoteRegions
        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                // Get the instances registered in the other region
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                    if (null != remoteAppsDelta) {
                        for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                            if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                Application appInstanceTillNow =
                                        apps.getRegisteredApplications(application.getName());
                                if (appInstanceTillNow == null) {
                                    appInstanceTillNow = new Application(application.getName());
                                    apps.addApplication(appInstanceTillNow);
                                }
                                for (InstanceInfo instanceInfo : application.getInstances()) {
                                    appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                                }
                            }
                        }
                    }
                }
            }
        }
        Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}
So what kind of data is held in recentlyChangedQueue, or put differently, when are items added to it?
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // ... some code omitted ...
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        // ... some code omitted ...
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // A new registration happened: invalidate the affected entries in readWriteCacheMap,
        // not the whole cache
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}
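Only the register method is listed here. internalCancel, statusUpdate and deleteStatusOverride also add items to this queue, and when a client sends a delta fetch request, the server reads the changes from this queue.
On reflection, register, internalCancel, statusUpdate and deleteStatusOverride are rarely called in a stable production environment (instances there should not change status often, nor go online and offline often). In other words, this queue stays empty most of the time.
Back to the original question: why does Eureka Client enable delta fetching by default?
Precisely because, in a normally running environment, instance status rarely changes. The client does not need to iterate over and overwrite its cache all the time, because what it already holds is up to date and unchanged. The client sends a fetch request to Eureka Server every 30s by default, and the probability that some instance changes status within those 30s is small. Even with a large number of instances, a status change will not happen in every 30s window.
Of course, if the number of instances is large (say 1000 instances) and delta fetching is turned off, the client has to pull a response containing all 1000 instances from the server every 30 seconds, which costs a lot of bandwidth.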
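A rough back-of-the-envelope calculation shows the scale. The per-instance payload size used in the example call below (1024 bytes) is purely a hypothetical figure chosen for illustration; real payload sizes depend on metadata and encoding.

```java
final class FullFetchTrafficSketch {
    /** Bytes one client downloads per day when every refresh is a full fetch. */
    static long bytesPerDay(int instances, long assumedBytesPerInstance, int fetchIntervalSeconds) {
        long fetchesPerDay = 24L * 60 * 60 / fetchIntervalSeconds;
        return instances * assumedBytesPerInstance * fetchesPerDay;
    }
}
```

With 1000 instances, the assumed 1024 bytes each and a 30s interval, that is 2880 fetches and roughly 2.9 GB per client per day, whereas a delta response in a quiet system carries almost nothing.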
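In addition, recentlyChangedQueue is pruned periodically. A scheduled task runs every 30s by default and removes the items whose last update is older than the retention time returned by serverConfig.getRetentionTimeInMSInDeltaQueue() (3 minutes by default in Netflix's DefaultEurekaServerConfig, not 30s). So what a client reads from this queue are the instances that changed within at most the retention time plus one run interval of the task.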
private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {
        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                // Keep only the items updated within the retention time
                // (getRetentionTimeInMSInDeltaQueue); remove everything older
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }
    };
}
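The life cycle of an item in the queue (appended on a change, read by delta requests, dropped by the retention task) can be sketched as below. RecentlyChangedQueueSketch and its Item class are hypothetical, the use of ConcurrentLinkedQueue is an assumption, and the clock is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class RecentlyChangedQueueSketch {
    /** Hypothetical stand-in for RecentlyChangedItem: an instance id and the time it changed. */
    static final class Item {
        final String instanceId;
        final long lastUpdateTime;

        Item(String instanceId, long lastUpdateTime) {
            this.instanceId = instanceId;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    private final ConcurrentLinkedQueue<Item> queue = new ConcurrentLinkedQueue<>();
    private final long retentionMs;

    RecentlyChangedQueueSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Called on register, cancel or status change: append to the tail. */
    void recordChange(String instanceId, long nowMs) {
        queue.add(new Item(instanceId, nowMs));
    }

    /** A delta read iterates over the queue and removes nothing. */
    List<String> readDelta() {
        List<String> ids = new ArrayList<>();
        for (Item item : queue) {
            ids.add(item.instanceId);
        }
        return ids;
    }

    /** Retention pass: drop items older than the window, stop at the first one still inside it. */
    void evictExpired(long nowMs) {
        Iterator<Item> it = queue.iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime < nowMs - retentionMs) {
                it.remove();
            } else {
                break; // items are in insertion order, so the rest are newer
            }
        }
    }
}
```

Because reads do not remove items, several clients polling at different moments all see the same recent changes until the retention pass drops them.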
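This article only covers the overall flow and skips some details. Interested readers can read the source code alongside the flow diagram at the top, or step through it in a debugger. If you have any questions, or spot anything that is not accurate, please point it out. Thanks!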