1. 程式人生 > >SpringCloud 原始碼系列(1)—— 註冊中心 Eureka(上)

SpringCloud 原始碼系列(1)—— 註冊中心 Eureka(上)

Eureka 是 Netflix 公司開源的一個服務註冊與發現的元件,和其他 Netflix 公司的服務元件(例如負載均衡、熔斷器、閘道器等)一起,被 Spring Cloud 整合為 Spring Cloud Netflix 模組。不過 Eureka 2.0 開始閉源了,但 1.x 還在繼續維護中,可以繼續使用。這篇文章就來深入學習下 Eureka 註冊中心,便於我們更好的使用和調優註冊中心。

關於版本:本文章使用的 Spring cloud 版本為 Hoxton.SR8,Spring boot 版本為 2.3.3.RELEASE,依賴的 eureka 版本則為 1.9.25。

一、Eureka 初體驗

Eureka 分為 Eureka Server 和 Eureka Client,Eureka Server 為 Eureka 註冊中心,Eureka Client 為 Eureka 客戶端。這節先通過demo把註冊中心的架子搭起來,看看註冊中心的基礎架構。

1、Eureka Server

① 建立註冊中心服務:sunny-register

首先建立一個 maven 工程,服務名稱為 sunny-register,並在 pom.xml 中引入註冊中心服務端的依賴。

1 <dependencies>
2     <dependency>
3         <groupId>org.springframework.cloud</groupId>
4         <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
5     </dependency>
6 </dependencies>

② 新增配置檔案

在 resources 下新增 application.yml 配置檔案,並添加註冊中心相關配置。

 1 server:
 2   port: 8000
 3 spring:
 4   application:
 5     name: sunny-register
 6 
 7 eureka:
 8   instance:
 9     hostname: dev.lyyzoo.com
10   client:
11     # 是否向註冊中心註冊自己
12     register-with-eureka: false
13     # 是否檢索服務
14     fetch-registry: false
15     service-url:
16       defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

③ 新增啟動類

新增啟動類,並在啟動類上加上 @EnableEurekaServer 註解,啟用註冊中心。

 1 package com.lyyzoo.sunny.register;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
 6 
 7 @EnableEurekaServer
 8 @SpringBootApplication
 9 public class RegisterApplication {
10 
11     public static void main(String[] args) {
12         SpringApplication.run(RegisterApplication.class, args);
13     }
14 }

④ 啟動註冊中心

啟動註冊中心後,訪問 http://dev.lyyzoo.com:8000/,就可以看到註冊中心的頁面了,現在還沒有例項註冊上來。(dev.lyyzoo.com 在本地 hosts 檔案中對映到 127.0.0.1)

2、Eureka Client

建立兩個 demo 服務,demo-producer 服務作為生產者提供一個介面,demo-consumer 服務作為消費者去呼叫 demo-producer 的介面。

① 建立客戶端服務:demo-producer

建立maven工程,服務名稱為 demo-producer,在 pom.xml 中引入註冊中心客戶端的依賴,並添加了 web 的依賴。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

② 新增配置檔案

在 resouces 下新增 application.yml 配置檔案,添加註冊中心客戶端相關的配置。

 1 server:
 2   port: 8010
 3 spring:
 4   application:
 5     name: demo-producer
 6 
 7 eureka:
 8   client:
 9     serviceUrl:
10       defaultZone: ${EUREKA_DEFAULT_ZONE:http://dev.lyyzoo.com:8000/eureka}

③ 新增啟動類

新增啟動類,並在啟動類上加上 @EnableEurekaClient 註解,啟用客戶端。

1 @EnableEurekaClient
2 @SpringBootApplication
3 public class ProducerApplication {
4 
5     public static void main(String[] args) {
6         SpringApplication.run(ProducerApplication.class, args);
7     }
8 }

④ 新增一個 rest 介面

新增一個介面用於測試呼叫:

 1 @RestController
 2 public class DemoController {
 3 
 4     private final Logger logger = LoggerFactory.getLogger(getClass());
 5 
 6     @GetMapping("/v1/uuid")
 7     public ResponseEntity<String> getUUID() {
 8         String uuid = UUID.randomUUID().toString();
 9         logger.info("generate uuid: {}", uuid);
10         return ResponseEntity.ok(uuid);
11     }
12 }

⑤  建立客戶端服務:demo-consumer

類似的方式,再建立消費者服務:demo-producer,這個服務中新增一個消費者介面,通過 RestTemplate 負載均衡的方式來呼叫 demo-producer 的介面。

因此需要先配置一個帶有負載均衡的 RestTemplate:

 1 @EnableEurekaClient
 2 @SpringBootApplication
 3 public class ConsumerApplication {
 4 
 5     @Bean
 6     @LoadBalanced
 7     public RestTemplate restTemplate() {
 8         return new RestTemplate();
 9     }
10 
11     public static void main(String[] args) {
12         SpringApplication.run(ConsumerApplication.class, args);
13     }
14 }

新增消費者介面,注意這裡 url 是寫的服務名稱,並不是具體的 ip 地址或埠,在微服務場景下,服務間呼叫也不可能寫死某個具體的地址。

 1 @RestController
 2 public class DemoController {
 3 
 4     private final Logger logger = LoggerFactory.getLogger(getClass());
 5 
 6     @Autowired
 7     private RestTemplate restTemplate;
 8 
 9     @GetMapping("/v1/id")
10     public ResponseEntity<String> getId() {
11         ResponseEntity<String> result = restTemplate.getForEntity("http://demo-producer/v1/uuid", String.class);
12         String uuid = result.getBody();
13         logger.info("request id: {}", uuid);
14         return ResponseEntity.ok(uuid);
15     }
16 }

⑥ 啟動註冊中心客戶端

以兩個不同的埠啟動 demo-producer,可以通過環境變數的方式制定埠。然後再啟動 demo-consumer。

啟動完成之後,就可以在註冊中心看到註冊上來的兩個 demo-producer 例項和一個 demo-consumer 例項,並且狀態都為 UP。

⑦ 測試介面

呼叫消費者服務的介面,多次訪問 http://dev.lyyzoo.com:8020/v1/id 介面,會發現生產者服務 demo-consumer 兩個例項的控制檯會交替的輸出日誌資訊。這就說明消費者客戶端通過服務名稱訪問到生產者了。

3、Eureka 基礎架構

通過前面的體驗,可以發現,服務間呼叫只需知道某個服務的名稱就可以呼叫這個服務的api了,而不需要指定具體的ip地址和埠,那這是怎麼做到的呢?

不難看出,Eureka 的基礎架構包含三種角色:

  • 服務註冊中心:Eureka Server,提供服務註冊和發現的功能
  • 服務提供者:Eureka Client,提供服務(本身也可以作為消費者)
  • 服務消費者:Eureka Client,消費服務(本身也可以作為提供者)

首先需要一個服務註冊中心,客戶端則向註冊中心註冊,將自己的資訊(比如服務名、服務的 IP 地址和埠資訊等)提交給註冊中心。客戶端向註冊中心獲取一份服務註冊列表的資訊,該列表包含了所有向註冊中心註冊的服務資訊。獲取服務註冊列表資訊之後,客戶端服務就可以根據服務名找到服務的所有例項,然後通過負載均衡選擇其中一個例項,根據其 IP 地址和埠資訊,就可以呼叫服務的API介面了。

這就是註冊中心最基礎的架構和功能了,提供服務註冊和發現,為各個客戶端提供服務註冊列表資訊。但為了完成這些工作,Eureka 有很多的機制來實現以及保證其高可用,如服務註冊、服務續約、獲取服務註冊列表、服務下線、服務剔除等等。Eureka 也提供了很多引數讓我們可以根據實際的場景來優化它的一些功能和配置,比如維持心跳的時間、拉取登錄檔的間隔時間、自我保護機制等等。下面我們就從 eureka 的原始碼層面來分析下 eureka 的這些功能以及引數,理解其原理,學習它的一些設計。

二、Eureka 原始碼準備

雖然我們在 pom.xml 中依賴的是 spring-cloud-starter-netflix-eureka-server 和 spring-cloud-starter-netflix-eureka-client,但 spring-cloud-starter-netflix 只是對 eureka 做了封裝,使得其可以通過 springboot 的方式來啟動和初始化,其底層其實是 netflix 的 eureka-core、eureka-client 等。所以我們先分析 netflix eureka 的原始碼,最後再看看 spring-cloud-starter-netflix 的原始碼。

1、原始碼環境準備

① 下載原始碼

Netflix Eureka:https://github.com/Netflix/eureka

Spring Cloud Netflix:https://github.com/spring-cloud/spring-cloud-netflix

克隆 eureka 的原始碼到本地:

$ git clone https://github.com/Netflix/eureka.git

由於我們依賴的是 1.9.25 版本,將程式碼克隆到本地後,將其切換到 1.9.25:

$ git checkout -b 1.9.25

然後到 eureka 根目錄下執行構建的命令:

$ ./gradlew clean build -x test

② IDEA 開啟原始碼

由於 eureka 使用 gradle 管理依賴,所以本地需要先安裝 gradle,之後 IDEA 中也需要安裝 gradle 的外掛,跟 maven 都是類似的,安裝教程可自行百度。

2、Eureka 工程結構

Eureka 主要包含如下模組:

  • eureka-client:eureka 客戶端
  • eureka-core:eureka 服務端,註冊中心的核心功能
  • eureka-resources:基於jsp的eureka控制檯,可以檢視註冊了哪些服務例項
  • eureka-server:註冊中心,集成了 eureka-client、eureka-core、eureka-resources,因為依賴了 eureka-client,因此 eureka-server 也是一個客戶端,在 eureka server 叢集模式下,eureka-server 也會作為客戶端註冊到其它註冊中心上
  • eureka-examples:eureka 例子
  • eureka-test-utils:eureka 單元測試工具
  • eureka-core|client-jersey2:對 jersey 框架的封裝,jersey 類似於 spring mvc,支援 http restful 請求,eureka-client 和 eureka-server 之間的通訊就是基於 jersey 框架來的

三、Eureka Server 啟動初始化

首先要看的是 eureka-server,註冊中心啟起來之後,客戶端才能來註冊服務和發現服務。

1、eureka-server 模組

① eureka-server 目錄

  • resources 目錄中主要是 eureka client 和 server 的配置檔案
  • webapp 下有一個 web.xml 配置檔案,這裡面就配置了啟動初始化的入口,從這也可以看出,eureka-server 會被打包成 war 包來執行
  • test 下有個單元測試類 EurekaClientServerRestIntegrationTest,這裡面就包含了服務註冊、續約、下線等單元測試,我們就可以執行這些單元測試來除錯程式碼

② web.xml

web.xml 的內容:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <web-app version="2.5"
 3          xmlns="http://java.sun.com/xml/ns/javaee"
 4          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 5          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
 6     http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">
 7   <!-- eureka 啟動初始化類 -->
 8   <listener>
 9     <listener-class>com.netflix.eureka.EurekaBootStrap</listener-class>
10   </listener>
11 
12   <!-- 狀態過濾器 -->
13   <filter>
14     <filter-name>statusFilter</filter-name>
15     <filter-class>com.netflix.eureka.StatusFilter</filter-class>
16   </filter>
17 
18   <!-- 認證過濾器 -->
19   <filter>
20     <filter-name>requestAuthFilter</filter-name>
21     <filter-class>com.netflix.eureka.ServerRequestAuthFilter</filter-class>
22   </filter>
23 
24   <!-- 限流過濾器 -->
25   <filter>
26     <filter-name>rateLimitingFilter</filter-name>
27     <filter-class>com.netflix.eureka.RateLimitingFilter</filter-class>
28   </filter>
29   <filter>
30     <filter-name>gzipEncodingEnforcingFilter</filter-name>
31     <filter-class>com.netflix.eureka.GzipEncodingEnforcingFilter</filter-class>
32   </filter>
33 
34   <!-- jersey 容器 -->
35   <filter>
36     <filter-name>jersey</filter-name>
37     <filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>
38     <init-param>
39       <param-name>com.sun.jersey.config.property.WebPageContentRegex</param-name>
40       <param-value>/(flex|images|js|css|jsp)/.*</param-value>
41     </init-param>
42     <init-param>
43       <param-name>com.sun.jersey.config.property.packages</param-name>
44       <param-value>com.sun.jersey;com.netflix</param-value>
45     </init-param>
46 
47     <!-- GZIP content encoding/decoding -->
48     <init-param>
49       <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
50       <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
51     </init-param>
52     <init-param>
53       <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
54       <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
55     </init-param>
56   </filter>
57 
58   <filter-mapping>
59     <filter-name>statusFilter</filter-name>
60     <url-pattern>/*</url-pattern>
61   </filter-mapping>
62 
63   <filter-mapping>
64     <filter-name>requestAuthFilter</filter-name>
65     <url-pattern>/*</url-pattern>
66   </filter-mapping>
67 
68   <!-- Uncomment this to enable rate limiter filter.
69   <filter-mapping>
70     <filter-name>rateLimitingFilter</filter-name>
71     <url-pattern>/v2/apps</url-pattern>
72     <url-pattern>/v2/apps/*</url-pattern>
73   </filter-mapping>
74   -->
75 
76   <filter-mapping>
77     <filter-name>gzipEncodingEnforcingFilter</filter-name>
78     <url-pattern>/v2/apps</url-pattern>
79     <url-pattern>/v2/apps/*</url-pattern>
80   </filter-mapping>
81 
82   <filter-mapping>
83     <filter-name>jersey</filter-name>
84     <url-pattern>/*</url-pattern>
85   </filter-mapping>
86 
87   <!-- 歡迎頁 -->
88   <welcome-file-list>
89     <welcome-file>jsp/status.jsp</welcome-file>
90   </welcome-file-list>
91 
92 </web-app>
View Code

web.xml 中可以得知如下資訊:

  • eureka server 啟動時首先通過 com.netflix.eureka.EurekaBootStrap 類來進行啟動初始化相關的工作
  • 配置了 StatusFilter(server 狀態過濾器)、ServerRequestAuthFilter(認證過濾器)、RateLimitingFilter(限流過濾器) 等過濾器,但 RateLimitingFilter 預設沒有啟用
  • 配置了 jersey 的 servlet 容器,其實就跟 springframework 的 DispatcherServlet 是類似的,用來攔截處理 http restful 請求,這塊我們不用過於關注
  • 最後還配置了 eureka server 的歡迎頁為 jsp/status.jsp 頁面,這個頁面在 eureka-resources 模組下,也就是前面看到的 eureka 控制檯頁面

③ 單元測試類 EurekaClientServerRestIntegrationTest

首先看 setUp 方法,每個測試用例執行之前都會先執行 setUp 方法來初始化執行環境。

 1 @BeforeClass
 2 public static void setUp() throws Exception {
 3     // 初始化 eureka 配置
 4     injectEurekaConfiguration();
 5     // 啟動 eureka server,會找 build/libs 目錄下的 eureka-server.*.war 包來執行
 6     // 這一步啟動時,就會載入 web.xm 配置檔案,然後進入 EurekaBootStrap 初始化類
 7     startServer();
 8     // eureka server 配置
 9     createEurekaServerConfig();
10 
11     // 建立 jersey 客戶端,使用 jersey 客戶端來呼叫資源
12     httpClientFactory = JerseyEurekaHttpClientFactory.newBuilder()
13             .withClientName("testEurekaClient")
14             .withConnectionTimeout(1000)
15             .withReadTimeout(1000)
16             .withMaxConnectionsPerHost(1)
17             .withMaxTotalConnections(1)
18             .withConnectionIdleTimeout(1000)
19             .build();
20 
21     jerseyEurekaClient = httpClientFactory.newClient(new DefaultEndpoint(eurekaServiceUrl));
22 
23     ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
24     jerseyReplicationClient = JerseyReplicationClient.createReplicationClient(
25             eurekaServerConfig,
26             serverCodecs,
27             eurekaServiceUrl
28     );
29 }

這個類提供瞭如下的一些測試用例,我們可以執行這些測試用例來進行除錯。

2、EurekaBootStrap 初始化

EurekaBootStrap 是監聽器的入口,實現了 ServletContextListener 介面,主要完成了 eureka server 的啟動初始化。

從 contextInitialized 方法進去,整體上來說,分為 eureka 環境初始化和 eureka server 上下文初始化。

 1 @Override
 2 public void contextInitialized(ServletContextEvent event) {
 3     try {
 4         // eureka 環境初始化
 5         initEurekaEnvironment();
 6         // eureka server 上下文初始化
 7         initEurekaServerContext();
 8 
 9         ServletContext sc = event.getServletContext();
10         sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
11     } catch (Throwable e) {
12         logger.error("Cannot bootstrap eureka server :", e);
13         throw new RuntimeException("Cannot bootstrap eureka server :", e);
14     }
15 }

① eureka環境初始化

initEurekaEnvironment 方法內主要是設定資料中心和執行環境引數:

  • archaius.deployment.datacenter = default
  • archaius.deployment.environment = test

② eureka server 上下文初始化

initEurekaServerContext 上下文初始化則包含了很多階段:

  • 構造 eureka 註冊中心配置:EurekaServerConfig
  • 構造 eureka 例項配置:EurekaInstanceConfig
  • 構造例項資訊:InstanceInfo
  • 構造例項管理器:ApplicationInfoManager 
  • 構造 eureka 客戶端配置:EurekaClientConfig
  • 建立 eureka 客戶端:EurekaClient(DiscoveryClient)
  • 建立登錄檔(可以感知eureka叢集的登錄檔):PeerAwareInstanceRegistry
  • 建立叢集:PeerEurekaNodes
  • 將資訊封裝到eureka上下文:EurekaServerContext
  • 將eureka上下文放到一個全域性容器中:EurekaServerContextHolder
  • 初始化eureka上下文
  • 同步eureka server的登錄檔
  • 開啟追蹤
  • 註冊監控統計
 1 protected void initEurekaServerContext() throws Exception {
 2     // 1、eureka 註冊中心配置
 3     EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
 4 
 5     // For backward compatibility
 6     JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
 7     XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
 8 
 9     logger.info("Initializing the eureka client...");
10     logger.info(eurekaServerConfig.getJsonCodecName());
11     ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
12 
13     ApplicationInfoManager applicationInfoManager = null;
14 
15     if (eurekaClient == null) {
16         // 2、eureka 例項配置
17         EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
18                 ? new CloudInstanceConfig()
19                 : new MyDataCenterInstanceConfig();
20 
21         // 3、構造 InstanceInfo 例項資訊
22         // 4、構造 ApplicationInfoManager 應用管理器
23         applicationInfoManager = new ApplicationInfoManager(
24                 instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
25 
26         // 5、eureka 客戶端配置
27         EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
28         // 6、構造 EurekaClient,DiscoveryClient 封裝了客戶端相關的操作
29         eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
30     } else {
31         applicationInfoManager = eurekaClient.getApplicationInfoManager();
32     }
33 
34     PeerAwareInstanceRegistry registry;
35     if (isAws(applicationInfoManager.getInfo())) {
36         registry = new AwsInstanceRegistry(
37                 eurekaServerConfig,
38                 eurekaClient.getEurekaClientConfig(),
39                 serverCodecs,
40                 eurekaClient
41         );
42         awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
43         awsBinder.start();
44     } else {
45         // 7、構造感知eureka叢集的登錄檔
46         registry = new PeerAwareInstanceRegistryImpl(
47                 eurekaServerConfig,
48                 eurekaClient.getEurekaClientConfig(),
49                 serverCodecs,
50                 eurekaClient
51         );
52     }
53 
54     // 8、構造eureka-server叢集資訊
55     PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
56             registry,
57             eurekaServerConfig,
58             eurekaClient.getEurekaClientConfig(),
59             serverCodecs,
60             applicationInfoManager
61     );
62 
63     // 9、基於前面構造的物件建立 EurekaServerContext
64     serverContext = new DefaultEurekaServerContext(
65             eurekaServerConfig,
66             serverCodecs,
67             registry,
68             peerEurekaNodes,
69             applicationInfoManager
70     );
71 
72     // 將 serverContext 放到 EurekaServerContextHolder 上下文中,
73     // 這樣其它地方都可以通過 EurekaServerContextHolder 拿到 EurekaServerContext
74     EurekaServerContextHolder.initialize(serverContext);
75 
76     // 10、初始化eureka-server上下文
77     serverContext.initialize();
78     logger.info("Initialized server context");
79 
80     // 11、從相鄰的eureka-server同步登錄檔
81     int registryCount = registry.syncUp();
82     ///12、啟動登錄檔,啟動一些定時任務
83     registry.openForTraffic(applicationInfoManager, registryCount);
84 
85     ///13、註冊監控統計
86     EurekaMonitors.registerAllStats();
87 }

3、面向介面的配置讀取

初始化中有三個配置介面,EurekaServerConfig、EurekaInstanceConfig、EurekaClientConfig,分別對應了註冊中心、eureka例項、eureka客戶端的配置獲取。

從它們預設實現類的構造方法進去可以看到,EurekaServerConfig 是讀取的 eureka-server.properties 配置檔案,命名字首是 eureka.server;EurekaInstanceConfig、EurekaClientConfig 是讀取的 eureka-client.properties 配置檔案,命名字首分別是 eureka.instance、eureka.client。

這裡可以看到,eureka 在程式碼中獲取配置的方式是通過介面方法的形式來獲取的,在其預設的實現類裡通過硬編碼的方式定義了配置的編碼以及預設值。這種基於介面的配置讀取方式是可以借鑑的,這種方式讀取配置更易於維護,不用維護一堆常量,如果配置編碼變了只需更改實現類即可。

例如下面的配置:

 1 @Override
 2 public int getExpectedClientRenewalIntervalSeconds() {
 3     final int configured = configInstance.getIntProperty(
 4             namespace + "expectedClientRenewalIntervalSeconds",
 5             30).get();
 6     return configured > 0 ? configured : 30;
 7 }
 8 
 9 @Override
10 public double getRenewalPercentThreshold() {
11     return configInstance.getDoubleProperty(
12             namespace + "renewalPercentThreshold", 0.85).get();
13 }
14 
15 @Override
16 public boolean shouldEnableReplicatedRequestCompression() {
17     return configInstance.getBooleanProperty(
18             namespace + "enableReplicatedRequestCompression", false).get();
19 }

4、基於建造者模式構造服務例項

看 new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get() 這段程式碼,在 get 方法中完成了服務例項資訊的構造。它這裡主要用到了建造者設計模式來構建 LeaseInfo 和 InstanceInfo,以 InstanceInfo 為例,它的內部有一個靜態的 Builder 類,通過 newBuilder() 方法建立了 InstanceInfo 物件,然後可以呼叫 Builder 的屬性設定方法來設定屬性,在設定這些屬性的時候,會做一些關聯性的校驗,在設定完成後,就呼叫 build() 方法返回物件,也可以在 build 方法中再做一些最終的校驗。建造者模式就很適合用於構建這種複雜的物件。

 1 public synchronized InstanceInfo get() {
 2     if (instanceInfo == null) {
 3         // 續約資訊:主要有續約間隔時間(預設30秒)和續約過期時間(預設90秒)
 4         LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
 5                 .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
 6                 .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
 7 
 8         if (vipAddressResolver == null) {
 9             vipAddressResolver = new Archaius1VipAddressResolver();
10         }
11 
12         // 基於建造者模式來建立 InstanceInfo
13         InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver);
14 
15         // set the appropriate id for the InstanceInfo, falling back to datacenter Id if applicable, else hostname
16         String instanceId = config.getInstanceId();
17         if (instanceId == null || instanceId.isEmpty()) {
18             DataCenterInfo dataCenterInfo = config.getDataCenterInfo();
19             if (dataCenterInfo instanceof UniqueIdentifier) {
20                 instanceId = ((UniqueIdentifier) dataCenterInfo).getId();
21             } else {
22                 instanceId = config.getHostName(false);
23             }
24         }
25 
26         String defaultAddress;
27         if (config instanceof RefreshableInstanceConfig) {
28             // Refresh AWS data center info, and return up to date address
29             defaultAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(false);
30         } else {
31             defaultAddress = config.getHostName(false);
32         }
33 
34         // fail safe
35         if (defaultAddress == null || defaultAddress.isEmpty()) {
36             defaultAddress = config.getIpAddress();
37         }
38 
39         // 設定屬性
40         builder.setNamespace(config.getNamespace())
41                 .setInstanceId(instanceId)
42                 .setAppName(config.getAppname())
43                 .setAppGroupName(config.getAppGroupName())
44                 .setDataCenterInfo(config.getDataCenterInfo())
45                 .setIPAddr(config.getIpAddress())
46                 .setHostName(defaultAddress)
47                 .setPort(config.getNonSecurePort())
48                 .enablePort(PortType.UNSECURE, config.isNonSecurePortEnabled())
49                 .setSecurePort(config.getSecurePort())
50                 .enablePort(PortType.SECURE, config.getSecurePortEnabled())
51                 .setVIPAddress(config.getVirtualHostName())
52                 .setSecureVIPAddress(config.getSecureVirtualHostName())
53                 .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
54                 .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
55                 .setASGName(config.getASGName())
56                 .setHealthCheckUrls(config.getHealthCheckUrlPath(),
57                         config.getHealthCheckUrl(), config.getSecureHealthCheckUrl());
58 
59 
60         // Start off with the STARTING state to avoid traffic
61         if (!config.isInstanceEnabledOnit()) {
62             InstanceStatus initialStatus = InstanceStatus.STARTING;
63             LOG.info("Setting initial instance status as: {}", initialStatus);
64             builder.setStatus(initialStatus);
65         } else {
66             LOG.info("Setting initial instance status as: {}. This may be too early for the instance to advertise "
67                      + "itself as available. You would instead want to control this via a healthcheck handler.",
68                      InstanceStatus.UP);
69         }
70 
71         // Add any user-specific metadata information
72         for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
73             String key = mapEntry.getKey();
74             String value = mapEntry.getValue();
75             // only add the metadata if the value is present
76             if (value != null && !value.isEmpty()) {
77                 builder.add(key, value);
78             }
79         }
80 
81         // 呼叫 build 方法做屬性校驗並建立 InstanceInfo 例項
82         instanceInfo = builder.build();
83         instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
84     }
85     return instanceInfo;
86 }
View Code

LeaseInfo 就是續約資訊,可以看到主要的兩個配置就是續約間隔時間和多久未續約認為例項過期,例項過期就會被剔除。然後就是基於 config 設定 InstanceInfo,就是例項資訊,包含了例項ID、主機名稱、埠、LeaseInfo 等等。

5、註冊中心構造客戶端 DiscoveryClient

在叢集模式下,eureka server 也會作為客戶端註冊到其它註冊中心,此時,它本身就是一個 eureka client。因此會去構建 EurekaClient,其預設實現類是 DiscoveryClient。DiscoveryClient 包含了 eureka 客戶端的大部分核心功能,比如服務註冊、續約、維持心跳、拉取登錄檔等。

一步步進入到DiscoveryClient最複雜的那個構造方法,我們先整體分析下做了哪些事情,抓大放小,很多元件的細節等後面分析具體功能的時候再來看。

  • 將 EurekaClientConfig、EurekaInstanceConfig、EurekaTransportConfig、InstanceInfo、ApplicationInfoManager 等儲存到本地變數中
  • 如果要獲取登錄檔,就建立一個登錄檔狀態度量器
  • 如果要註冊到註冊中心,就建立一個心跳狀態度量器
  • 如果不獲取登錄檔且不註冊到註冊中心,就不會建立排程器、心跳執行緒池這些了,釋放一些資源
  • 如果要註冊到註冊中心且要抓取登錄檔,就初始化一些排程的資源:
    • 建立了支援排程的執行緒池,有兩個核心執行緒,從後面可以看出,主要就是處理心跳和快取重新整理的任務
    • 建立了維持心跳的執行緒池,核心執行緒數為1,最大執行緒數配置預設為5
    • 建立了重新整理快取的執行緒池,核心執行緒數為1,最大執行緒數配置預設為5
    • 建立了eureka client 與 eureka server 進行網路通訊的元件 EurekaTransport,並進行了一些初始化 。EurekaTransport 裡的客戶端主要就是封裝了對 server 的 api 呼叫介面,便於呼叫
    • 接著,如果要抓取登錄檔,就會抓取登錄檔了,fetchRegistry 裡面可以看到是分為全量抓取和增量抓取的,第一次啟動的時候就是全量抓取登錄檔
  • 開始初始化排程任務:
    • 如果要抓取登錄檔,就建立重新整理快取的任務,並開始排程,預設每隔30秒抓取一次登錄檔
    • 如果要註冊到註冊中心,就建立傳送心跳的任務,並開始排程,預設每隔30秒傳送一次心跳
    • 如果要註冊到註冊中心,還會建立例項副本傳播器(內部也是一個定時排程任務)、例項狀態變更的監聽器
  1 DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
  2                 Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
  3     if (args != null) {
  4         this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
  5         this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
  6         this.eventListeners.addAll(args.getEventListeners());
  7         this.preRegistrationHandler = args.preRegistrationHandler;
  8     } else {
  9         this.healthCheckCallbackProvider = null;
 10         this.healthCheckHandlerProvider = null;
 11         this.preRegistrationHandler = null;
 12     }
 13 
 14     // 將例項資訊、配置資訊儲存到本地
 15     this.applicationInfoManager = applicationInfoManager;
 16     InstanceInfo myInfo = applicationInfoManager.getInfo();
 17     clientConfig = config;
 18     staticClientConfig = clientConfig;
 19     transportConfig = config.getTransportConfig();
 20     instanceInfo = myInfo;
 21     if (myInfo != null) {
 22         appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
 23     } else {
 24         logger.warn("Setting instanceInfo to a passed in null value");
 25     }
 26 
 27     this.backupRegistryProvider = backupRegistryProvider;
 28     this.endpointRandomizer = endpointRandomizer;
 29     this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
 30     localRegionApps.set(new Applications());
 31 
 32 
 33     fetchRegistryGeneration = new AtomicLong(0);
 34     remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
 35     // 從遠端拉取登錄檔的地址陣列,使用的原子類,在執行中可能會動態更新地址
 36     remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
 37 
 38     // 如果要獲取登錄檔,就會註冊狀態監視器
 39     if (config.shouldFetchRegistry()) {
 40         this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
 41     } else {
 42         this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
 43     }
 44 
 45     // 如果要註冊到 eureka-server,就會建立心跳狀態監視器
 46     if (config.shouldRegisterWithEureka()) {
 47         this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
 48     } else {
 49         this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
 50     }
 51 
 52     logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
 53 
 54     // 如果不註冊到註冊中心,且不拉取登錄檔,就不建立排程器、執行緒池等資源了
 55     if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
 56         logger.info("Client configured to neither register nor query for data.");
 57         scheduler = null;
 58         heartbeatExecutor = null;
 59         cacheRefreshExecutor = null;
 60         eurekaTransport = null;
 61         instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
 62 
 63         // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
 64         // to work with DI'd DiscoveryClient
 65         DiscoveryManager.getInstance().setDiscoveryClient(this);
 66         DiscoveryManager.getInstance().setEurekaClientConfig(config);
 67 
 68         initTimestampMs = System.currentTimeMillis();
 69         initRegistrySize = this.getApplications().size();
 70         registrySize = initRegistrySize;
 71         logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
 72                 initTimestampMs, initRegistrySize);
 73 
 74         return;  // no need to setup up an network tasks and we are done
 75     }
 76 
 77     try {
 78         // 建立定時排程器,預設有2個核心執行緒,主要處理心跳任務和快取重新整理任務
 79         scheduler = Executors.newScheduledThreadPool(2,
 80                 new ThreadFactoryBuilder()
 81                         .setNameFormat("DiscoveryClient-%d")
 82                         .setDaemon(true)
 83                         .build());
 84 
 85         // 維持心跳的執行緒池,一個核心執行緒,最大執行緒數預設5。
 86         // 注意其使用的佇列是 SynchronousQueue 佇列,這個佇列只能放一個任務,一個執行緒將任務取走後,才能放入下一個任務,否則只能阻塞。
 87         heartbeatExecutor = new ThreadPoolExecutor(
 88                 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
 89                 new SynchronousQueue<Runnable>(),
 90                 new ThreadFactoryBuilder()
 91                         .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
 92                         .setDaemon(true)
 93                         .build()
 94         );  // use direct handoff
 95 
 96         // 重新整理快取的執行緒池,一個核心執行緒,最大執行緒資料預設為5
 97         cacheRefreshExecutor = new ThreadPoolExecutor(
 98                 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
 99                 new SynchronousQueue<Runnable>(),
100                 new ThreadFactoryBuilder()
101                         .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
102                         .setDaemon(true)
103                         .build()
104         );  // use direct handoff
105 
106         // eureka http 呼叫客戶端,支援 eureka client 與 eureka server 之間的通訊
107         eurekaTransport = new EurekaTransport();
108         // 初始化 eurekaTransport
109         scheduleServerEndpointTask(eurekaTransport, args);
110 
111         AzToRegionMapper azToRegionMapper;
112         if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
113             azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
114         } else {
115             azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
116         }
117         if (null != remoteRegionsToFetch.get()) {
118             azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
119         }
120         instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
121     } catch (Throwable e) {
122         throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
123     }
124 
125     if (clientConfig.shouldFetchRegistry()) {
126         try {
127             // 拉取登錄檔:全量抓取和增量抓取
128             boolean primaryFetchRegistryResult = fetchRegistry(false);
129             if (!primaryFetchRegistryResult) {
130                 logger.info("Initial registry fetch from primary servers failed");
131             }
132             boolean backupFetchRegistryResult = true;
133             if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
134                 backupFetchRegistryResult = false;
135                 logger.info("Initial registry fetch from backup servers failed");
136             }
137             if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
138                 throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
139             }
140         } catch (Throwable th) {
141             logger.error("Fetch registry error at startup: {}", th.getMessage());
142             throw new IllegalStateException(th);
143         }
144     }
145 
146     // call and execute the pre registration handler before all background tasks (inc registration) is started
147     if (this.preRegistrationHandler != null) {
148         this.preRegistrationHandler.beforeRegistration();
149     }
150 
151     if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
152         try {
153             if (!register() ) {
154                 throw new IllegalStateException("Registration error at startup. Invalid server response.");
155             }
156         } catch (Throwable th) {
157             logger.error("Registration error at startup: {}", th.getMessage());
158             throw new IllegalStateException(th);
159         }
160     }
161 
162     // 初始化一些排程任務:重新整理快取的排程任務、傳送心跳的排程任務、例項副本傳播器
163     initScheduledTasks();
164 
165     try {
166         Monitors.registerObject(this);
167     } catch (Throwable e) {
168         logger.warn("Cannot register timers", e);
169     }
170 
171     // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
172     // to work with DI'd DiscoveryClient
173     DiscoveryManager.getInstance().setDiscoveryClient(this);
174     DiscoveryManager.getInstance().setEurekaClientConfig(config);
175 
176     // 初始化的時間
177     initTimestampMs = System.currentTimeMillis();
178     initRegistrySize = this.getApplications().size();
179     registrySize = initRegistrySize;
180     logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
181             initTimestampMs, initRegistrySize);
182 }
183 
184 ////////////////////////////////////////////////////////////////////
185 
186 private void initScheduledTasks() {
187     if (clientConfig.shouldFetchRegistry()) {
188         // 抓取登錄檔的間隔時間,預設30秒
189         int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
190         // 重新整理快取排程器延遲時間擴大倍數,在任務超時的時候,將擴大延遲時間
191         // 這在出現網路抖動、eureka-sever 不可用時,可以避免頻繁發起無效的排程
192         int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
193         // 登錄檔重新整理的定時任務
194         cacheRefreshTask = new TimedSupervisorTask(
195                 "cacheRefresh",
196                 scheduler,
197                 cacheRefreshExecutor,
198                 registryFetchIntervalSeconds,
199                 TimeUnit.SECONDS,
200                 expBackOffBound,
201                 new CacheRefreshThread() // 重新整理登錄檔的任務
202         );
203         // 30秒後開始排程重新整理登錄檔的任務
204         scheduler.schedule(
205                 cacheRefreshTask,
206                 registryFetchIntervalSeconds, TimeUnit.SECONDS);
207     }
208 
209     if (clientConfig.shouldRegisterWithEureka()) {
210         // 續約間隔時間,預設30秒
211         int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
212         // 心跳排程器的延遲時間擴大倍數,預設10
213         int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
214         logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
215 
216         // 心跳的定時任務
217         heartbeatTask = new TimedSupervisorTask(
218                 "heartbeat",
219                 scheduler,
220                 heartbeatExecutor,
221                 renewalIntervalInSecs,
222                 TimeUnit.SECONDS,
223                 expBackOffBound,
224                 new HeartbeatThread()
225         );
226         // 30秒後開始排程心跳的任務
227         scheduler.schedule(
228                 heartbeatTask,
229                 renewalIntervalInSecs, TimeUnit.SECONDS);
230 
231         // 例項副本傳播器,用於定時更新自己狀態
232         instanceInfoReplicator = new InstanceInfoReplicator(
233                 this,
234                 instanceInfo,
235                 clientConfig.getInstanceInfoReplicationIntervalSeconds(),
236                 2); // burstSize
237 
238         // 例項狀態變更的監聽器
239         statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
240             @Override
241             public String getId() {
242                 return "statusChangeListener";
243             }
244 
245             @Override
246             public void notify(StatusChangeEvent statusChangeEvent) {
247                 if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
248                     logger.error("Saw local status change event {}", statusChangeEvent);
249                 } else {
250                     logger.info("Saw local status change event {}", statusChangeEvent);
251                 }
252                 instanceInfoReplicator.onDemandUpdate();
253             }
254         };
255 
256         // 向 ApplicationInfoManager 註冊監聽器
257         if (clientConfig.shouldOnDemandUpdateStatusChange()) {
258             applicationInfoManager.registerStatusChangeListener(statusChangeListener);
259         }
260 
261         // 啟動副本傳播器,預設延遲時間40秒
262         instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
263     } else {
264         logger.info("Not registering with Eureka server per configuration");
265     }
266 }
View Code

6、定時任務監管器的設計

可以看到,eureka client 為了定時傳送心跳以及定時抓取登錄檔,使用了定時任務和排程器,我覺得它這裡的定時排程的設計思想是可以參考和借鑑的。

以心跳任務的這段程式碼為例:

 1 if (clientConfig.shouldFetchRegistry()) {
 2     // 抓取登錄檔的間隔時間,預設30秒
 3     int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
 4     // 重新整理快取排程器延遲時間擴大倍數,在任務超時的時候,將擴大延遲時間
 5     // 這在出現網路抖動、eureka-sever 不可用時,可以避免頻繁發起無效的排程
 6     int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
 7     // 登錄檔重新整理的定時任務
 8     cacheRefreshTask = new TimedSupervisorTask(
 9             "cacheRefresh",
10             scheduler,
11             cacheRefreshExecutor,
12             registryFetchIntervalSeconds,
13             TimeUnit.SECONDS,
14             expBackOffBound,
15             new CacheRefreshThread() // 重新整理登錄檔的任務
16     );
17     // 30秒後開始排程重新整理登錄檔的任務
18     scheduler.schedule(
19             cacheRefreshTask,
20             registryFetchIntervalSeconds, TimeUnit.SECONDS);
21 }

上面這段程式碼其實並不複雜,主要就是建立了一個定時任務,然後使用排程器在一定的延遲之後開始排程。但它這裡並不是直接使用排程器排程任務(CacheRefreshThread),也不是以一個固定的頻率排程(每隔30秒)。它定義了一個任務的監管器 TimedSupervisorTask,在建立這個監管器的時候,傳入了排程器、要執行的任務、以及間隔時間等引數,然後排程器排程 TimedSupervisorTask。

看 TimedSupervisorTask 的構造方法,主要有以下幾個點:

  • 任務的超時時間等於間隔時間,也就是預設30秒的超時時間,然後延遲時間預設等於超時時間  如果 eureka server down 了,或者網路問題,就有可能出現超時
  • 設定了最大的延遲時間,預設在超時時間的基礎上擴大10倍,即300秒
  • 最後建立了一些計數器,分別統計成功、超時、拒絕、異常的次數,可以看到,它這裡對任務的排程是有做統計的
 1 public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
 2                            int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
 3     this.name = name;
 4     this.scheduler = scheduler;
 5     this.executor = executor;
 6     // 任務超時時間就等於任務排程的間隔時間
 7     this.timeoutMillis = timeUnit.toMillis(timeout);
 8     this.task = task;
 9     // 延遲時間預設為超時時間
10     this.delay = new AtomicLong(timeoutMillis);
11     // 最大延遲時間,預設在超時時間的基礎上擴大10倍
12     this.maxDelay = timeoutMillis * expBackOffBound;
13 
14     // 初始化計數器並註冊
15     successCounter = Monitors.newCounter("success");
16     timeoutCounter = Monitors.newCounter("timeouts");
17     rejectedCounter = Monitors.newCounter("rejectedExecutions");
18     throwableCounter = Monitors.newCounter("throwables");
19     threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
20     Monitors.registerObject(name, this);
21 }

再看 TimedSupervisorTask 的 run 方法:

  • 1)首先將任務非同步提交到執行緒池去執行,它這裡並不是直接執行任務,而是非同步提交到執行緒池中,這樣可以實現超時等待,不影響主任務
  • 2)任務如果超時,比如出現網路延遲、eureka server 不可用等情況,超時了,它這個時候就會認為如果還是30秒後排程,可能 eureka server 還是不可用的狀態,那麼就增大延遲時間,那麼第一次超時就會在300秒後再排程。如果300秒內 eureka server 可用了,然後有新的服務例項註冊上去了,那這個客戶端就不能及時感知到了,因此我覺得可以將 getCacheRefreshExecutorExponentialBackOffBound 對應的引數適當設定小一點(預設10倍)。
  • 3)如果任務沒有超時,在排程成功後,就會重置延遲時間為預設的超時時間。最後在 finally 中進行下一次的排程。
 1 public void run() {
 2     Future<?> future = null;
 3     try {
 4         // 提交任務到執行緒池
 5         future = executor.submit(task);
 6         threadPoolLevelGauge.set((long) executor.getActiveCount());
 7         // 阻塞直到任務完成或超時
 8         future.get(timeoutMillis, TimeUnit.MILLISECONDS);
 9         // 任務完成後,重置延遲時間為超時時間,即30秒
10         delay.set(timeoutMillis);
11         threadPoolLevelGauge.set((long) executor.getActiveCount());
12         // 成功次數+1
13         successCounter.increment();
14     } catch (TimeoutException e) {
15         logger.warn("task supervisor timed out", e);
16         // 超時次數+1
17         timeoutCounter.increment();
18 
19         // 如果任務超時了,就會增大延遲時間,當前延遲時間*2,然後取一個最大值
20         long currentDelay = delay.get();
21         long newDelay = Math.min(maxDelay, currentDelay * 2);
22         // 設定為最大的一個延遲時間
23         delay.compareAndSet(currentDelay, newDelay);
24 
25     } catch (RejectedExecutionException e) {
26         if (executor.isShutdown() || scheduler.isShutdown()) {
27             logger.warn("task supervisor shutting down, reject the task", e);
28         } else {
29             logger.warn("task supervisor rejected the task", e);
30         }
31 
32         rejectedCounter.increment();
33     } catch (Throwable e) {
34         if (executor.isShutdown() || scheduler.isShutdown()) {
35             logger.warn("task supervisor shutting down, can't accept the task");
36         } else {
37             logger.warn("task supervisor threw an exception", e);
38         }
39 
40         throwableCounter.increment();
41     } finally {
42         if (future != null) {
43             future.cancel(true);
44         }
45 
46         if (!scheduler.isShutdown()) {
47             // 延遲 delay 時間後,繼續排程任務
48             scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
49         }
50     }
51 }

總結一下這塊設計:

  • 1)首先在遠端呼叫的時候要考慮到網路不可用、server 端 down 了等情況導致呼叫超時,可以使用執行緒池非同步提交任務,實現等待超時機制。
  • 2)超時之後,可以假想服務恢復可用狀態可能需要一定的時間,如果還是按原來的時間間隔排程,可能還是會超時,因此增大延遲時間。如果呼叫成功,說明已經恢復了,則重置延遲時間。
  • 3)定時任務的排程以一定的延遲時間來迴圈排程(schedule),延遲時間可以根據實際情況變化,而不是一開始就按一個固定的頻率來排程(scheduleAtFixedRate)。
  • 4)定時任務、執行緒池裡的任務,最好做好任務執行狀態的統計,便於觀察任務的排程情況。

7、構造登錄檔

接著構造 PeerAwareInstanceRegistry,從命名來看,這是一個可以感知 eureka 叢集的登錄檔,就是在叢集模式下,eureka server 從其它 server 節點拉取登錄檔。它的預設實現類是 PeerAwareInstanceRegistryImpl,繼承自 AbstractInstanceRegistry,就是例項登錄檔。

① 構造 PeerAwareInstanceRegistry

進入 PeerAwareInstanceRegistryImpl 的構造方法:

  • 首先是將前面構造的 EurekaServerConfig、EurekaClientConfig、EurekaClient 等傳入構造方法來構造 PeerAwareInstanceRegistry
  • 呼叫了 super 的構造方法,主要初始化了如下幾個東西:
    • 儲存最近下線例項的迴圈佇列
    • 儲存最近註冊例項的迴圈佇列
    • 最近一分鐘續約次數的計數器
    • 定時任務剔除 recentlyChangedQueue 中的例項
  • 然後建立了一個最近一分鐘叢集同步次數的計數器 numberOfReplicationsLastMin。MeasuredRate 我們到後面再來分析它的設計。
 1 public PeerAwareInstanceRegistryImpl(
 2         EurekaServerConfig serverConfig,
 3         EurekaClientConfig clientConfig,
 4         ServerCodecs serverCodecs,
 5         EurekaClient eurekaClient
 6 ) {
 7     super(serverConfig, clientConfig, serverCodecs);
 8     this.eurekaClient = eurekaClient;
 9     // 最近一分鐘叢集同步的次數計數器
10     this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
11     // We first check if the instance is STARTING or DOWN, then we check explicit overrides,
12     // then we check the status of a potentially existing lease.
13     this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
14             new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
15 }
16 
17 
18 ///////////////////////////////////////////////
19 
20 
21 protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
22     this.serverConfig = serverConfig;
23     this.clientConfig = clientConfig;
24     this.serverCodecs = serverCodecs;
25     // 最近下線的迴圈佇列
26     this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
27     // 最近註冊的迴圈佇列
28     this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
29 
30     // 最近一分鐘續約的計數器
31     this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
32     // 一個定時排程任務,定時剔除最近改變佇列中過期的例項
33     this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
34             serverConfig.getDeltaRetentionTimerIntervalInMs(),
35             serverConfig.getDeltaRetentionTimerIntervalInMs());
36 }
View Code

這塊的具體細節等後面分析具體功能的時候再來看,我們先知道有這些佇列、計數器就行了。

② 迴圈佇列 CircularQueue 的設計

從構造方法可以看到,它使用了迴圈佇列來儲存最近下線和最近註冊的例項資訊,容量固定為1000,這樣就把最近的例項數量控制在1000以內。

CircularQueue 是它自定義的一個迴圈佇列,繼承自 AbstractQueue。其內部其實就是代理了 ArrayBlockingQueue,然後重寫了入隊的 offer 方法,當佇列滿了,就取出頭部的一個元素,然後再放到佇列尾部。

 1 static class CircularQueue<E> extends AbstractQueue<E> {
 2     private final ArrayBlockingQueue<E> delegate;
 3     private final int capacity;
 4 
 5     public CircularQueue(int capacity) {
 6         this.capacity = capacity;
 7         this.delegate = new ArrayBlockingQueue<>(capacity);
 8     }
 9 
10     @Override
11     public Iterator<E> iterator() {
12         return delegate.iterator();
13     }
14 
15     @Override
16     public int size() {
17         return delegate.size();
18     }
19 
20     @Override
21     public boolean offer(E e) {
22         // 如果佇列滿了,就取出頭部的一個元素,然後再放到尾部
23         while (!delegate.offer(e)) {
24             delegate.poll();
25         }
26         return true;
27     }
28 
29     @Override
30     public E poll() {
31         return delegate.poll();
32     }
33 
34     @Override
35     public E peek() {
36         return delegate.peek();
37     }
38 }

8、建立 Eureka Server 上下文並初始化

接下來先是建立了 PeerEurekaNodes,應該就是代表 eureka 叢集的。然後基於前面建立的一些東西建立 eureka server 上下文 EurekaServerContext,從 DefaultEurekaServerContext 構造方法進去可以看到,只是將前面構造的東西封裝起來,便於全域性使用。然後將 serverContext 放到 EurekaServerContextHolder 中,這樣其它地方就可以通過這個 holder 獲取 serverContext 了。

接著就是初始化eureka server上下文:

  • 啟動 eureka 叢集:
    • 主要是啟動一個定時任務(間隔時間預設10分鐘)更新 eureka 叢集節點的資訊,根據配置的 eureka server 地址更新 PeerEurekaNode,這樣當有 eureka server 下線或上線後,就可以及時感知到其它 server 節點。PeerEurekaNode 主要就是用於叢集節點間的資料同步,這塊後面分析叢集的時候再具體分析。
  • 登錄檔初始化:
    • 首先啟動了前面建立的計數器:numberOfReplicationsLastMin
    • 初始化響應快取,eureka server 構造了一個多級快取來響應客戶端抓取登錄檔的請求,這個多級快取的設計就是響應頻繁抓取登錄檔請求的核心所在,等後面分析客戶端抓取登錄檔的時候再具體分析
    • 定時排程任務更新續約閾值,主要就是更新 numberOfRenewsPerMinThreshold 這個值,即每分鐘續約次數,等分析續約的時候再來分析
    • 初始化 RemoteRegionRegistry,猜測是跟 eureka 多個區域(region)部署有關的
 1 public void initialize() {
 2     logger.info("Initializing ...");
 3     // 啟動eureka叢集
 4     peerEurekaNodes.start();
 5     try {
 6         // 登錄檔初始化
 7         registry.init(peerEurekaNodes);
 8     } catch (Exception e) {
 9         throw new RuntimeException(e);
10     }
11     logger.info("Initialized");
12 }

PeerEurekaNodes 的 start 方法:

 1 public void start() {
 2     // 單個執行緒的執行緒池
 3     taskExecutor = Executors.newSingleThreadScheduledExecutor(
 4             new ThreadFactory() {
 5                 @Override
 6                 public Thread newThread(Runnable r) {
 7                     Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
 8                     thread.setDaemon(true);
 9                     return thread;
10                 }
11             }
12     );
13     try {
14         // 根據叢集地址更新 PeerEurekaNode,PeerEurekaNode 就包含了排程其它註冊中心的客戶端
15         updatePeerEurekaNodes(resolvePeerUrls());
16         Runnable peersUpdateTask = new Runnable() {
17             @Override
18             public void run() {
19                 try {
20                     updatePeerEurekaNodes(resolvePeerUrls());
21                 } catch (Throwable e) {
22                     logger.error("Cannot update the replica Nodes", e);
23                 }
24 
25             }
26         };
27         // 定時跟新叢集資訊 PeerEurekaNode,如果有eureka-server不可用了,就可以及時下線,或者新上線了eureka-server,可以及時感知到
28         taskExecutor.scheduleWithFixedDelay(
29                 peersUpdateTask,
30                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
31                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
32                 TimeUnit.MILLISECONDS
33         );
34     } catch (Exception e) {
35         throw new IllegalStateException(e);
36     }
37     for (PeerEurekaNode node : peerEurekaNodes) {
38         logger.info("Replica node URL:  {}", node.getServiceUrl());
39     }
40 }
View Code

PeerAwareInstanceRegistryImpl 的 init 方法:

 1 public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
 2     // 啟動計數器
 3     this.numberOfReplicationsLastMin.start();
 4     this.peerEurekaNodes = peerEurekaNodes;
 5     // 初始化響應快取,eureka server 構造了一個多級快取來響應客戶端抓取登錄檔的請求
 6     initializedResponseCache();
 7     // 定時排程任務更新續約閥值,主要就是更新 numberOfRenewsPerMinThreshold 這個值,即每分鐘續約次數
 8     scheduleRenewalThresholdUpdateTask();
 9     // 初始化 RemoteRegionRegistry
10     initRemoteRegionRegistry();
11 
12     try {
13         Monitors.registerObject(this);
14     } catch (Throwable e) {
15         logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
16     }
17 }
View Code

9、完成 Eureka Server 初始化

接下來看最後幾步:

  • 首先呼叫 registry.syncUp() 將 EurekaClient 本地的例項同步到登錄檔,在叢集模式下,eureka server 也是一個客戶端,因此會獲取到其它註冊中心