Eureka客戶端初始化(4)
10. 上接RetryableEurekaHttpClient#execute:繼續執行clientFactory.newClient(currentEndpoint),返回的是new RedirectingEurekaHttpClient(endpoint.getServiceUrl(), delegateFactory, dnsService);接著執行requestExecutor.execute(currentHttpClient),即進入RedirectingEurekaHttpClient#execute:
/**
 * Runs the request through the currently pinned delegate client when one exists;
 * otherwise creates a client for {@code serviceEndpoint}, executes via
 * {@code executeOnNewServer} and pins whichever client that call left in the reference.
 *
 * <p>On any exception the failing client is shut down and the exception is rethrown.
 * For a pinned client the pin is also cleared (only if it is still the same instance).
 *
 * @param requestExecutor the request to execute
 * @param <R>             response entity type
 * @return the response produced by the request
 */
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    EurekaHttpClient pinnedClient = delegateRef.get();
    if (pinnedClient != null) {
        try {
            return requestExecutor.execute(pinnedClient);
        } catch (Exception e) {
            logger.error("Request execution error", e);
            // Clear the pin only if nobody replaced it in the meantime.
            delegateRef.compareAndSet(pinnedClient, null);
            pinnedClient.shutdown();
            throw e;
        }
    }

    // Nothing pinned yet: start from the configured service endpoint.
    AtomicReference<EurekaHttpClient> candidateRef =
            new AtomicReference<>(factory.newClient(serviceEndpoint));
    try {
        EurekaHttpResponse<R> result = executeOnNewServer(requestExecutor, candidateRef);
        // Pin the client that served the request and release whatever was pinned before.
        EurekaHttpClient previous = delegateRef.getAndSet(candidateRef.get());
        TransportUtils.shutdown(previous);
        return result;
    } catch (Exception e) {
        logger.error("Request execution error", e);
        TransportUtils.shutdown(candidateRef.get());
        throw e;
    }
}
這裡factory.newClient(serviceEndpoint)中的factory即Jersey1TransportClientFactories#newTransportClientFactory返回的工廠,執行後返回的是MetricsCollectingEurekaHttpClient;其中的delegateFactory是JerseyEurekaHttpClientFactory,其newClient返回new JerseyApplicationClient(apacheClient, endpoint.getServiceUrl(), additionalHeaders):
// Wrap the client created by delegateFactory for this endpoint in a MetricsCollectingEurekaHttpClient,
// handing it the per-request-type metrics and the exception metrics.
// NOTE(review): the meaning of the trailing `false` constructor argument is not visible in this excerpt.
return new MetricsCollectingEurekaHttpClient( delegateFactory.newClient(endpoint), metricsByRequestType, exceptionMetrics, false );
繼續執行
/**
 * Executes the request, following HTTP 302 redirects up to {@code MAX_FOLLOWED_REDIRECTS} times.
 *
 * <p>On each 302 the current client held in {@code currentHttpClientRef} is shut down and
 * replaced by a new client for the redirect target, so on return the reference holds the
 * client that produced the final (non-302) response.
 *
 * @param requestExecutor      the request to execute
 * @param currentHttpClientRef holder of the client to use; updated in place on every redirect
 * @param <R>                  response entity type
 * @return the first response whose status code is not 302
 * @throws TransportException if a redirect location cannot be resolved to a base URI, or the
 *                            redirect limit is exhausted
 */
