【SpringCloud Eureka原始碼】從Eureka Client發起註冊請求到Eureka Server處理的整個服務註冊過程(上)
目錄
- Eureka Client啟動並呼叫Eureka Server的註冊介面
本文使用Spring Cloud Eureka分析
Spring Cloud版本: Dalston.SR5
spring-cloud-starter-eureka版本: 1.3.6.RELEASE
netflix eureka版本: 1.6.2
Eureka Client啟動並呼叫Eureka Server的註冊介面
Spring Cloud Eureka的自動配置
@EnableDiscoveryClient
首先從使用Eureka Client必須引入的@EnableDiscoveryClient
註解說起
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
/**
* If true, the ServiceRegistry will automatically register the local server.
*/
boolean autoRegister() default true;
}
@EnableDiscoveryClient
註解的作用:
autoRegister預設值為true,即服務發現客戶端預設會自動註冊到服務端
Import匯入
EnableDiscoveryClientImportSelector.class
,其作用是匯入了 spring-cloud-eureka-client.jar!\META-INF\spring.factories 中的
EurekaDiscoveryClientConfiguration
org.springframework.cloud.client.discovery.EnableDiscoveryClient=\ org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
由於autoRegister預設為true,故還會匯入
AutoServiceRegistrationConfiguration
,即啟用自動服務註冊的配置,等同於在配置檔案中spring.cloud.service-registry.auto-registration.enabled = true
EurekaDiscoveryClientConfiguration
- 向Spring容器註冊EurekaDiscoveryClientConfiguration.Marker.class,使得真正匯入DiscoveryClient的
EurekaClientAutoConfiguration
配置類滿足啟用條件 - 建立監聽
RefreshScopeRefreshedEvent
事件的監聽器,滿足在使用RefreshScope重新整理時可以重建EurekaClient(不是本文重點) - 在配置
eureka.client.healthcheck.enabled=true
的前提下,向Spring容器註冊EurekaHealthCheckHandler
用於健康檢查(不是本文重點)
所以,EurekaDiscoveryClientConfiguration的主要作用是向Spring容器註冊EurekaDiscoveryClientConfiguration.Marker.class,使得EurekaClientAutoConfiguration
配置類滿足啟用條件
EurekaClientAutoConfiguration
EurekaClientAutoConfiguration
配置類中涉及的內容比較多,主要內容:
- 1、註冊了spring cloud包下的
EurekaClientConfigBean
,這是個對netflix的EurekaClientConfig
客戶端配置介面的實現 - 2、註冊了spring cloud包下的
EurekaInstanceConfigBean
,這是個對netflix的EurekaInstanceConfig
例項資訊配置介面的實現 - 3、註冊了一些AutoServiceRegistration,即客戶端自動註冊的元件,如
EurekaRegistration
: Eureka例項的服務註冊資訊(在開啟客戶端自動註冊時才會註冊)EurekaServiceRegistry
: Eureka服務註冊器EurekaAutoServiceRegistration
: Eureka服務自動註冊器,實現了SmartLifecycle,會在Spring容器的refresh的最後階段被呼叫,通過EurekaServiceRegistry
註冊器註冊EurekaRegistration
資訊
- 4、註冊netflix的
EurekaClient
和ApplicationInfoManager
,註冊時分為兩種情況,即是否滿足RefreshScope,如果滿足,注入的Bean是帶有 @Lazy + @RefreshScope 註解的ApplicationInfoManager
: 管理並初始化當前Instance例項的註冊資訊,並提供了例項狀態監聽機制EurekaClient
: netflix的介面類,用於和Eureka Server互動的客戶端,而netflix的預設實現是DiscoveryClient
,也是本文分析的重點
- 5、註冊
EurekaHealthIndicator
,為/health端點提供Eureka相關資訊,主要有Status當前例項狀態和applications服務列表,在從Eureka Server獲取服務列表正常的情況下,Status使用Eureka Server上的InstanceRemoteStatus,不正常情況下,程式碼中有一些判斷邏輯
public class EurekaClientAutoConfiguration {
...省略
/**
* 1、註冊EurekaClientConfigBean
*/
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean() {
EurekaClientConfigBean client = new EurekaClientConfigBean();
if ("bootstrap".equals(propertyResolver.getProperty("spring.config.name"))) {
// We don't register during bootstrap by default, but there will be another
// chance later.
client.setRegisterWithEureka(false);
}
return client;
}
/**
* 2、註冊EurekaInstanceConfigBean
*/
@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils) throws MalformedURLException {
PropertyResolver eurekaPropertyResolver = new RelaxedPropertyResolver(this.env, "eureka.instance.");
String hostname = eurekaPropertyResolver.getProperty("hostname");
boolean preferIpAddress = Boolean.parseBoolean(eurekaPropertyResolver.getProperty("preferIpAddress"));
int nonSecurePort = Integer.valueOf(propertyResolver.getProperty("server.port", propertyResolver.getProperty("port", "8080")));
int managementPort = Integer.valueOf(propertyResolver.getProperty("management.port", String.valueOf(nonSecurePort)));
String managementContextPath = propertyResolver.getProperty("management.contextPath", propertyResolver.getProperty("server.contextPath", "/"));
EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
instance.setNonSecurePort(nonSecurePort);
instance.setInstanceId(getDefaultInstanceId(propertyResolver));
instance.setPreferIpAddress(preferIpAddress);
if (managementPort != nonSecurePort && managementPort != 0) {
if (StringUtils.hasText(hostname)) {
instance.setHostname(hostname);
}
String statusPageUrlPath = eurekaPropertyResolver.getProperty("statusPageUrlPath");
String healthCheckUrlPath = eurekaPropertyResolver.getProperty("healthCheckUrlPath");
if (!managementContextPath.endsWith("/")) {
managementContextPath = managementContextPath + "/";
}
if (StringUtils.hasText(statusPageUrlPath)) {
instance.setStatusPageUrlPath(statusPageUrlPath);
}
if (StringUtils.hasText(healthCheckUrlPath)) {
instance.setHealthCheckUrlPath(healthCheckUrlPath);
}
String scheme = instance.getSecurePortEnabled() ? "https" : "http";
URL base = new URL(scheme, instance.getHostname(), managementPort, managementContextPath);
instance.setStatusPageUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getStatusPageUrlPath(), '/')).toString());
instance.setHealthCheckUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getHealthCheckUrlPath(), '/')).toString());
}
return instance;
}
/**
* 3、註冊客戶端自動註冊相關元件
* EurekaRegistration: Eureka例項的服務註冊資訊(在開啟客戶端自動註冊時才會註冊)
* EurekaServiceRegistry: Eureka服務註冊器
* EurekaAutoServiceRegistration: Eureka服務自動註冊器,
* 通過EurekaServiceRegistry註冊器註冊EurekaRegistration資訊
*/
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
return new EurekaServiceRegistry();
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager) {
return EurekaRegistration.builder(instanceConfig)
.with(applicationInfoManager)
.with(eurekaClient)
.with(healthCheckHandler)
.build();
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
return new EurekaAutoServiceRegistration(context, registry, registration);
}
/**
* 4、註冊netflix的 EurekaClient 和 ApplicationInfoManager
*/
// 如果禁用客戶端自動註冊,在此方法debug打斷點會觸發服務註冊,狀態為STARTING
@Bean
public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
return new EurekaDiscoveryClient(config, client);
}
// 普通的EurekaClient配置(不可重新整理)
@Configuration
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {
@Autowired
private ApplicationContext context;
@Autowired(required = false)
private DiscoveryClientOptionalArgs optionalArgs;
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
}
// 可重新整理的EurekaClient配置類
@Configuration
@ConditionalOnRefreshScope //滿足@ConditionalOnClass(RefreshScope.class)
// @ConditionalOnBean(RefreshAutoConfiguration.class)
protected static class RefreshableEurekaClientConfiguration {
@Autowired
private ApplicationContext context;
@Autowired(required = false)
private DiscoveryClientOptionalArgs optionalArgs;
// 註冊CloudEurekaClient,是com.netflix.discovery.EurekaClient介面的實現類
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
manager.getInfo(); // force initialization
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
// 註冊ApplicationInfoManager
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
}
/**
* 5、註冊 EurekaHealthIndicator
*/
@Configuration
@ConditionalOnClass(Endpoint.class)
protected static class EurekaHealthIndicatorConfiguration {
@Bean
@ConditionalOnMissingBean
public EurekaHealthIndicator eurekaHealthIndicator(EurekaClient eurekaClient,
EurekaInstanceConfig instanceConfig, EurekaClientConfig clientConfig) {
return new EurekaHealthIndicator(eurekaClient, instanceConfig, clientConfig);
}
}
}
如上,在滿足一系列Conditional條件後,會向Spring容器中註冊CloudEurekaClient
,它是com.netflix.discovery.EurekaClient介面的實現類,具體繼承實現關係如下
DiscoveryClient繼承實現關係
如上圖所示,剛剛建立的CloudEurekaClient
是 com.netflix.discovery.DiscoveryClient
的子類,它們都實現了com.netflix.discovery.EurekaClient
介面
EurekaClient
是Netflix對服務發現客戶端抽象的介面,包含很多方法,而DiscoveryClient
是其預設實現,也是本文分析的重點,CloudEurekaClient
是spring cloud的實現,根據類上註釋,其主要重寫了onCacheRefreshed()方法
,這個方法主要是從Eureka Server fetchRegistry()
獲取服務列表之後用於以廣播方式通知快取重新整理事件的,其實DiscoveryClient
也有onCacheRefreshed()方法
的實現,但由於DiscoveryClient
是Netflix的類,只發送了com.netflix.discovery.EurekaEvent,而CloudEurekaClient
使用Spring的ApplicationEventPublisher
,傳送了HeartbeatEvent
注意:
上面說的都是netflix的DiscoveryClient
還有另一個DiscoveryClient,是
org.springframework.cloud.client.discovery.DiscoveryClient
是Spring對服務發現客戶端的抽象
建立DiscoveryClient的過程
DiscoveryClient構造方法
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config,
AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
/**
* AbstractDiscoveryClientOptionalArgs 是DiscoveryClient的可選引數,可理解為擴充套件點
* 包含healthCheckHandlerProvider、healthCheckCallbackProvider、eventListeners等
* spring cloud預設實現為MutableDiscoveryClientOptionalArgs,但此處相關成員變數賦值後認為空
*/
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
// 如果 shouldFetchRegistry=true,註冊netflix servo監控
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 如果 shouldRegisterWithEureka=true,註冊netflix servo監控
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
// 如果既不要向eureka server註冊,又不要獲取服務列表,就什麼都不用初始化
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
// 【重點】建立各種Executor 和 eurekaTransport、instanceRegionChecker
try {
// 執行定時任務的定時器,定時執行緒名為 DiscoveryClient-%d
// 在定時器中用於定時執行TimedSupervisorTask監督任務,監督任務會強制超時 和 記錄監控資料
scheduler = Executors.newScheduledThreadPool(3,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 執行heartbeat心跳任務的執行器,預設最大執行緒數=2,執行緒名為:DiscoveryClient-HeartbeatExecutor-%d
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 執行服務列表快取重新整理的執行器,預設最大執行緒數=2,執行緒名為:DiscoveryClient-CacheRefreshExecutor-%d
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
// 初始化eurekaTransport在服務註冊,獲取服務列表時的client
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// 如果需要從eureka server獲取服務列表,並且嘗試fetchRegistry(false)失敗,呼叫BackupRegistry
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// 【重點】初始化所有定時任務
initScheduledTasks();
// 新增servo監控
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
上面的DiscoveryClient構造方法程式碼比較多,但多數都是一些賦值,本次分析的重點在註釋中已經標出,建立了各種Executor 和 eurekaTransport、instanceRegionChecker,之後又呼叫initScheduledTasks()方法
初始化所有這些定時任務
【重點】initScheduledTasks() 初始化定時任務
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
// 1、如果要從Eureka Server獲取服務列表
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// 從eureka伺服器獲取登錄檔資訊的頻率(預設30s)
// 同時也是單次獲取服務列表的超時時間
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
// 如果快取重新整理超時,下一次執行的delay最大是registryFetchIntervalSeconds的幾倍(預設10),預設每次執行是上一次的2倍
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
/**
* 【#### 執行CacheRefreshThread,服務列表快取重新整理任務 ####】
* 執行TimedSupervisorTask監督任務的定時器,具體執行器為cacheRefreshExecutor,任務為CacheRefreshThread
*/
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh", //監控名
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds, //指定具體任務的超時時間
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 2、如果要註冊到Eureka Server
if (clientConfig.shouldRegisterWithEureka()) {
// 續租的時間間隔(預設30s)
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
// 如果心跳任務超時,下一次執行的delay最大是renewalIntervalInSecs的幾倍(預設10),預設每次執行是上一次的2倍
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
/**
* 【#### 執行HeartbeatThread,傳送心跳資料 ####】
* 執行TimedSupervisorTask監督任務的定時器,具體執行器為heartbeatExecutor,任務為HeartbeatThread
*/
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
/**
* 【#### InstanceInfo複製器 ####】
* 啟動後臺定時任務scheduler,執行緒名為 DiscoveryClient-InstanceInfoReplicator-%d
* 預設每30s執行一次定時任務,檢視Instance資訊(DataCenterInfo、LeaseInfo、InstanceStatus)是否有變化
* 如果有變化,執行 discoveryClient.register()
*/
instanceInfoReplicator = new InstanceInfoReplicator(
this, //當前DiscoveryClient
instanceInfo, //當前例項資訊
clientConfig.getInstanceInfoReplicationIntervalSeconds(),//InstanceInfo的複製間隔(預設30s)
2); // burstSize
/**
* 【StatusChangeListener 狀態改變監聽器】
*/
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
//使用InstanceInfo複製器 scheduler.submit()一個Runnable任務
//後臺馬上執行 discoveryClient.register()
instanceInfoReplicator.onDemandUpdate();
}
};
/**
* 是否關注Instance狀態變化,使用後臺執行緒將狀態同步到eureka server(預設true)
* 呼叫 ApplicationInfoManager#setInstanceStatus(status) 會觸發
* 將 StatusChangeListener 註冊到 ApplicationInfoManager
*/
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 啟動InstanceInfo複製器
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}
// 當前服務例項不註冊到Eureka Server
else {
logger.info("Not registering with Eureka server per configuration");
}
}
總的來說initScheduledTasks()
做了以下幾件事:
- 如果shouldFetchRegistry=true,即要從Eureka Server獲取服務列表
- 啟動重新整理服務列表定時執行緒(DiscoveryClient-CacheRefreshExecutor-%d),預設registryFetchIntervalSeconds=30s執行一次,任務為
CacheRefreshThread
,即從Eureka Server獲取服務列表,也重新整理客戶端快取
- 啟動重新整理服務列表定時執行緒(DiscoveryClient-CacheRefreshExecutor-%d),預設registryFetchIntervalSeconds=30s執行一次,任務為
- 如果shouldRegisterWithEureka=true,即要註冊到Eureka Server
- 啟動heartbeat心跳定時執行緒(DiscoveryClient-HeartbeatExecutor-%d),預設renewalIntervalInSecs=30s續約一次,任務為
HeartbeatThread
,即客戶端向Eureka Server傳送心跳 - 啟動InstanceInfo複製器定時執行緒(DiscoveryClient-InstanceInfoReplicator-%d),開啟定時執行緒檢查當前Instance的DataCenterInfo、LeaseInfo、InstanceStatus,如果發現變更就執行
discoveryClient.register()
,將例項資訊同步到Server端
- 啟動heartbeat心跳定時執行緒(DiscoveryClient-HeartbeatExecutor-%d),預設renewalIntervalInSecs=30s續約一次,任務為
Eureka Client複製InstanceInfo,發起註冊
由建立DiscoveryClient的過程可知,建立了很多定時執行執行緒,如定時從Server端重新整理服務列表的CacheRefreshThread,定時報心跳續約的HeartbeatThread,還有用於更新並複製本地例項狀態到Server端的InstanceInfo複製器定時執行緒,而正是InstanceInfoReplicator#run()
中的discoveryClient.register()
發起了註冊
那麼怎麼可以觸發註冊行為呢?
// InstanceInfoReplicator#run()
public void run() {
try {
/**
* 重新整理 InstanceInfo
* 1、重新整理 DataCenterInfo
* 2、重新整理 LeaseInfo 租約資訊
* 3、根據HealthCheckHandler獲取InstanceStatus,並更新,如果狀態發生變化會觸發所有StatusChangeListener
*/
discoveryClient.refreshInstanceInfo();
// 如果isInstanceInfoDirty=true,返回dirtyTimestamp,否則是null
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register(); //發起註冊
instanceInfo.unsetIsDirty(dirtyTimestamp); //isInstanceInfoDirty置為false
}
}
catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
}
finally { // 繼續下次任務
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
如上,先重新整理InstanceInfo,重新整理後如果發現有髒資料,即例項發生了變更,還未同步給Server的資料,就發起註冊
那麼在Eureka Client啟動的這種場景下,怎樣會觸發有髒資料下的註冊?
- 由InstanceInfoReplicator複製器的自動定時任務在重新整理InstanceInfo時發現有髒資料,並更新
- InstanceInfoReplicator複製器提供
onDemandUpdate()
按需更新方法,一旦呼叫,馬上會submit()任務,其中會cancel自動更新任務,馬上執行InstanceInfoReplicator#run()
InstanceInfoReplicator複製器自動定時更新
InstanceInfoReplicator複製器在啟動建立DiscoveryClient時被建立並start()啟動
// InstanceInfoReplicator#start()
public void start(int initialDelayMs) { // 預設40s
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register 初始化時會將instanceInfo設定為dirty
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
所以當自動更新啟動時會設定InstanceInfo為髒資料,因為要觸發第一次向Server同步,那麼在40s後會呼叫InstanceInfoReplicator#run()
,假設InstanceInfo並沒有其它變更,那麼也會發起discoveryClient.register()
注意:
正常情況下是不會由延遲40s的第一次執行定時任務發起註冊,而是下面的onDemandUpdate() 主動按需更新發起註冊
如果設定@EnableDiscoveryClient(autoRegister = false) 或者 spring.cloud.service-registry.auto-registration.enabled=false,即放棄自動註冊,並在EurekaClientAutoConfiguration的如下方法打斷點
@Bean public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) { return new EurekaDiscoveryClient(config, client); }
會在斷點生效時觸發EurekaClient的例項化,而此EurekaClient就是一個DiscoveryClient,會啟動InstanceInfoReplicator自動定時更新執行緒,但由於new InstanceInfoFactory().create(config)時本地例項狀態為STARTING,所以註冊到Server端的狀態也是STARTING
onDemandUpdate() 主動按需更新
目前只有在ApplicationInfoManager#setInstanceStatus()更新例項狀態,且例項狀態真的發生變更,觸發StatusChangeListener狀態變更監聽器時,會呼叫onDemandUpdate
馬上submit任務執行InstanceInfoReplicator#run()
,再發起註冊
由於Spring Cloud預設是啟用服務自動註冊AutoServiceRegistration的,所以在EurekaClientAutoConfiguration
自動配置時會註冊服務自動註冊相關元件(EurekaRegistration、EurekaServiceRegistry、EurekaAutoServiceRegistration),其中EurekaAutoServiceRegistration
實現了Spring的SmartLifecycle介面,會在Spring容器refresh要完畢時觸發生命週期方法start(),其中會使用EurekaServiceRegistry
服務註冊器註冊EurekaRegistration
這個本地例項資訊
// EurekaServiceRegistry#register()
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getInstanceConfig().getAppname()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
// 設定初始化狀態
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
if (reg.getHealthCheckHandler() != null) {
reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
}
}
主要是設定初始化狀態步驟,而EurekaInstanceConfigBean
本地例項資訊的initialStatus初始化狀態為 InstanceStatus.UP,所以狀態與new InstanceInfo()時的STARTING不同,發生了狀態變更,觸發在建立DiscoveryClient時設定的StatusChangeListener
...省略
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
...省略
其中會呼叫 InstanceInfoReplicator.onDemandUpdate() 例項資訊複製器做按需更新,馬上將UP狀態更新/註冊到Server端
所以,以我判斷,Eureka Client啟動時的自動註冊大多數應該是Spring Cloud的服務自動註冊機制,在Spring容器基本啟動完畢時,觸發服務自動註冊操作,其中會使用ApplicationInfoManager更新例項狀態為初始狀態UP,一旦例項狀態變更會被馬上監聽到,執行復制器的InstanceInfoReplicator.onDemandUpdate()按需更新,馬上執行一次discoveryClient.register()操作
所以,下面就是分析 discoveryClient.register() 是怎麼註冊服務的
DiscoveryClient#register() 註冊
// DiscoveryClient#register()
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
如上,註冊方法使用eurekaTransport
的註冊客戶端registrationClient
呼叫了register(instanceinfo)方法
EurekaTransport
是DiscoveryClient
的內部類,其中包含
- registrationClient 和 registrationClientFactory: 負責註冊、續約相關工作的
EurekaHttpClient
和EurekaHttpClientFactory
的實現類 - queryClient 和 queryClientFactory: 負責獲取Server端服務列表的
EurekaHttpClient
和EurekaHttpClientFactory
的實現類 - TransportClientFactory: 負責傳輸訊息的客戶端工廠(底層用於和Server互動的http框架是 Jersey,此處的工廠就和Jersey相關)
那麼EurekaTransport
的相關元件,尤其是registrationClient
註冊客戶端是如何初始化的呢?
registrationClient - 服務註冊相關的EurekaHttpClient
初始化是在DiscoveryClient
的構造方法中
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
主要是scheduleServerEndpointTask()
方法
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
AbstractDiscoveryClientOptionalArgs args) {
Collection<?> additionalFilters = args == null
? Collections.emptyList()
: args.additionalFilters;
EurekaJerseyClient providedJerseyClient = args == null
? null
: args.eurekaJerseyClient;
TransportClientFactories argsTransportClientFactories = null;
if (args != null && args.getTransportClientFactories() != null) {
argsTransportClientFactories = args.getTransportClientFactories();
}
// Ignore the raw types warnings since the client filter interface changed between jersey 1/2
@SuppressWarnings("rawtypes")
TransportClientFactories transportClientFactories = argsTransportClientFactories == null
? new Jersey1TransportClientFactories()
: argsTransportClientFactories;
// If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
// 1、引數中是否提供了transportClientFactory的實現,沒有就使用Jersey1TransportClientFactories
eurekaTransport.transportClientFactory = providedJerseyClient == null
? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo())
: transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);
ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
@Override
public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
long delay = getLastSuccessfulRegistryFetchTimePeriod();
if (delay > thresholdInMs) {
logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
thresholdInMs, delay);
return null;
} else {
return localRegionApps.get();
}
}
};
eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
clientConfig,
transportConfig,
eurekaTransport.transportClientFactory,
applicationInfoManager.getInfo(),
applicationsSource
);
/**
* 是否要想Eureka Server註冊
* 2、建立RegistrationClient用於註冊的客戶端及工廠,並設定到eurekaTransport
*/
if (clientConfig.shouldRegisterWithEureka()) {
EurekaHttpClientFactory newRegistrationClientFactory = null;
EurekaHttpClient newRegistrationClient = null;
try {
newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
transportConfig
);
newRegistrationClient = newRegistrationClientFactory.newClient();
} catch (Exception e) {
logger.warn("Transport initialization failure", e);
}
eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
eurekaTransport.registrationClient = newRegistrationClient;
}
//
/**
* 是否要從Server端獲取服務列表
* 3、建立QueryClient用於查詢服務列表的客戶端及工廠,並設定到eurekaTransport
*/
// new method (resolve from primary servers for read)
// Configure new transport layer (candidate for injecting in the future)
if (clientConfig.shouldFetchRegistry()) {
EurekaHttpClientFactory newQueryClientFactory = null;
EurekaHttpClient newQueryClient = null;
try {
newQueryClientFactory = EurekaHttpClients.queryClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
clientConfig,
transportConfig,
applicationInfoManager.getInfo(),
applicationsSource
);
newQueryClient = newQueryClientFactory.newClient();
} catch (Exception e) {
logger.warn("Transport initialization failure", e);
}
eurekaTransport.queryClientFactory = newQueryClientFactory;
eurekaTransport.queryClient = newQueryClient;
}
}
所以,下面就是逐層深入分析RegistrationClient用於註冊的客戶端及工廠是如何建立的?
由於RegistrationClient其實是一種EurekaHttpClient
,而EurekaHttpClient
是介面,其實現類很多
檢視原始碼發現,Netflix採用的是 Factory工廠 + 代理 的模式,從最外層建立的EurekaHttpClient工廠包含一個成員變數是另一個代理的EurekaHttpClient工廠,每個工廠生成的EurekaHttpClient功能不一樣,在從外層執行一個操作時,最外層的工廠執行其相關功能後,使用代理的工廠新建EurekaHttpClient例項,再呼叫其相同的方法,也實現這個EurekaHttpClient的相關功能,就這樣逐層深入,各司其職後,最後使用Jersey傳送POST請求到Eureka Server發起註冊,而這些EurekaHttpClient都是在com.netflix.discovery.shared.transport.decorator
EurekaHttpClient的包裝類的包下的,由外到內大致是:
- SessionedEurekaHttpClient: 強制在一定時間間隔後重連EurekaHttpClient,防止永遠只連線特定Eureka Server,反過來保證了在Server端叢集拓撲發生變化時的負載重分配
- RetryableEurekaHttpClient: 帶有重試功能,預設最多3次,在配置的所有候選Server地址中嘗試請求,成功重用,失敗會重試另一Server,並維護隔離清單,下次跳過,當隔離數量達到閾值,清空隔離清單,重新開始
- RedirectingEurekaHttpClient: Server端返回302重定向時,客戶端shutdown原EurekaHttpClient,根據response header中的Location新建EurekaHttpClient
- MetricsCollectingEurekaHttpClient: 統計收集Metrics資訊
- JerseyApplicationClient: AbstractJerseyEurekaHttpClient的子類
- AbstractJerseyEurekaHttpClient: 底層實現通過Jersery註冊、發心跳等的核心類
- jerseyClient: Jersery客戶端
- AbstractJerseyEurekaHttpClient: 底層實現通過Jersery註冊、發心跳等的核心類
- JerseyApplicationClient: AbstractJerseyEurekaHttpClient的子類
- MetricsCollectingEurekaHttpClient: 統計收集Metrics資訊
- RedirectingEurekaHttpClient: Server端返回302重定向時,客戶端shutdown原EurekaHttpClient,根據response header中的Location新建EurekaHttpClient
- RetryableEurekaHttpClient: 帶有重試功能,預設最多3次,在配置的所有候選Server地址中嘗試請求,成功重用,失敗會重試另一Server,並維護隔離清單,下次跳過,當隔離數量達到閾值,清空隔離清單,重新開始
SessionedEurekaHttpClient - 定時重連
// SessionedEurekaHttpClient#execute()
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
long now = System.currentTimeMillis();
long delay = now - lastReconnectTimeStamp;
// 如果上次重連時間到現在已經超過了currentSessionDurationMs,關閉當前EurekaHttpClient
if (delay >= currentSessionDurationMs) {
logger.debug("Ending a session and starting anew");
lastReconnectTimeStamp = now;
currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
}
// 如果EurekaHttpClient為空,clientFactory.newClient()重建
EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
if (eurekaHttpClient == null) {
eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
}
// 繼續執行後續
return requestExecutor.execute(eurekaHttpClient);
}
RetryableEurekaHttpClient - 候選範圍內失敗重試
// RetryableEurekaHttpClient#execute()
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
// 最多重試numberOfRetries(預設:3)
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();//從AtomicReference<EurekaHttpClient>獲取當前EurekaHttpClient
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
candidateHosts = getHostCandidates(); //返回候選集合 排除 已經失敗隔離的Host集合
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
// 根據當前的下標獲取Endpoint,並新建 JerseyClient
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
// 繼續後續執行
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
// 如果根據當前操作型別 和 返回狀態碼,滿足狀態計算器,記錄currentHttpClient可用,下次繼續使用
// 返回狀態碼是:200、300、302,或者Register、SendHeartBeat情況下是404
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
}
catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}
// Connection error or 5xx from the server that must be retried on another server
// 請求失敗 或 報5xx錯誤,將delegate清空,重試另一個Server,並將當前Endpoint放到隔離集合
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
// 多次重試後仍無法成功返回結果,上拋異常
throw new TransportException("Retry limit reached; giving up on completing the request");
}
//########## RetryableEurekaHttpClient#getHostCandidates()
// 返回 所有候選的Host節點資料 與 隔離集合 的資料差集
private List<EurekaEndpoint> getHostCandidates() {
List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints(); //所有候選節點資料
quarantineSet.retainAll(candidateHosts); //確保quarantineSet隔離集合中的資料都在candidateHosts中
//當candidateHosts發生變化時也能及時清理quarantineSet隔離集合
// If enough hosts are bad, we have no choice but start over again
// 預設:0.66百分比
int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
// 隔離集合為空
if (quarantineSet.isEmpty()) {
// no-op
}
// 隔離資料已經大於閥值,不得已要重新開始,清空隔離集合
else if (quarantineSet.size() >= threshold) {
logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
quarantineSet.clear();
}
// 隔離集合不為空,也不大於閥值,排除隔離集合中的Endpoint後返回
else {
List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
for (EurekaEndpoint endpoint : candidateHosts) {
if (!quarantineSet.contains(endpoint)) {
remainingHosts.add(endpoint);
}
}
candidateHosts = remainingHosts;
}
return candidateHosts;
}
//########## ServerStatusEvaluators#LEGACY_EVALUATOR
// Eureka Server返回狀態的計算器,計算不同場景下的不同狀態碼是否代表成功
private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() {
@Override
public boolean accept(int statusCode, RequestType requestType) {
if (statusCode >= 200 && statusCode < 300 || statusCode == 302) {
return true;
} else if (requestType == RequestType.Register && statusCode == 404) {
return true;
} else if (requestType == RequestType.SendHeartBeat && statusCode == 404) {
return true;
} else if (requestType == RequestType.Cancel) { // cancel is best effort
return true;
} else if (requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404)) {
return true;
}
return false;
}
};
RedirectingEurekaHttpClient - 按Server端要求重定向到新Server
//########## RedirectingEurekaHttpClient#executeOnNewServer
// Server端返回302重定向時,客戶端shutdown原EurekaHttpClient,根據response header中的Location新建EurekaHttpClient
private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor,
AtomicReference<EurekaHttpClient> currentHttpClientRef) {
URI targetUrl = null;
// 最多重定向預設10次
for (int followRedirectCount = 0; followRedirectCount < MAX_FOLLOWED_REDIRECTS; followRedirectCount++) {
EurekaHttpResponse<R> httpResponse = requestExecutor.execute(currentHttpClientRef.get());
// 如果返回的不是302重定向,返回response
if (httpResponse.getStatusCode() != 302) {
if (followRedirectCount == 0) {
logger.debug("Pinning to endpoint {}", targetUrl);
} else {
logger.info("Pinning to endpoint {}, after {} redirect(s)", targetUrl, followRedirectCount);
}
return httpResponse;
}
// 從response中獲取Location,用於重建EurekaHttpClient
targetUrl = getRedirectBaseUri(httpResponse.getLocation());
if (targetUrl == null) {
throw new TransportException("Invalid redirect URL " + httpResponse.getLocation());
}
currentHttpClientRef.getAndSet(null).shutdown();
currentHttpClientRef.set(factory.newClient(new DefaultEndpoint(targetUrl.toString())));
}
String message = "Follow redirect limit crossed for URI " + serviceEndpoint.getServiceUrl();
logger.warn(message);
throw new TransportException(message);
}
MetricsCollectingEurekaHttpClient - 統計收集執行情況
// MetricsCollectingEurekaHttpClient#execute()
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
Stopwatch stopwatch = requestMetrics.latencyTimer.start(); //統計執行延時
try {
EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment(); //按狀態統計
return httpResponse;
} catch (Exception e) {
requestMetrics.connectionErrors.increment(); //統計錯誤
exceptionsMetric.count(e); //按異常名統計
throw e;
} finally {
stopwatch.stop();
}
}
AbstractJerseyEurekaHttpClient - 底層通過Jersery傳送註冊、心跳請求
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
protected final Client jerseyClient; //真正處理請求的Jersery客戶端
protected final String serviceUrl; //連線的Server端地址
protected AbstractJerseyEurekaHttpClient(Client jerseyClient, String serviceUrl) {
this.jerseyClient = jerseyClient;
this.serviceUrl = serviceUrl;
logger.debug("Created client for url: {}", serviceUrl);
}
/**
* 註冊方法
*/
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName(); //請求Eureka Server的【/apps/應用名】介面地址
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info); //例項InstanceInfo資料,通過Post請求body發過去
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
}
finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
...省略
Eureka Server接收到的註冊請求詳情
經過上面的步驟,客戶端已經可以通過Jersery傳送Http請求給Eureka Server端註冊,具體請求如下:
POST /eureka/apps/應用名 HTTP/1.1
Accept-Encoding: gzip
Content-Type: application/json
Accept: application/json
DiscoveryIdentity-Name: DefaultClient
DiscoveryIdentity-Version: 1.4
DiscoveryIdentity-Id: 192.168.70.132
Transfer-Encoding: chunked
Host: localhost:8001
Connection: Keep-Alive
User-Agent: Java-EurekaClient/v1.6.2
1a0
{"instance":{
"instanceId":"192.168.70.132:應用名:10001",
"hostName":"192.168.70.132",
"app":"應用名",
"ipAddr":"192.168.70.132",
"status":"UP",
"overriddenstatus":"UNKNOWN",
"port": { "\(":10001, "@enabled" : "true" }, "securePort": { "\)":443, "@enabled" : "false"},
"countryId":1,
"dataCenterInfo":{"@class":"com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
"name":"MyOwn"
}
Eureka Client註冊流程總結
大體來說,Eureka Client的註冊是由Spring Cloud的
AutoServiceRegistration自動註冊
發起,在設定應用例項Instance初始狀態為UP時,觸發了InstanceInfoReplicator#onDemandUpdate()按需更新
方法,將例項Instance資訊通過DiscoveryClient
註冊到Eureka Server,期間經過了一些EurekaHttpClient的裝飾類,實現了諸如定期重連、失敗重試、註冊重定向、統計收集Metrics資訊等功能,最後由JerseryClient
傳送POST請求呼叫Eureka Server的【/eureka/apps/應用名】端點,請求體攜帶InstanceInfo例項資訊,完成註冊
EurekaAutoServiceRegistration#start(): 實現Spring的SmartLifecycle,在Spring容器refresh()最後一步finishRefresh()會呼叫生命週期的start()方法
EurekaServiceRegistry#register(EurekaRegistration): 使用服務註冊器註冊服務資訊
ApplicationInfoManager#setInstanceStatus(初始狀態): 應用例項資訊管理器更新初始狀態為 UP
StatusChangeListener: 觸發例項狀態監聽(此Listener是在DiscoveryClient#initScheduledTasks()方法中設定的)
InstanceInfoReplicator.onDemandUpdate(): 例項狀態複製器執行按需狀態更新
DiscoveryClient#register(): DiscoveryClient發起註冊例項資訊
EurekaHttpClientDecorator#execute(): 執行EurekaHttpClient的裝飾類,實現其各自功能
SessionedEurekaHttpClient: 定時重連
RetryableEurekaHttpClient: 候選範圍內失敗重試
RedirectingEurekaHttpClient: 按Eureka Server端要求重定向到新Server
MetricsCollectingEurekaHttpClient: 統計收集執行情況
- JerseyApplicationClient#register(): 封裝註冊請求資料
- JerseyClient傳送Post註冊請求
- JerseyApplicationClient#register(): 封裝註冊請求資料