Eureka初始化(1)
1. 基於1.X版本的原始碼ExampleEurekaService的main函式啟動。通過配置中介軟體Archaius獲取全域性通用配置例項,例項化MyDataCenterInstanceConfig,父類PropertiesInstanceConfig以及AbstractInstanceConfig,拼接名稱空間以及獲取對應檔案配置例項,返回例項配置instanceConfig
public PropertiesInstanceConfig(String namespace, DataCenterInfo info) { super(info); this.namespace = namespace.endsWith(".") ? namespace : namespace + "."; appGrpNameFromEnv = ConfigurationManager.getConfigInstance() .getString(FALLBACK_APP_GROUP_KEY, Values.UNKNOWN_APPLICATION); this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME); }
構造例項資訊instanceInfo,配置有客戶端的心跳時間,失效時間以及本例項的其他資訊。最後把例項配置和例項資訊儲存在應用資訊管理類ApplicationInfoManager中返回。
/**
 * Stores the instance configuration from which the {@link InstanceInfo} is built on demand.
 *
 * @param config instance configuration read by {@link #get()}
 */
@Inject
public EurekaConfigBasedInstanceInfoProvider(EurekaInstanceConfig config) {
    this.config = config;
}

/**
 * Returns the {@link InstanceInfo} for this instance, building it from the
 * configuration on the first call and returning the same object afterwards.
 *
 * @return the cached (or freshly built) instance information, with lease info attached
 */
@Override
public synchronized InstanceInfo get() {
    // Already built by an earlier call: hand back the cached object.
    if (instanceInfo != null) {
        return instanceInfo;
    }

    // Build the lease information to be passed to the server based on config
    LeaseInfo.Builder leaseBuilder = LeaseInfo.Builder.newBuilder()
            .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
            .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

    if (vipAddressResolver == null) {
        vipAddressResolver = new Archaius1VipAddressResolver();
    }

    // Builder for the instance information to be registered with eureka server
    InstanceInfo.Builder infoBuilder = InstanceInfo.Builder.newBuilder(vipAddressResolver);

    // Pick the id for the InstanceInfo: configured id first, then the datacenter id
    // when the datacenter info exposes one, otherwise the hostname.
    String instanceId = config.getInstanceId();
    if (instanceId == null || instanceId.isEmpty()) {
        DataCenterInfo dataCenter = config.getDataCenterInfo();
        instanceId = (dataCenter instanceof UniqueIdentifier)
                ? ((UniqueIdentifier) dataCenter).getId()
                : config.getHostName(false);
    }

    // A refreshable config resolves its own default address
    // (refresh AWS data center info, and return up to date address).
    String defaultAddress = (config instanceof RefreshableInstanceConfig)
            ? ((RefreshableInstanceConfig) config).resolveDefaultAddress(false)
            : config.getHostName(false);

    // fail safe: fall back to the IP address when no address was resolved
    if (defaultAddress == null || defaultAddress.isEmpty()) {
        defaultAddress = config.getIpAddress();
    }

    infoBuilder.setNamespace(config.getNamespace())
            .setInstanceId(instanceId)
            .setAppName(config.getAppname())
            .setAppGroupName(config.getAppGroupName())
            .setDataCenterInfo(config.getDataCenterInfo())
            .setIPAddr(config.getIpAddress())
            .setHostName(defaultAddress)
            .setPort(config.getNonSecurePort())
            .enablePort(PortType.UNSECURE, config.isNonSecurePortEnabled())
            .setSecurePort(config.getSecurePort())
            .enablePort(PortType.SECURE, config.getSecurePortEnabled())
            .setVIPAddress(config.getVirtualHostName())
            .setSecureVIPAddress(config.getSecureVirtualHostName())
            .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
            .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
            .setASGName(config.getASGName())
            .setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(),
                    config.getSecureHealthCheckUrl());

    if (config.isInstanceEnabledOnit()) {
        // No status is set on the builder in this branch; only the log line is emitted.
        LOG.info("Setting initial instance status as: {}. This may be too early for the instance to advertise "
                + "itself as available. You would instead want to control this via a healthcheck handler.",
                InstanceStatus.UP);
    } else {
        // Start off with the STARTING state to avoid traffic
        InstanceStatus initialStatus = InstanceStatus.STARTING;
        LOG.info("Setting initial instance status as: {}", initialStatus);
        infoBuilder.setStatus(initialStatus);
    }

    // Add any user-specific metadata information
    for (Map.Entry<String, String> metadata : config.getMetadataMap().entrySet()) {
        infoBuilder.add(metadata.getKey(), metadata.getValue());
    }

    instanceInfo = infoBuilder.build();
    instanceInfo.setLeaseInfo(leaseBuilder.build());
    return instanceInfo;
}
2. 初始化DefaultEurekaClientConfig客戶端配置例項,獲取通用配置例項以及DefaultEurekaTransportConfig傳輸配置
/**
 * Creates the client configuration for the given namespace.
 *
 * <p>The namespace is stored with a guaranteed trailing dot, the configuration
 * instance for {@code CommonConstants.CONFIG_FILE_NAME} is initialized, and a
 * transport configuration is created from the namespace argument as passed in
 * (not the normalized one) together with that configuration instance.
 *
 * @param namespace configuration namespace; a "." is appended when missing
 */
