1. 程式人生 > >Spring Cloud-Eureka Client 原理解析

Spring Cloud-Eureka Client 原理解析

前面一些 demo 中已經介紹瞭如何使用 SOFABoot 來整合 Spring Cloud Netflix Eureka 元件。本篇將來先解析下 Eureka Client 的工作原理。

Netflix 和 SpringCloud

spring-cloud-commons 模組是 spring 在分散式領域上(服務發現,服務註冊,斷路器,負載均衡)的規範定義。spring-cloud-netflix 是基於此規範的具體實現,Netflix OSS 裡的各種元件也都實現了這個 commons 規範。關係如下:

image.png

Spring Cloud Netflix Eureka 服務發現實現原理

基於上圖,這裡以 Eureka 中的服務發現為例,來具體講下是如何實現的。Spring Cloud common 中提供了用於服務發現的兩個關鍵類:DiscoveryClient 介面 和 EnableDiscoveryClient 註解。

DiscoveryClient 介面

下面這張圖描述的是在服務發現這個功能上,SpringCloud 是如何與 Netflix 整合的。 在 spring-cloud-netflix-eureka-client 中對 Spring Cloud Common 中的 DiscoveryClient 介面進行了實現,實現類是 EurekaDiscoveryClient 。

image.png

DiscoveryClient 的介面定義與方法:

/**
 * DiscoveryClient表示服務發現常用的讀取操作,例如Netflix Eureka或consul.io
 * @author Spencer Gibb
 */
public interface DiscoveryClient {

	/**
	 * 實現描述
	 * @return the description
	 */
	String description();

	/**
	 * 獲取與特定serviceId關聯的所有ServiceInstances
	 * @param serviceId the serviceId to query
	 * @return a List of ServiceInstance
	 */
	List<ServiceInstance> getInstances(String serviceId);

	/**
	 * 返回所有已知的服務ID
	 */
	List<String> getServices();
}
複製程式碼

EurekaDiscoveryClient 中實現了這幾個方法,但是 EurekaDiscoveryClient 自身沒有實現如何與服務端互動的邏輯,而是通過 com.netflix.DiscoveryClient 類來完成。所以 spring-cloud-netflix-eureka-client 乾的事情就是實現了 Spring Cloud Common 規範,然後在實現上包裝了 netflix 。

@EnableDiscoveryClient 註解


```java @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient { //是否自動註冊,預設是true。 boolean autoRegister() default true; } ```

EnableDiscoveryClientImportSelector 將會從 META-INF/spring.factories 裡找出 key 為org.springframework.cloud.client.discovery.EnableDiscoveryClient 的類。

對於 autoRegister :

  • 如果自動註冊屬性為true,會在找出的這些類裡再加上一個類:AutoServiceRegistrationConfiguration, AutoServiceRegistrationConfiguration 內部會使用@EnableConfigurationProperties(AutoServiceRegistrationProperties.class) 觸發構造AutoServiceRegistrationProperties 這個 bean。像eureka,nacos,它們的自動化配置類裡都使用了@ConditionalOnBean(AutoServiceRegistrationProperties.class) 來確保存在AutoServiceRegistrationProperties 這個 bean 存在的時候才會構造 AutoServiceRegistration 進行註冊。
  • 如果自動註冊屬性為 false,在Environment 里加一個 PropertySource,內部的配置項是spring.cloud.service-registry.auto-registration.enabled,值是false(代表不構造AutoServiceRegistrationProperties.class)。這樣 eureka 就不會註冊。

對應上面這段邏輯的程式碼如下:

image.png

spring-cloud-netflix-eureka-client 自己也提供了一個註解 EnableEurekaClient,其作用於這個註解一樣

Eureka 架構圖