private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor,
                                                     AtomicReference<EurekaHttpClient> currentHttpClientRef) {
    URI targetUrl = null;
    for (int followRedirectCount = 0; followRedirectCount < MAX_FOLLOWED_REDIRECTS; followRedirectCount++) {
        EurekaHttpResponse<R> httpResponse = requestExecutor.execute(currentHttpClientRef.get());
        if (httpResponse.getStatusCode() != 302) {
            if (followRedirectCount == 0) {
                // No redirect was followed, so targetUrl is still null here; log the endpoint
                // that actually served the request instead of printing "null".
                logger.debug("Pinning to endpoint {}", serviceEndpoint.getServiceUrl());
            } else {
                logger.info("Pinning to endpoint {}, after {} redirect(s)", targetUrl, followRedirectCount);
            }
            return httpResponse;
        }

        targetUrl = getRedirectBaseUri(httpResponse.getLocation());
        if (targetUrl == null) {
            throw new TransportException("Invalid redirect URL " + httpResponse.getLocation());
        }

        // Swap the client: release the one that answered with a redirect, then point at the target.
        currentHttpClientRef.getAndSet(null).shutdown();
        currentHttpClientRef.set(factory.newClient(new DefaultEndpoint(targetUrl.toString())));
    }

    String message = "Follow redirect limit crossed for URI " + serviceEndpoint.getServiceUrl();
    logger.warn(message);
    throw new TransportException(message);
}
繼續深度呼叫requestExecutor.execute(currentHttpClientRef.get());即MetricsCollectingEurekaHttpClient#execute
/**
 * Delegates the request to the wrapped client while recording metrics for its request type:
 * the latency timer is started before the call and always stopped afterwards, a per-status
 * counter is incremented on a response, and the connection-error counter plus the exception
 * metric are updated when the call throws (the exception is then rethrown).
 *
 * @param requestExecutor the request to execute
 * @param <R>             response entity type
 * @return the response returned by the delegate
 */
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    EurekaHttpClientRequestMetrics metricsForType =
            metricsByRequestType.get(requestExecutor.getRequestType());
    Stopwatch latencyWatch = metricsForType.latencyTimer.start();
    try {
        EurekaHttpResponse<R> result = requestExecutor.execute(delegate);
        metricsForType.countersByStatus.get(mappedStatus(result)).increment();
        return result;
    } catch (Exception failure) {
        metricsForType.connectionErrors.increment();
        exceptionsMetric.count(failure);
        throw failure;
    } finally {
        latencyWatch.stop();
    }
}
這裡的requestExecutor.execute(delegate)實際上是由JerseyApplicationClient的父類AbstractJerseyEurekaHttpClient執行getApplications,向服務端地址發出請求後,返回包含應用資訊的EurekaHttpResponse<Applications>:
/**
 * Fetches applications from the {@code apps/} resource, optionally restricted to the given regions.
 *
 * @param regions regions forwarded to {@code getApplicationsInternal}; may be empty
 * @return the response built by {@code getApplicationsInternal}
 */
public EurekaHttpResponse<Applications> getApplications(String... regions) {
    final String appsPath = "apps/";
    return getApplicationsInternal(appsPath, regions);
}
/**
 * Issues a JSON GET for {@code urlPath} under {@code serviceUrl} and converts the Jersey
 * response into an {@code EurekaHttpResponse}.
 *
 * <p>When {@code regions} is non-empty it is joined and sent as the {@code regions} query
 * parameter. The entity is read only for a 200 response that has a body; otherwise the
 * returned entity is {@code null}. The Jersey response is always closed before returning.
 *
 * @param urlPath path appended to the service URL
 * @param regions optional regions to request; may be {@code null} or empty
 * @return response carrying the status code, headers and (possibly null) applications
 */
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse clientResponse = null;
    String joinedRegions = null;
    try {
        WebResource resource = jerseyClient.resource(serviceUrl).path(urlPath);
        boolean regionsRequested = regions != null && regions.length > 0;
        if (regionsRequested) {
            joinedRegions = StringUtil.join(regions);
            resource = resource.queryParam("regions", joinedRegions);
        }

        Builder builder = resource.getRequestBuilder();
        addExtraHeaders(builder);
        clientResponse = builder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

        boolean okWithBody = clientResponse.getStatus() == Status.OK.getStatusCode()
                && clientResponse.hasEntity();
        Applications payload = okWithBody ? clientResponse.getEntity(Applications.class) : null;

        return anEurekaHttpResponse(clientResponse.getStatus(), Applications.class)
                .headers(headersOf(clientResponse))
                .entity(payload)
                .build();
    } finally {
        if (logger.isDebugEnabled()) {
            // clientResponse is still null if the request itself failed before a response arrived.
            String query = joinedRegions == null ? "" : "regions=" + joinedRegions;
            Object status = clientResponse == null ? "N/A" : clientResponse.getStatus();
            logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}", serviceUrl, urlPath, query, status);
        }
        if (clientResponse != null) {
            clientResponse.close();
        }
    }
}
記錄監控所需的各種返回狀態碼指標;如果返回的是302重定向,則需要做額外的處理,再請求一次目標連結,最後返回。回到RetryableEurekaHttpClient#execute,判斷狀態並儲存後返回DiscoveryClient#getAndStoreFullRegistry,打亂順序後儲存應用資訊localRegionApps.set(this.filterAndShuffle(apps)); 並儲存應用資訊的雜湊碼。
/**
 * Shuffles the instances of every application in {@code appNameApplicationMap} and rebuilds
 * the VIP and secure-VIP indexes from them.
 *
 * <p>The indexes are rebuilt into fresh maps, then merged into the existing fields with
 * {@code putAll} followed by {@code retainAll} on the key set, so entries whose keys no longer
 * exist are dropped.
 *
 * @param filterUpInstances     passed to the per-application shuffle (non-remote-region case)
 *                              and to {@code shuffleAndFilterInstances}
 * @param indexByRemoteRegions  selects the remote-region overload of the per-application shuffle
 * @param remoteRegionsRegistry forwarded to the remote-region shuffle; may be {@code null}
 * @param clientConfig          forwarded to the remote-region shuffle; may be {@code null}
 * @param instanceRegionChecker forwarded to the remote-region shuffle; may be {@code null}
 */