public DefaultEurekaClientConfig(String namespace) {
    String normalizedNamespace = namespace;
    if (!normalizedNamespace.endsWith(".")) {
        normalizedNamespace = normalizedNamespace + ".";
    }
    this.namespace = normalizedNamespace;

    this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);
    this.transportConfig = new DefaultEurekaTransportConfig(namespace, configInstance);
}

/**
 * Creates the transport configuration nested under a parent namespace.
 *
 * <p>The resulting namespace is {@code SUB_NAMESPACE} alone when there is no
 * parent; otherwise the parent followed by {@code SUB_NAMESPACE}, with exactly
 * one "." inserted when the parent does not already end with one.
 *
 * @param parentNamespace parent namespace, may be {@code null}
 * @param configInstance  property factory stored for later lookups
 */
public DefaultEurekaTransportConfig(String parentNamespace, DynamicPropertyFactory configInstance) {
    String resolvedNamespace;
    if (parentNamespace == null) {
        resolvedNamespace = SUB_NAMESPACE;
    } else if (parentNamespace.endsWith(".")) {
        resolvedNamespace = parentNamespace + SUB_NAMESPACE;
    } else {
        resolvedNamespace = parentNamespace + "." + SUB_NAMESPACE;
    }
    this.namespace = resolvedNamespace;
    this.configInstance = configInstance;
}
初始化DiscoveryClient核心類,傳遞引數有前面初始化的配置管理和備份註冊提供資訊
/**
 * Creates a discovery client, delegating to the four-argument constructor with a
 * provider that builds the {@link BackupRegistry} on first request.
 *
 * <p>The provider instantiates the class named by
 * {@code config.getBackupRegistryImpl()} reflectively. When no class name is
 * configured, or instantiation fails (the failure is logged), it falls back to
 * {@code NotImplementedRegistryImpl}. The instance is created once and reused.
 *
 * @param applicationInfoManager passed through to the delegated constructor
 * @param config                 client configuration; also supplies the backup registry class name
 * @param args                   optional arguments passed through to the delegated constructor
 */
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
    this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
        private volatile BackupRegistry cachedRegistry;

        @Override
        public synchronized BackupRegistry get() {
            // Reuse the registry created by an earlier call.
            if (cachedRegistry != null) {
                return cachedRegistry;
            }

            String implClassName = config.getBackupRegistryImpl();
            if (implClassName != null) {
                try {
                    cachedRegistry = (BackupRegistry) Class.forName(implClassName).newInstance();
                    logger.info("Enabled backup registry of type {}", cachedRegistry.getClass());
                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
                    // All three reflective failures are reported the same way;
                    // the fallback below then takes over.
                    logger.error("Error instantiating BackupRegistry.", e);
                }
            }

            if (cachedRegistry == null) {
                logger.warn("Using default backup registry implementation which does not do anything.");
                cachedRegistry = new NotImplementedRegistryImpl();
            }
            return cachedRegistry;
        }
    });
}
根據引數AbstractDiscoveryClientOptionalArgs初始化一些健康檢查等配置資訊:應用路徑標識appPathIdentifier,url隨機處理器,本地分割槽應用localRegionApps.set(new Applications());(Applications物件中包含各種成員變數map、雜湊碼、版本增量等),記錄獲取註冊資訊的原子變數fetchRegistryGeneration,遠端分割槽的資訊remoteRegionsToFetch。根據配置建立有關注冊的監控registryStalenessMonitor和心跳的監控heartbeatStalenessMonitor,否則配置為既不註冊資料也不查詢資料的客戶端。建立週期執行緒池scheduler用來執行心跳和快取的操作,建立心跳和快取重新整理執行緒池heartbeatExecutor,cacheRefreshExecutor
3. 初始化EurekaTransport,執行scheduleServerEndpointTask(eurekaTransport, args);建立TransportClientFactory
// If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
if (providedJerseyClient == null) {
    // No ready-made Jersey client: build the factory from the client config, filters,
    // this instance's info and the optional SSL context / hostname verifier.
    eurekaTransport.transportClientFactory = transportClientFactories.newTransportClientFactory(
            clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier);
} else {
    // A Jersey client was provided: build the factory around it.
    eurekaTransport.transportClientFactory = transportClientFactories.newTransportClientFactory(
            additionalFilters, providedJerseyClient);
}
類Jersey1TransportClientFactories
/**
 * Builds a {@link TransportClientFactory} backed by a Jersey HTTP client factory
 * and wrapped by a metrics-collecting factory.
 *
 * <p>The returned factory creates clients through the metrics-collecting
 * wrapper; shutting it down shuts down the wrapper first and then the
 * underlying Jersey factory.
 *
 * @param clientConfig      client configuration handed to the Jersey factory
 * @param additionalFilters extra client filters handed to the Jersey factory
 * @param myInstanceInfo    this instance's info; its IP address seeds the client identity
 * @param sslContext        optional SSL context handed to the Jersey factory
 * @param hostnameVerifier  optional hostname verifier handed to the Jersey factory
 * @return the composed transport client factory
 */