image.png

  • consumer  : 服務消費方,eureka client 角色,可以從 eureka server 上拉取到其他已註冊服務的資訊,從而根據這些資訊找到自己所需的服務,然後發起遠端呼叫。
  • provider : 服務提供方,eureka client 角色,可以向 eureka server 上註冊和更新自己的資訊,當然作為 eureka client ,它也可以從server 上獲取到其他服務的資訊。
  • Eureka server : 服務註冊中心,提供服務註冊和服務發現功能;
  • 同步複製 : eureka server 之間進行註冊服務資訊的同步,這樣可以保證叢集中每個server 都能提供完整的服務資訊。

關於 AWS 上 Regin 和 Availability Zone 的概念,請自行查閱相關資料

原始碼解析

配置資訊讀取

Eureka Client的自動配置類是 org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration ,這裡面主要就負責了一些配置資訊的服務諸如 DiscoveryClient 、EurekaServiceRegistry等主要bean的初始化工作。

另外還有一個 EurekaDiscoveryClientConfiguration 類,負責配置自動註冊和應用的健康檢查器初始化。

讀取 eureka.client.*

@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
	EurekaClientConfigBean client = new EurekaClientConfigBean();
  if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
    // 預設情況下,我們不會在引導過程中註冊,但是以後會有另一個機會。
    client.setRegisterWithEureka(false);
  }
  return client;
}
複製程式碼

EurekaClientConfigBean 封裝的是 eureka client 和 eureka server 互動所需要的配置資訊,比如前面demo工程中的 eureka.client.service-url.defaultZone 的配置。

讀取 eureka.instance.*

@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
	ManagementMetadataProvider managementMetadataProvider) {
  // 程式碼較長,此處省略
}
複製程式碼

EurekaInstanceConfigBean 封裝的是 eureka client 自身例項的配置資訊,提供服務註冊的基本元資料資訊。

核心元件 bean 初始化

這裡也例項化了一些核心的元件bean。

ApplicationInfoManager

  • EurekaClientConfiguration#eurekaApplicationInfoManager
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
  InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
  return new ApplicationInfoManager(config, instanceInfo);
}
複製程式碼
  • RefreshableEurekaClientConfiguration#eurekaApplicationInfoManager
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
  InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
  return new ApplicationInfoManager(config, instanceInfo);
}
複製程式碼

RefreshScope ,被此註解標註的情況下,將會被動態重新整理。包括屬性資訊等,注意,對於動態重新整理,被RefreshScope標記的類不能是final的。

ApplicationInfoManager 是應用資訊管理器,用於管理服務例項的資訊類 InstanceInfo 和服務例項的配置資訊類 EurekaInstanceConfig 。

DiscoveryClient

@Bean
public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
	return new EurekaDiscoveryClient(config, client);
}
複製程式碼

DiscoveryClient ,前面說到,這個類是Spring Cloud 中用於服務發現使用的客戶端介面。注意這裡是SpringCloud提供的介面,不是netflix中的類。

EurekaServiceRegistry

@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
	return new EurekaServiceRegistry();
}
複製程式碼

EurekaServiceRegistry 是 ServiceRegistry 的實現類。ServiceRegistry 是 SpringCloud 提供了註冊和登出等方法,這些方法允許使用者提供自定義註冊服務。

EurekaRegistration

@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
	public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, ObjectProvider<HealthCheckHandler> healthCheckHandler) {
		return EurekaRegistration.builder(instanceConfig)
				.with(applicationInfoManager)
				.with(eurekaClient)
				.with(healthCheckHandler)
				.build();
	}
複製程式碼

每個 ServiceRegistry 實現都有自己的 Registry 實現。

  • ZookeeperRegistration -> ZookeeperServiceRegistry
  • ZookeeperRegistration -> EurekaServiceRegistry
  • ConsulRegistration       -> ConsulServiceRegistry

如果你需要自定義實現 ServiceRegistry ,則也不要提供一個 Registration  的實現。

服務發現