private void shuffleInstances(boolean filterUpInstances,
                              boolean indexByRemoteRegions,
                              @Nullable Map<String, Applications> remoteRegionsRegistry,
                              @Nullable EurekaClientConfig clientConfig,
                              @Nullable InstanceRegionChecker instanceRegionChecker) {
    Map<String, VipIndexSupport> rebuiltSecureVips = new HashMap<>();
    Map<String, VipIndexSupport> rebuiltVips = new HashMap<>();

    for (Application app : appNameApplicationMap.values()) {
        if (!indexByRemoteRegions) {
            app.shuffleAndStoreInstances(filterUpInstances);
        } else {
            app.shuffleAndStoreInstances(remoteRegionsRegistry, clientConfig, instanceRegionChecker);
        }
        this.addInstancesToVIPMaps(app, rebuiltVips, rebuiltSecureVips);
    }

    shuffleAndFilterInstances(rebuiltVips, filterUpInstances);
    shuffleAndFilterInstances(rebuiltSecureVips, filterUpInstances);

    // Merge first, then prune keys that are absent from the rebuilt index.
    this.virtualHostNameAppMap.putAll(rebuiltVips);
    this.virtualHostNameAppMap.keySet().retainAll(rebuiltVips.keySet());

    this.secureVirtualHostNameAppMap.putAll(rebuiltSecureVips);
    this.secureVirtualHostNameAppMap.keySet().retainAll(rebuiltSecureVips.keySet());
}
11. DiscoveryClient#fetchRegistry釋出快取更新事件onCacheRefreshed();釋出遠端狀態更新事件updateInstanceRemoteStatus();如果查詢失敗則請求備用註冊地址fetchRegistryFromBackup();獲取應用資訊儲存起來。判斷是否在初始化時就向伺服器註冊本機資訊。
// Register eagerly only when the client is configured both to register with Eureka
// and to enforce registration at initialisation.
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
// A false result from register() is treated as a startup failure.
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
// This also catches the IllegalStateException thrown just above. Only the message is
// logged; the throwable itself is preserved as the cause of the rethrown exception.
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
12. 最後,初始化排程任務(例如叢集解析器、心跳、例項資訊複製器、快取重新整理、狀態變化監聽器):
/**
 * Schedules the client's background work according to configuration.
 *
 * <ul>
 *   <li>If registry fetching is enabled, schedules the "cacheRefresh" supervisor task.</li>
 *   <li>If registration is enabled, schedules the "heartbeat" supervisor task, creates the
 *       {@code InstanceInfoReplicator} and the status-change listener (registered only when
 *       on-demand status updates are enabled), and starts the replicator.</li>
 *   <li>Otherwise logs that the client is not registering.</li>
 * </ul>
 */
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int fetchIntervalSecs = clientConfig.getRegistryFetchIntervalSeconds();
        int fetchBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        TimedSupervisorTask cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                fetchIntervalSecs,
                TimeUnit.SECONDS,
                fetchBackOffBound,
                new CacheRefreshThread());
        scheduler.schedule(cacheRefreshTask, fetchIntervalSecs, TimeUnit.SECONDS);
    }

    if (!clientConfig.shouldRegisterWithEureka()) {
        logger.info("Not registering with Eureka server per configuration");
        return;
    }

    int heartbeatIntervalSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    int heartbeatBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    logger.info("Starting heartbeat executor: " + "renew interval is: {}", heartbeatIntervalSecs);

    // Heartbeat timer
    TimedSupervisorTask heartbeatTask = new TimedSupervisorTask(
            "heartbeat",
            scheduler,
            heartbeatExecutor,
            heartbeatIntervalSecs,
            TimeUnit.SECONDS,
            heartbeatBackOffBound,
            new HeartbeatThread());
    scheduler.schedule(heartbeatTask, heartbeatIntervalSecs, TimeUnit.SECONDS);

    // InstanceInfo replicator
    instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize

    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
        @Override
        public String getId() {
            return "statusChangeListener";
        }

        @Override
        public void notify(StatusChangeEvent statusChangeEvent) {
            boolean downInvolved = InstanceStatus.DOWN == statusChangeEvent.getStatus()
                    || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus();
            if (downInvolved) {
                // log at warn level if DOWN was involved
                logger.warn("Saw local status change event {}", statusChangeEvent);
            } else {
                logger.info("Saw local status change event {}", statusChangeEvent);
            }
            instanceInfoReplicator.onDemandUpdate();
        }
    };

    if (clientConfig.shouldOnDemandUpdateStatusChange()) {
        applicationInfoManager.registerStatusChangeListener(statusChangeListener);
    }

    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}