public TransportClientFactory newTransportClientFactory(EurekaClientConfig clientConfig,
        Collection<ClientFilter> additionalFilters, InstanceInfo myInstanceInfo, Optional<SSLContext> sslContext,
        Optional<HostnameVerifier> hostnameVerifier) {
    final EurekaClientIdentity identity = new EurekaClientIdentity(myInstanceInfo.getIPAddr());

    final TransportClientFactory jerseyDelegate = JerseyEurekaHttpClientFactory.create(
            clientConfig, additionalFilters, myInstanceInfo, identity, sslContext, hostnameVerifier);
    final TransportClientFactory metricsDelegate =
            MetricsCollectingEurekaHttpClient.createFactory(jerseyDelegate);

    return new TransportClientFactory() {
        @Override
        public EurekaHttpClient newClient(EurekaEndpoint serviceUrl) {
            return metricsDelegate.newClient(serviceUrl);
        }

        @Override
        public void shutdown() {
            // Outer wrapper first, then the factory it wraps.
            metricsDelegate.shutdown();
            jerseyDelegate.shutdown();
        }
    };
}
Jersey1TransportClientFactories#newTransportClientFactory=>JerseyEurekaHttpClientFactory#create讀取關於http等相關配置並建立對應的http客戶端工廠
/**
 * Creates a {@link JerseyEurekaHttpClientFactory} from the client configuration.
 *
 * <p>The experimental builder is used when the experimental setting
 * {@code JerseyEurekaHttpClientFactory.useNewBuilder} equals {@code "true"}.
 * The client name (and SSL / proxy setup) is then chosen in this order:
 * system SSL configuration when the system property
 * {@code com.netflix.eureka.shouldSSLConnectionsUseSystemSocketFactory} is {@code "true"},
 * otherwise a proxy client when both proxy host and proxy port are configured,
 * otherwise the plain client.
 *
 * @param clientConfig      client configuration (experimental flags, proxy settings)
 * @param additionalFilters extra client filters for the builder
 * @param myInstanceInfo    this instance's info for the builder
 * @param clientIdentity    identity for the builder
 * @param sslContext        optional SSL context, applied when present
 * @param hostnameVerifier  optional hostname verifier, applied when present
 * @return the built factory
 * @throws NumberFormatException if a proxy is configured and its port is not a parsable integer
 */