服務發現的基本情況在上面已經提到了,但是由於 SpingCloud 中並沒有提供具體的互動操作而是由 com.netflix.discovery.DiscoveryClient 來完成具體工作。所以關於服務服務發現這裡就直接圍繞這個類來展開。

image.png

LookopService

public interface LookupService<T> {
    // 根據服務例項註冊的appName 來獲取 Application
    Application getApplication(String appName);
    // 返回當前登錄檔中所有的服務例項資訊
    Applications getApplications();
    // 根據服務例項Id獲取服務例項資訊
    List<InstanceInfo> getInstancesById(String id);
    /**
     * 獲取下一個可能的伺服器,以處理來自從eureka接收到的登錄檔資訊的請求。
     * @virtualHostname 與伺服器關聯的虛擬主機名。
     * @secure 指示是HTTP還是HTTPS請求
     *
     */
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}
複製程式碼

LookupService 介面的作用就是用於查詢活動服務例項;總共提供了四個方法,很好理解。每個方法的作用見註釋。

EurekaClient

EurekaClient 也是一個介面,整合並且擴充套件了 LookupService。

This interface does NOT try to clean up the current client interface for eureka 1.x. Rather it tries to provide an easier transition path from eureka 1.x to eureka 2.x. 從這來看,EurekaClient 的存在是為了給 Eureka1.x 向 Eureka 2.x 升級提供容錯能力。

EurekaClient 在 LookupService 基礎上擴充套件了很多方法,如下:

public interface EurekaClient extends LookupService {
  	// 省去@Deprecated方法和獲取服務例項資訊的介面方法
		// 註冊健康檢查處理器
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
		// 監聽client服務資訊的更新
    public void registerEventListener(EurekaEventListener eventListener);
   	// 取消監聽
    public boolean unregisterEventListener(EurekaEventListener eventListener);
 		// 獲取當前健康檢查處理器
    public HealthCheckHandler getHealthCheckHandler();
		// 關閉 eureka 客戶端。還向eureka伺服器傳送撤銷註冊請求。
    public void shutdown();
  	// EurekaClientConfig
    public EurekaClientConfig getEurekaClientConfig();
 		// ApplicationInfoManager
    public ApplicationInfoManager getApplicationInfoManager();
}
複製程式碼

HealthCheckHandler 這個是用於檢查當前客戶端狀態的,這個在後面心跳機制裡面會說道。

DiscoveryClient

com.netflix.discovery.DiscoveryClient,這個類會在建構函式中完成一系列重要的操作,如:拉取登錄檔資訊,服務註冊,初始化心跳機制,快取重新整理,按需註冊定時任務等等。

 DiscoveryClient(ApplicationInfoManager applicationInfoManager, 
 								 EurekaClientConfig config, 
                 AbstractDiscoveryClientOptionalArgs args,
                 Provider<BackupRegistry> backupRegistryProvider) {
 // ... 
 }
複製程式碼

幾個引數的釋義如下:

  • applicationInfoManager :應用資訊管理器
  • config :client 與 server 互動的配置資訊
  • args :客戶端提供的過濾器型別(支援jersey1和jersey2),後面用來構建 EurekaTransport
  • backupRegistryProvider : 備份註冊中心

服務發現

下面程式碼片段也是在 DiscoveryClient 的建構函式裡面的,這裡就是拉取註冊服務資訊的邏輯:

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
	fetchRegistryFromBackup();
}
複製程式碼

clientConfig.shouldFetchRegistry() 這個方法拿到的就是配置檔案中 eureka.client.fetch-registry 的值,預設為true,表示從 eureka server 拉取登錄檔資訊。

fetchRegistry(boolean)是從 eureka server 拉取註冊資訊的方法,引數用於表示是否是強制拉取全量的註冊資訊;此方法除非在協調eureka伺服器和客戶端登錄檔資訊方面存在問題,否則此方法只嘗試在第一次進行全量獲取,後面均是增量獲取。