public static JerseyEurekaHttpClientFactory create(EurekaClientConfig clientConfig,
                                                   Collection<ClientFilter> additionalFilters,
                                                   InstanceInfo myInstanceInfo,
                                                   AbstractEurekaIdentity clientIdentity,
                                                   Optional<SSLContext> sslContext,
                                                   Optional<HostnameVerifier> hostnameVerifier) {
    final String experimentalFlag = clientConfig.getExperimental("JerseyEurekaHttpClientFactory.useNewBuilder");
    final boolean experimental = "true".equals(experimentalFlag);

    JerseyEurekaHttpClientFactoryBuilder factoryBuilder = (experimental ? experimentalBuilder() : newBuilder())
            .withAdditionalFilters(additionalFilters)
            .withMyInstanceInfo(myInstanceInfo)
            .withUserAgent("Java-EurekaClient")
            .withClientConfig(clientConfig)
            .withClientIdentity(clientIdentity);

    // Optional SSL pieces are applied only when the caller supplied them.
    sslContext.ifPresent(factoryBuilder::withSSLContext);
    hostnameVerifier.ifPresent(factoryBuilder::withHostnameVerifier);

    final String systemSslProperty = "com.netflix.eureka.shouldSSLConnectionsUseSystemSocketFactory";
    if ("true".equals(System.getProperty(systemSslProperty))) {
        factoryBuilder.withClientName("DiscoveryClient-HTTPClient-System").withSystemSSLConfiguration();
    } else if (clientConfig.getProxyHost() != null && clientConfig.getProxyPort() != null) {
        factoryBuilder.withClientName("Proxy-DiscoveryClient-HTTPClient")
                .withProxy(
                        clientConfig.getProxyHost(),
                        Integer.parseInt(clientConfig.getProxyPort()),
                        clientConfig.getProxyUserName(),
                        clientConfig.getProxyPassword());
    } else {
        factoryBuilder.withClientName("DiscoveryClient-HTTPClient");
    }

    return factoryBuilder.build();
}
在JerseyEurekaHttpClientFactoryBuilder#buildLegacy
/**
 * Builds the factory through {@code EurekaJerseyClientBuilder}.
 *
 * <p>Copies this builder's client name, timeouts, connection limits, codec
 * wrappers and proxy settings onto a Jersey client builder, applies the SSL
 * configuration (system SSL takes precedence over a custom SSL context) and the
 * hostname verifier when set, builds the Jersey client, installs the filters on
 * its Apache client and wraps everything in a {@link JerseyEurekaHttpClientFactory}.
 *
 * @param additionalHeaders headers handed to the resulting factory
 * @param systemSSL         {@code true} to use the system SSL configuration
 * @return the factory wrapping the newly built Jersey client
 */
private JerseyEurekaHttpClientFactory buildLegacy(Map<String, String> additionalHeaders, boolean systemSSL) {
    EurekaJerseyClientBuilder jerseyBuilder = new EurekaJerseyClientBuilder()
            .withClientName(clientName)
            .withUserAgent("Java-EurekaClient")
            .withConnectionTimeout(connectionTimeout)
            .withReadTimeout(readTimeout)
            .withMaxConnectionsPerHost(maxConnectionsPerHost)
            .withMaxTotalConnections(maxTotalConnections)
            .withConnectionIdleTimeout((int) connectionIdleTimeout)
            .withEncoderWrapper(encoderWrapper)
            .withDecoderWrapper(decoderWrapper)
            .withProxy(proxyHost, String.valueOf(proxyPort), proxyUserName, proxyPassword);

    // System SSL wins over a custom SSL context; with neither, no SSL call is made here.
    if (systemSSL) {
        jerseyBuilder.withSystemSSLConfiguration();
    } else if (sslContext != null) {
        jerseyBuilder.withCustomSSL(sslContext);
    }

    if (hostnameVerifier != null) {
        jerseyBuilder.withHostnameVerifier(hostnameVerifier);
    }

    EurekaJerseyClient builtClient = jerseyBuilder.build();
    // Filters are installed on the Apache client owned by the Jersey client.
    addFilters(builtClient.getClient());

    return new JerseyEurekaHttpClientFactory(builtClient, additionalHeaders);
}
在EurekaJerseyClientBuilder#build
/**
 * Builds the Jersey client from this builder's settings.
 *
 * <p>A {@code MyDefaultApacheHttpClient4Config} is created first (outside the
 * try block), then passed with the timeouts to {@code EurekaJerseyClientImpl}.
 *
 * @return the newly created client
 * @throws RuntimeException wrapping any {@link Throwable} raised while constructing the client
 */
public EurekaJerseyClient build() {
    MyDefaultApacheHttpClient4Config apacheClientConfig = new MyDefaultApacheHttpClient4Config();
    try {
        return new EurekaJerseyClientImpl(
                connectionTimeout, readTimeout, connectionIdleTimeout, apacheClientConfig);
    } catch (Throwable cause) {
        throw new RuntimeException("Cannot create Jersey client ", cause);
    }
}
建立請求協議http和https處理,利用jersey RESTful 框架來實現請求處理,設定編解碼規範,配置其他請求處理屬性。
/**
 * Assembles the Apache HTTP client configuration from the enclosing builder's settings.
 *
 * <p>Selects a connection manager (system SSL, custom SSL, or default), adds
 * proxy configuration when a proxy host is set, registers the Eureka
 * encoder/decoder provider, applies the connection limits, sets the user agent
 * and turns off automatic redirect handling.
 */
MyDefaultApacheHttpClient4Config() {
    // System SSL first; custom SSL when any custom SSL input is present; default otherwise.
    boolean customSslRequested = !(sslContext == null && hostnameVerifier == null && trustStoreFileName == null);
    MonitoredConnectionManager cm = systemSSL
            ? createSystemSslCM()
            : (customSslRequested ? createCustomSslCM() : createDefaultSslCM());

    if (proxyHost != null) {
        addProxyConfiguration(cm);
    }

    getSingletons().add(new DiscoveryJerseyProvider(encoderWrapper, decoderWrapper));

    // Common properties to all clients
    cm.setDefaultMaxPerRoute(maxConnectionsPerHost);
    cm.setMaxTotal(maxTotalConnections);
    getProperties().put(ApacheHttpClient4Config.PROPERTY_CONNECTION_MANAGER, cm);

    // The user agent is the explicit user agent when set, else the client name,
    // followed by "/v" and the build version.
    String agentBase;
    if (userAgent == null) {
        agentBase = clientName;
    } else {
        agentBase = userAgent;
    }
    String fullUserAgentName = agentBase + "/v" + buildVersion();
    getProperties().put(CoreProtocolPNames.USER_AGENT, fullUserAgentName);

    // To pin a client to specific server in case redirect happens, we handle redirects directly
    // (see DiscoveryClient.makeRemoteCall methods).
    getProperties().put(PROPERTY_FOLLOW_REDIRECTS, Boolean.FALSE);
    getProperties().put(ClientPNames.HANDLE_REDIRECTS, Boolean.FALSE);
}
初始化EurekaJerseyClientImpl,建立ApacheHttpClient4並設定連線超時與讀取超時,用來發起http請求
/**
 * Creates the Jersey client on top of an Apache HTTP client.
 *
 * <p>Builds the {@code ApacheHttpClient4} from the given configuration, applies
 * the connection and socket (read) timeouts to its HTTP parameters, and creates
 * the connection cleaner with the idle timeout.
 *
 * @param connectionTimeout     value applied via {@code HttpConnectionParams.setConnectionTimeout}
 * @param readTimeout           value applied via {@code HttpConnectionParams.setSoTimeout}
 * @param connectionIdleTimeout idle timeout handed to the connection cleaner
 * @param clientConfig          Jersey client configuration used to create the Apache client
 * @throws RuntimeException wrapping any {@link Throwable} raised during setup
 */