fetchRegistryFromBackup() 如果 eureka server 服務不可用,則採用的備用方案。

底層通訊實現 EurekaTransport

EurekaTransport 是 DiscoveryClient 的內部類,EurekaTransport 封裝了具體的基於 jersey 的底層通訊實現。

FetchRegistry

image.png
上圖為拉取註冊資訊的整個過程。對於黃色貼條上的條件,如果滿足其中一個,則都會進行全量拉取;否則進行增量拉取。計算 hash 值是為了後面可以與server端應用資訊的進行對比,用於感知是否需要重新進行拉取操作。

服務註冊

服務註冊邏輯也是在 DiscoveryClient 的建構函式中完成,程式碼片段如下:

if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
  try {
    if (!register() ) {
    throw new IllegalStateException("Registration error at startup. Invalid server response.");
  	}
  } catch (Throwable th) {
    logger.error("Registration error at startup: {}", th.getMessage());
    throw new IllegalStateException(th);
  }
}
複製程式碼

向server端註冊需要滿足的兩個條件是:1、允許向server端註冊  2、是否在客戶端初始化期間強制註冊

boolean register() throws Throwable {
  logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
  EurekaHttpResponse<Void> httpResponse;
  try {
  	httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
  } catch (Exception e) {
    logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
    throw e;
  }
  if (logger.isInfoEnabled()) {
  	logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
  }
  return httpResponse.getStatusCode() == 204;
}
複製程式碼

通過 eurekaTransport 物件,基於 REST 呼叫向 eureka server 進行服務註冊。

心跳機制

心跳機制的初始化工作也是在 DiscoveryClient 建構函式中完成。在DiscoveryClient建構函式的最後,有一個初始化排程任務的方法,在這個方法裡就包括心跳的初始化。

heartbeatExecutor 心跳執行緒池:

heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
複製程式碼

scheduler 提交週期執行:

// Heartbeat timer
scheduler.schedule(
                  new TimedSupervisorTask(
                  "heartbeat",
                  scheduler,
                  heartbeatExecutor,
                  renewalIntervalInSecs,
                  TimeUnit.SECONDS,
                  expBackOffBound,
                  new HeartbeatThread()
                  ),
									renewalIntervalInSecs, TimeUnit.SECONDS);
複製程式碼

TimedSupervisorTask 是 eureka 中自動調節間隔的週期性任務類。HeartbeatThread 是具體執行任何的執行緒,run方法中執行的就是 renew() 續期。

boolean renew() {
  EurekaHttpResponse<InstanceInfo> httpResponse;
  try {
    // 通過 eurekaTransport 來與 server 通訊續期
    httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
    logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
    // 404 標識當前服務例項不存在
    if (httpResponse.getStatusCode() == 404) {
      // 記錄心跳次數
      REREGISTER_COUNTER.increment();
      logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
      long timestamp = instanceInfo.setIsDirtyWithTime();
      // 重新註冊
      boolean success = register();
      if (success) {
      	instanceInfo.unsetIsDirty(timestamp);
      }
    	return success;
    }
    // 200 狀態正常
    return httpResponse.getStatusCode() == 200;
  } catch (Throwable e) {
    logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
    return false;
  }
}
複製程式碼

服務下線

關閉 eureka client,還向 eureka server 傳送撤銷註冊請求。該方法在DiscoveryClient#shutdown 方法中。

@PreDestroy
    @Override
    public synchronized void shutdown() {
  			// 保證原子操作
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");
            if (statusChangeListener != null && applicationInfoManager != null) {
              	// 應用管理器取消狀態監聽
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }
						// 清理任務排程執行
            cancelScheduledTasks();
            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
              	//設定服務例項狀態為 DOWN
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
              	//登出註冊
                unregister();
            }
						// 關閉 jersey 客戶端
            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }
            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();
            logger.info("Completed shut down of DiscoveryClient");
        }
    }
複製程式碼