public EurekaJerseyClientImpl(int connectionTimeout, int readTimeout, final int connectionIdleTimeout,
                              ClientConfig clientConfig) {
    try {
        this.jerseyClientConfig = clientConfig;
        this.apacheHttpClient = ApacheHttpClient4.create(this.jerseyClientConfig);

        // Timeouts are set on the parameters of the underlying HTTP client.
        HttpParams httpParams = this.apacheHttpClient.getClientHandler().getHttpClient().getParams();
        HttpConnectionParams.setConnectionTimeout(httpParams, connectionTimeout);
        HttpConnectionParams.setSoTimeout(httpParams, readTimeout);

        this.apacheHttpClientConnectionCleaner =
                new ApacheHttpClientConnectionCleaner(this.apacheHttpClient, connectionIdleTimeout);
    } catch (Throwable cause) {
        throw new RuntimeException("Cannot create Jersey client", cause);
    }
}
儲存http處理端,啟動週期執行緒池關閉空閒連線,收集http請求的監控資料
/**
 * Creates the cleaner for the given Apache client and schedules the periodic cleanup.
 *
 * <p>{@code cleanIdle(connectionIdleTimeout)} is scheduled with a fixed delay of
 * {@code HTTP_CONNECTION_CLEANER_INTERVAL_MS} milliseconds (also used as the
 * initial delay). A timer and a failure counter are then created and this
 * object is registered with the monitoring registry; a registration failure is
 * logged and otherwise ignored.
 *
 * @param apacheHttpClient      client whose connections are cleaned
 * @param connectionIdleTimeout idle timeout passed to every {@code cleanIdle} run
 */
public ApacheHttpClientConnectionCleaner(ApacheHttpClient4 apacheHttpClient, final long connectionIdleTimeout) {
    this.apacheHttpClient = apacheHttpClient;

    this.eurekaConnCleaner.scheduleWithFixedDelay(
            () -> cleanIdle(connectionIdleTimeout),
            HTTP_CONNECTION_CLEANER_INTERVAL_MS,
            HTTP_CONNECTION_CLEANER_INTERVAL_MS,
            TimeUnit.MILLISECONDS);

    executionTimeStats = new BasicTimer(MonitorConfig.builder("Eureka-Connection-Cleaner-Time").build());
    cleanupFailed = new BasicCounter(MonitorConfig.builder("Eureka-Connection-Cleaner-Failure").build());

    try {
        Monitors.registerObject(this);
    } catch (Exception registrationFailure) {
        // Monitoring is best-effort: log and continue without it.
        logger.error("Unable to register with servo.", registrationFailure);
    }
}
給框架Jersey設定過濾器,壓縮資料功能以及設定header,最後儲存在JerseyEurekaHttpClientFactory返回,關於網路請求框架的工廠構造完畢。
/**
 * Installs the client filters on the given Apache client.
 *
 * <p>Filters are added in this order: GZIP content encoding, the Eureka identity
 * header filter, then every non-null entry of {@code additionalFilters}.
 *
 * @param discoveryApacheClient client that receives the filters
 */
private void addFilters(ApacheHttpClient4 discoveryApacheClient) {
    // Add gzip content encoding support
    discoveryApacheClient.addFilter(new GZIPContentEncodingFilter(false));

    // always enable client identity headers; without an explicit identity, one is
    // derived from this instance's IP address (null when the instance info is absent)
    String ip = null;
    if (myInstanceInfo != null) {
        ip = myInstanceInfo.getIPAddr();
    }
    AbstractEurekaIdentity identity;
    if (clientIdentity == null) {
        identity = new EurekaClientIdentity(ip);
    } else {
        identity = clientIdentity;
    }
    discoveryApacheClient.addFilter(new EurekaIdentityHeaderFilter(identity));

    if (additionalFilters == null) {
        return;
    }
    for (ClientFilter extraFilter : additionalFilters) {
        if (extraFilter == null) {
            continue;
        }
        discoveryApacheClient.addFilter(extraFilter);
    }
}