1. 程式人生 > >SpringCloud 原始碼系列(3)—— 註冊中心 Eureka(下)

SpringCloud 原始碼系列(3)—— 註冊中心 Eureka(下)

SpringCloud 原始碼系列(1)—— 註冊中心 Eureka(上)

SpringCloud 原始碼系列(2)—— 註冊中心 Eureka(中)

SpringCloud 原始碼系列(3)—— 註冊中心 Eureka(下)

十一、Eureka Server 叢集

在實際的生產環境中,可能有幾十個或者幾百個的微服務例項,Eureka Server 承擔了非常高的負載,而且為了保證註冊中心高可用,一般都要部署成叢集的,下面就來看看 eureka server 的叢集。

1、搭建 Eureka Server 叢集

首先來搭建一個三個節點的 eureka-server 叢集,看看效果。

① 叢集配置

首先在本地 hosts 檔案中配置如下對映:

1 127.0.0.1 peer1
2 127.0.0.1 peer2
3 127.0.0.1 peer3

更改註冊中心的 application.yml 配置檔案,增加三個 profile,分別對應三個 eureka-server 的客戶端配置。

eureka-server 在叢集中作為客戶端就需要抓取登錄檔,並配置 eureka-server 的地址。

 1 spring:
 2   application:
 3     name: sunny-register
 4 
 5 ---
 6 spring:
 7   profiles: peer1
 8 server:
 9   port: 8001
10 
11 eureka:
12   instance:
13     hostname: peer1
14   client:
15     # 是否向註冊中心註冊自己
16     register-with-eureka: false
17     # 是否抓取登錄檔
18     fetch-registry: true
19     service-url:
20       defaultZone: http://peer1:8001/eureka,http://peer2:8002/eureka,http://peer3:8003/eureka
21 
22 
23 ---
24 spring:
25   profiles: peer2
26 server:
27   port: 8002
28 
29 eureka:
30   instance:
31     hostname: peer2
32   client:
33     # 是否向註冊中心註冊自己
34     register-with-eureka: false
35     # 是否抓取登錄檔
36     fetch-registry: true
37     service-url:
38       defaultZone: http://peer1:8001/eureka,http://peer2:8002/eureka,http://peer3:8003/eureka
39 
40 ---
41 spring:
42   profiles: peer3
43 server:
44   port: 8003
45 
46 eureka:
47   instance:
48     hostname: peer3
49   client:
50     # 是否向註冊中心註冊自己
51     register-with-eureka: false
52     # 是否抓取登錄檔
53     fetch-registry: true
54     service-url:
55       defaultZone: http://peer1:8001/eureka,http://peer2:8002/eureka,http://peer3:8003/eureka

② 啟動叢集

分別啟動三個註冊中心,環境變數 spring.profiles.active 啟用對應的叢集配置。

啟動之後訪問 http://peer1:8001/ 進入 peer1 這個註冊中心,就可以看到另外兩個分片 peer2、peer3,說明叢集中有3個節點了。

③ 啟動客戶端

首先客戶端配置增加叢集地址:

1 eureka:
2   client:
3     serviceUrl:
4       defaultZone: http://peer1:8001/eureka,http://peer2:8002/eureka,http://peer3:8003/eureka

啟動幾個客戶端例項,過一會 之後,會發現三個 eureka-server 上都註冊上去了:

到此 eureka-server 叢集就搭建起來了,可以看到註冊中心的例項會互相同步,每隔註冊註冊都可以接收註冊、續約、下線等請求,它們是對等的。

2、Eureka Server 叢集架構

一般來說,分散式系統的資料在多個副本之間的複製方式,可分為主從複製和對等複製。

① 主從複製

主從複製就是 Master-Slave 模式,即一個主副本,其它副本都為從副本。所有對資料的寫操作都提交到主副本,然後再由主副本同步到從副本。

對於主從複製模式來說,寫操作的壓力都在主副本上,它是整個系統的瓶頸,而從副本則可以幫助主副本分擔讀請求。

② 對等複製

對等複製就是 Peer to Peer 的模式,副本之間不分主從,任何副本都可以接收寫操作,每個副本之間相互進行資料更新同步。

Peer to Peer 模式每個副本之間都可以接收寫請求,不存在寫操作壓力瓶頸。但是由於每個副本都可以進行寫操作,各個副本之間的資料同步及衝突處理是一個棘手的問題。

③ Eureka Server 叢集架構

Eureka Server 採用的就是 Peer to Peer 的複製模式,比如一個客戶端例項隨機向其中一個server註冊,然後它就會同步到其它節點中。

3、Eureka Server 啟動時抓取登錄檔

前面已經分析過了,在 eureka server 啟動初始化的時候,即 EurekaBootStrap 初始化類,先初始化了 DiscoveryClient,DiscoveryClient 會向註冊中心全量抓取登錄檔到本地。

初始化的最後呼叫了 registry.syncUp() 來同步登錄檔,就是將 DiscoveryClient 快取的例項註冊到 eureka-server 的登錄檔裡去。

需要注意的是 eureka 配置的登錄檔同步重試次數預設為5,springcloud 中預設為 0,因此需要新增如下配置來開啟登錄檔同步。

1 eureka:
2   server:
3     registry-sync-retries: 5

將 DiscoveryClient 本地的例項註冊到登錄檔中:

4、叢集節點同步

① 註冊、續約、下線

前面也分析過了,在客戶端註冊、續約、下線的時候,都會同步到叢集其它節點。可以看到都呼叫了 replicateToPeers 方法來複制到其它叢集。

 1 /////////////////////// 註冊 ///////////////////////
 2 public void register(final InstanceInfo info, final boolean isReplication) {
 3     int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
 4     // 如果例項中沒有周期的配置,就設定為預設的 90 秒
 5     if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
 6         leaseDuration = info.getLeaseInfo().getDurationInSecs();
 7     }
 8     // 註冊例項
 9     super.register(info, leaseDuration, isReplication);
10     // 複製到叢集其它 server 節點
11     replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
12 }
13 
14 
15 /////////////////////// 下線 ///////////////////////
16 public boolean cancel(final String appName, final String id,
17                       final boolean isReplication) {
18     if (super.cancel(appName, id, isReplication)) {
19         replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
20 
21         return true;
22     }
23     return false;
24 }
25 
26 
27 /////////////////////// 續約 ///////////////////////
28 public boolean renew(final String appName, final String id, final boolean isReplication) {
29     // 呼叫父類(AbstractInstanceRegistry)的 renew 續約
30     if (super.renew(appName, id, isReplication)) {
31         // 續約完成後同步到叢集其它節點
32         replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
33         return true;
34     }
35     return false;
36 }

② 同步到其它節點

來看看 replicateToPeers 方法:

  • 首先判斷 isReplication 引數,如果是叢集複製操作,最近一分鐘複製次數 numberOfReplicationsLastMin + 1。isReplication 是在請求頭中指定的,請求頭為 PeerEurekaNode.HEADER_REPLICATION(x-netflix-discovery-replication)。
  • 接著遍歷叢集列表,複製例項操作到叢集節點中。前面也分析過了,PeerEurekaNode 就代表了一個 eureka-server,PeerEurekaNodes 就代表了 eureka-server 叢集。
  • 複製例項操作到叢集的方法 replicateInstanceActionsToPeers 就是根據不同的操作型別呼叫叢集 PeerEurekaNode 對應的方法完成操作複製。
 1 private void replicateToPeers(Action action, String appName, String id,
 2                               InstanceInfo info /* optional */,
 3                               InstanceStatus newStatus /* optional */, boolean isReplication) {
 4     Stopwatch tracer = action.getTimer().start();
 5     try {
 6         if (isReplication) {
 7             // 如果是來自其它server節點的註冊請求,則最近一分鐘叢集同步次數+1
 8             numberOfReplicationsLastMin.increment();
 9         }
10         // If it is a replication already, do not replicate again as this will create a poison replication
11         if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
12             return;
13         }
14 
15         // 如果是來自客戶端的註冊請求,就同步到叢集中其它server節點
16         for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
17             // If the url represents this host, do not replicate to yourself.
18             if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
19                 continue;
20             }
21 
22             replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
23         }
24     } finally {
25         tracer.stop();
26     }
27 }
 1 private void replicateInstanceActionsToPeers(Action action, String appName,
 2                                              String id, InstanceInfo info, InstanceStatus newStatus,
 3                                              PeerEurekaNode node) {
 4     try {
 5         InstanceInfo infoFromRegistry;
 6         CurrentRequestVersion.set(Version.V2);
 7         switch (action) {
 8             case Cancel:
 9                 // 下線
10                 node.cancel(appName, id);
11                 break;
12             case Heartbeat:
13                 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
14                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
15                 // 續約
16                 node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
17                 break;
18             case Register:
19                 // 註冊
20                 node.register(info);
21                 break;
22             case StatusUpdate:
23                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
24                 node.statusUpdate(appName, id, newStatus, infoFromRegistry);
25                 break;
26             case DeleteStatusOverride:
27                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
28                 node.deleteStatusOverride(appName, id, infoFromRegistry);
29                 break;
30         }
31     } catch (Throwable t) {
32         logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
33     } finally {
34         CurrentRequestVersion.remove();
35     }
36 }

③ isReplication

PeerEurekaNode 與 eureka-server 通訊的元件是 JerseyReplicationClient,這個類重寫了 addExtraHeaders 方法,並添加了請求頭 PeerEurekaNode.HEADER_REPLICATION,設定為 true。

這樣其它 eureka-server 收到這個複製操作後,就知道是來自叢集節點的同步操作,就不會再同步給其它節點了,從而避免死迴圈。

1 @Override
2 protected void addExtraHeaders(Builder webResource) {
3     webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
4 }

十二、叢集同步機制

Eureka Server 叢集間同步機制還是比較複雜的,試想如果每次客戶端的請求一過來,比如註冊、心跳,然後 eureka-server 就立馬同步給叢集中其它 server 節點,那 eureka-server 這種 Peer to Peer 的模式實際上就無法分擔客戶端的寫操作壓力,相當於每個 eureka-server 接收到的請求量都是一樣的。那 eureka server 為了避免這種情況,底層採用了三層佇列,加批量任務的方式來進行叢集間的同步。簡單來說就是先將客戶端操作放入佇列中,然後從佇列中取出一批操作,然後將這一批操作傳送給其它 Server 節點,Server節點接收到之後再將這批操作解析到本地。下面就來詳細看看是如何實現的。

1、叢集節點 PeerEurekaNode

之前分析 eureka-server 啟動初始化的時候,EurekaBootStrap 初始化了代表叢集的 PeerEurekaNodes,它裡面又根據配置的註冊中心地址構造了 PeerEurekaNode,叢集間同步核心的元件就是這個 PeerEurekaNode 了。下面以客戶端註冊為例來看下是如何同步的。

① 註冊同步

replicateInstanceActionsToPeers 中呼叫了 PeerEurekaNode 的 register 方法來同步註冊操作到叢集。

node.register 方法:

  • 可以看到先計算了過期時間,為當前時間 + 租約間隔時間(預設90秒)
  • 然後呼叫了 batchingDispatcher 批量任務分發器來處理任務,提交了一個 InstanceReplicationTask 的例項,其 execute 方法中呼叫了 replicationClient 來向這個 server 註冊同步。
 1 public void register(final InstanceInfo info) throws Exception {
 2     // 過期時間:當前時間 + 租約時間(預設90秒)
 3     long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
 4     batchingDispatcher.process(
 5             taskId("register", info),
 6             new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
 7                 public EurekaHttpResponse<Void> execute() {
 8                     return replicationClient.register(info);
 9                 }
10             },
11             expiryTime
12     );
13 }

再看下 getLeaseRenewalOf 這個方法,這裡應該是有bug的,這個方法返回的是毫秒數,可以看到它的衛語句的else部分是乘以 1000 了的,而 if 部分則沒有,返回的是 90,不過這裡 info.getLeaseInfo() 應該都不會為 null。

1 private static int getLeaseRenewalOf(InstanceInfo info) {
2     // bug : Lease.DEFAULT_DURATION_IN_SECS * 1000
3     return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000;
4 }

② PeerEurekaNode 的構造

batchingDispatcher 是在 PeerEurekaNode 的構造方法中初始化的,來看下它的構造方法:

  • registry:本地登錄檔
  • targetHost:eureka-server host
  • replicationClient:基於 jersey 的叢集複製客戶端通訊元件,它在請求頭中設定了 PeerEurekaNode.HEADER_REPLICATION 為 true
  • serviceUrl:eureka-server 地址
  • maxProcessingDelayMs:最大處理延遲毫秒數,預設為30000毫秒,即30秒,在下線的時候有用到
  • batcherName:批處理器名稱
  • taskProcessor:複製任務處理器,它封裝了 targetHost 和 replicationClient,主要就是 ReplicationTaskProcessor 在處理批量任務的提交
  • batchingDispatcher:批量任務分發器,它會將任務打成一個批次提交到 eureka-server,避免多次請求eureka-server,註冊時就是先用這個分發器提交的任務
  • nonBatchingDispatcher:非批量任務分發器,就是一個任務一個任務的提交
 1 public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) {
 2     this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
 3 }
 4 
 5 PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
 6                                  HttpReplicationClient replicationClient, EurekaServerConfig config,
 7                                  int batchSize, long maxBatchingDelayMs,
 8                                  long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
 9     this.registry = registry;
10     // 叢集節點 host
11     this.targetHost = targetHost;
12     this.replicationClient = replicationClient;
13 
14     // 叢集節點地址
15     this.serviceUrl = serviceUrl;
16     this.config = config;
17     // 最大延遲時間 預設30秒
18     this.maxProcessingDelayMs = config.getMaxTimeForReplication();
19 
20     // 批處理器名稱
21     String batcherName = getBatcherName();
22 
23     // 複製任務處理器
24     ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
25     // 批量任務分發器
26     this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
27             batcherName,
28             // 複製池裡最大容量,預設 10000
29             config.getMaxElementsInPeerReplicationPool(),
30             batchSize, // 250
31             // 同步使用的最大執行緒數 預設 20
32             config.getMaxThreadsForPeerReplication(),
33             maxBatchingDelayMs, // 500
34             serverUnavailableSleepTimeMs, // 1000
35             retrySleepTimeMs, // 100
36             taskProcessor
37     );
38     // 單個任務分發器
39     this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
40             targetHost,
41             config.getMaxElementsInStatusReplicationPool(),
42             config.getMaxThreadsForStatusReplication(),
43             maxBatchingDelayMs,
44             serverUnavailableSleepTimeMs,
45             retrySleepTimeMs,
46             taskProcessor
47     );
48 }

2、批量分發器 TaskDispatcher

建立 batchingDispatcher 時呼叫了 TaskDispatchers.createBatchingTaskDispatcher 方法建立了 batchingDispatcher。

首先看下 createBatchingTaskDispatcher 的引數及預設值,後面分析程式碼的時候會用到這些引數:

  • id:批量分發器的名稱
  • maxBufferSize:快取池最大數量,預設 10000
  • workloadSize:工作負載數量,即一個批次最多多少任務,預設 250
  • workerCount:工作者數量,這個是執行緒池執行緒工作執行緒的數量,預設20
  • maxBatchingDelay:批量任務最大延遲毫秒數,預設為 500 毫秒
  • congestionRetryDelayMs:阻塞重試延遲毫秒數,預設為 1000 毫秒
  • networkFailureRetryMs:網路失敗重試延遲毫秒數,預設為 100 毫秒
  • taskProcessor:任務處理器,即 ReplicationTaskProcessor

再看下這個方法:

  • 首先建立了一個接收者執行器 AcceptorExecutor,主要的引數是快取、時間相關的
  • 再建立了一個任務處理器 TaskExecutors,主要的引數是工作執行緒數、任務處理器以及接收者執行器,可以猜測這應該就是最終執行批量任務提交的執行器
  • 最後建立了任務分發器 TaskDispatcher,從它的 process 方法可以看出,分發器提交的任務實際上又提交給了 AcceptorExecutor

從這裡可以知道,前面註冊時 batchingDispatcher.process() 提交的任務其實就是分發到 acceptorExecutor 這個接收者執行器了。建立的這個分發器 TaskDispatcher 主要有接收者執行器 AcceptorExecutor 和 任務處理器 TaskExecutors 這兩個元件,核心的分發功能就在這兩個元件中。

 1 public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id, int maxBufferSize, int workloadSize,
 2                                                                          int workerCount, long maxBatchingDelay, long congestionRetryDelayMs,
 3                                                                          long networkFailureRetryMs, TaskProcessor<T> taskProcessor) {
 4     // 接收者執行器 AcceptorExecutor
 5     final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
 6             id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
 7     );
 8 
 9     // 任務處理器 TaskExecutors, workerCount = 20
10     final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
11 
12     return new TaskDispatcher<ID, T>() {
13         @Override
14         public void process(ID id, T task, long expiryTime) {
15             // 任務由 acceptorExecutor 處理
16             acceptorExecutor.process(id, task, expiryTime);
17         }
18 
19         @Override
20         public void shutdown() {
21             acceptorExecutor.shutdown();
22             taskExecutor.shutdown();
23         }
24     };
25 }

3、接收者執行器 AcceptorExecutor

先看下建立 AcceptorExecutor 的構造方法:

  • 根據 congestionRetryDelayMs、networkFailureRetryMs 建立了一個時間調整器 TrafficShaper,應該主要就是用來調整補償時間的
  • 然後建立了一個後臺執行緒 acceptorThread,它執行的任務是 AcceptorRunner,主要就是將任務轉成批量任務的
  • 最後就是註冊了一些監控統計之類的
 1 AcceptorExecutor(String id,
 2                  int maxBufferSize,
 3                  int maxBatchingSize,
 4                  long maxBatchingDelay,
 5                  long congestionRetryDelayMs,
 6                  long networkFailureRetryMs) {
 7     // 批處理器名稱
 8     this.id = id;
 9     // 最大緩衝數:10000
10     this.maxBufferSize = maxBufferSize;
11     // 每批最大數量:250
12     this.maxBatchingSize = maxBatchingSize;
13     // 最大延遲時間:500 ms
14     this.maxBatchingDelay = maxBatchingDelay;
15     // 時間調整器
16     // congestionRetryDelayMs 阻塞重試延遲時間,1000ms
17     // networkFailureRetryMs 網路異常重試時間,100ms
18     this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
19 
20     // 接收者後臺處理執行緒
21     ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
22     this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
23     this.acceptorThread.setDaemon(true);
24     this.acceptorThread.start();
25 
26     // 監控統計相關
27     final double[] percentiles = {50.0, 95.0, 99.0, 99.5};
28     final StatsConfig statsConfig = new StatsConfig.Builder()
29             .withSampleSize(1000)
30             .withPercentiles(percentiles)
31             .withPublishStdDev(true)
32             .build();
33     final MonitorConfig config = MonitorConfig.builder(METRIC_REPLICATION_PREFIX + "batchSize").build();
34     this.batchSizeMetric = new StatsTimer(config, statsConfig);
35     try {
36         Monitors.registerObject(id, this);
37     } catch (Throwable e) {
38         logger.warn("Cannot register servo monitor for this object", e);
39     }
40 }

然後看看 AcceptorExecutor 的屬性,它定義了幾個佇列以及容器來處理批量任務,我們先知道有這些東西,後面再來看看都怎麼使用的。

然後可以看到 AcceptorExecutor 大量使用了併發包下的一些類,以及佇列的特性,這裡我們需要了解下這些類的特性:

  • LinkedBlockingQueue:基於連結串列的單端阻塞佇列,就是隊尾入隊,隊首出隊
  • Deque:雙端佇列,就是隊首、隊尾都可以入隊、出隊
  • Semaphore:訊號量,需要通過 acquire(或 tryAcquire) 獲取到許可證之後才可以進入臨界區,通過 release 釋放許可證。只要能拿到許可證,Semaphore 是可以允許多個執行緒進入臨界區的。另外注意它這裡設定的許可證數量是0,說明要先呼叫了 release 放入一個許可證,才有可能呼叫 acquire 獲取到許可證。
 1 // 接收任務的佇列
 2 private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
 3 // 重試任務的佇列
 4 private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
 5 // 後臺接收者執行緒
 6 private final Thread acceptorThread;
 7 // 待處理任務容器
 8 private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();
 9 // 處理中的佇列
10 private final Deque<ID> processingOrder = new LinkedList<>();
11 
12 // 單項佇列請求的訊號量
13 private final Semaphore singleItemWorkRequests = new Semaphore(0);
14 // 單項任務佇列
15 private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();
16 
17 // 批量佇列請求的訊號量
18 private final Semaphore batchWorkRequests = new Semaphore(0);
19 // 批量任務佇列
20 private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
21 // 時間調整器
22 private final TrafficShaper trafficShaper;

TaskDispatcher 呼叫 acceptorExecutor.process 將任務轉給 AcceptorExecutor,可以看到就是將任務新增到接收者佇列 acceptorQueue 的隊尾了。

1 void process(ID id, T task, long expiryTime) {
2     acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
3     acceptedTasks++;
4 }

4、接收者任務 AcceptorRunner

任務新增到 acceptorQueue 了,那任務在哪處理的呢?這就是在 AcceptorRunner 這個任務裡去處理的了,這個任務比較複雜,我先把整個程式碼放出來,再來分析。

  1 class AcceptorRunner implements Runnable {
  2     @Override
  3     public void run() {
  4         long scheduleTime = 0;
  5         while (!isShutdown.get()) {
  6             try {
  7                 // 排出輸入佇列的任務:將 reprocessQueue、acceptorQueue 佇列的任務轉移到 pendingTasks
  8                 drainInputQueues();
  9 
 10                 // 待處理的數量
 11                 int totalItems = processingOrder.size();
 12 
 13                 long now = System.currentTimeMillis();
 14                 if (scheduleTime < now) {
 15                     // 時間補償,正常情況下 transmissionDelay() 返回 0
 16                     scheduleTime = now + trafficShaper.transmissionDelay();
 17                 }
 18                 if (scheduleTime <= now) {
 19                     // 分配批量工作任務:將 pendingTasks 的任務分一批到(最多250個) batchWorkQueue 佇列中
 20                     assignBatchWork();
 21                     // 分配單項工作任務:pendingTasks 如果還有剩餘任務,將沒有過期的轉移到 singleItemWorkQueue 佇列中
 22                     assignSingleItemWork();
 23                 }
 24 
 25                 // If no worker is requesting data or there is a delay injected by the traffic shaper,
 26                 // sleep for some time to avoid tight loop.
 27                 if (totalItems == processingOrder.size()) {
 28                     Thread.sleep(10);
 29                 }
 30             } catch (InterruptedException ex) {
 31                 // Ignore
 32             } catch (Throwable e) {
 33                 // Safe-guard, so we never exit this loop in an uncontrolled way.
 34                 logger.warn("Discovery AcceptorThread error", e);
 35             }
 36         }
 37     }
 38 
 39     private boolean isFull() {
 40         // 待處理的任務 >= 10000,也就是說 pendingTasks 最多放 10000 個任務
 41         return pendingTasks.size() >= maxBufferSize;
 42     }
 43 
 44     private void drainInputQueues() throws InterruptedException {
 45         do {
 46             // 排出 reprocessQueue,將 reprocessQueue 佇列的任務轉移到 pendingTasks
 47             drainReprocessQueue();
 48             // 排出 acceptorQueue,將 acceptorQueue 佇列的任務轉移到 pendingTasks
 49             drainAcceptorQueue();
 50 
 51             if (isShutdown.get()) {
 52                 break;
 53             }
 54             // If all queues are empty, block for a while on the acceptor queue
 55             if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
 56                 // 等待任務放入 acceptorQueue,等待 10 毫秒
 57                 TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
 58                 if (taskHolder != null) {
 59                     // 放入之後 acceptorQueue、pendingTasks 就不為空了
 60                     appendTaskHolder(taskHolder);
 61                 }
 62             }
 63             // pendingTasks 為空、acceptorQueue 不為空、reprocessQueue不為空時,就會一直迴圈
 64             // 如果所有任務都處理完了,reprocessQueue、acceptorQueue、pendingTasks 都是空的,
 65             // 這時就會迴圈等待任務進入 acceptorQueue,每次等待 10 毫秒
 66         } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
 67     }
 68 
 69     private void drainAcceptorQueue() {
 70         while (!acceptorQueue.isEmpty()) {
 71             // 將 acceptorQueue 的任務轉移到 pendingTasks
 72             appendTaskHolder(acceptorQueue.poll());
 73         }
 74     }
 75 
 76     private void drainReprocessQueue() {
 77         long now = System.currentTimeMillis();
 78         while (!reprocessQueue.isEmpty() && !isFull()) {
 79             // 從 reprocessQueue 隊尾取出任務
 80             TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
 81             ID id = taskHolder.getId();
 82             if (taskHolder.getExpiryTime() <= now) {
 83                 // 任務過期
 84                 expiredTasks++;
 85             } else if (pendingTasks.containsKey(id)) {
 86                 // pendingTasks 已存在
 87                 overriddenTasks++;
 88             } else {
 89                 // 將 reprocessQueue 佇列的任務放到 pendingTasks
 90                 pendingTasks.put(id, taskHolder);
 91                 // 新增到 processingOrder 佇列的頭部,reprocessQueue 是失敗重試的佇列,所以優先順序高一些
 92                 processingOrder.addFirst(id);
 93             }
 94         }
 95         if (isFull()) {
 96             queueOverflows += reprocessQueue.size();
 97             // pendingTasks 滿了,就清空 reprocessQueue
 98             reprocessQueue.clear();
 99         }
100     }
101 
102     private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
103         if (isFull()) {
104             // pendingTasks 滿了就移除一個元素
105             pendingTasks.remove(processingOrder.poll());
106             queueOverflows++;
107         }
108         // 將 acceptorQueue 裡的任務放到 pendingTasks
109         TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
110         if (previousTask == null) {
111             // 原本不存在,將任務ID新增到 processingOrder 佇列的最後
112             processingOrder.add(taskHolder.getId());
113         } else {
114             // 已經存在了,就是覆蓋
115             overriddenTasks++;
116         }
117     }
118 
119     void assignSingleItemWork() {
120         if (!processingOrder.isEmpty()) {
121             if (singleItemWorkRequests.tryAcquire(1)) {
122                 long now = System.currentTimeMillis();
123                 while (!processingOrder.isEmpty()) {
124                     ID id = processingOrder.poll();
125                     TaskHolder<ID, T> holder = pendingTasks.remove(id);
126                     if (holder.getExpiryTime() > now) {
127                         // 將 pendingTasks 的任務移到 singleItemWorkQueue
128                         singleItemWorkQueue.add(holder);
129                         return;
130                     }
131                     expiredTasks++;
132                 }
133                 singleItemWorkRequests.release();
134             }
135         }
136     }
137 
138     void assignBatchWork() {
139         // 有足夠的任務做一個批處理
140         if (hasEnoughTasksForNextBatch()) {
141             if (batchWorkRequests.tryAcquire(1)) {
142                 long now = System.currentTimeMillis();
143                 // 一批任務最多 250 個
144                 int len = Math.min(maxBatchingSize, processingOrder.size());
145                 List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
146                 // 將 pendingTasks 中的任務移動一批到 holders 中
147                 // 也就是說,如果佇列中有500個任務,這一批任務最多也是250個
148                 while (holders.size() < len && !processingOrder.isEmpty()) {
149                     ID id = processingOrder.poll();
150                     TaskHolder<ID, T> holder = pendingTasks.remove(id);
151                     if (holder.getExpiryTime() > now) {
152                         holders.add(holder);
153                     } else {
154                         expiredTasks++;
155                     }
156                 }
157                 if (holders.isEmpty()) {
158                     batchWorkRequests.release();
159                 } else {
160                     batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
161                     // 新增到批量佇列中
162                     batchWorkQueue.add(holders);
163                 }
164             }
165         }
166     }
167 
168     // 是否有足夠的任務做一個批處理
169     private boolean hasEnoughTasksForNextBatch() {
170         if (processingOrder.isEmpty()) {
171             return false;
172         }
173         if (pendingTasks.size() >= maxBufferSize) {
174             return true;
175         }
176 
177         // 從 processingOrder 隊首取一個任務ID,然後從 pendingTasks 讀取這個任務。注意 peek() 只是取出元素,並不會移除隊首的元素
178         TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
179         // 判斷任務提交到現在的時間差是否超過最大批任務延遲時間(500毫秒)
180         long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
181         return delay >= maxBatchingDelay;
182     }
183 }
View Code

先看它的 run 方法:

① 佇列中的任務轉移到待處理容器中

drainInputQueues 將輸入佇列(reprocessQueue、acceptorQueue)的任務轉移到 pendingTasks 這個待處理容器中。

先是 drainReprocessQueue 將重處理佇列 reprocessQueue 中的任務轉移到 pendingTasks:

  • 如果 pendingTasks 已滿(超過10000),就直接清空了 reprocessQueue。任務丟棄會不會有影響呢?
  • 否則,如果 reprocessQueue 非空,就從 reprocessQueue 隊尾一個個取出來:
    • 如果過期了就丟掉這個任務,說明已經超過續約週期了(90秒)。比如例項註冊,如果多次同步失敗後,然後就直接丟棄,那不是其它 server 永遠無法知道註冊的這個例項?後面再分析這個問題。
    • 如果 pendingTasks 已經存在了,也丟棄這個重試任務
    • 否則就新增到 pendingTasks 中,並且往 processingOrder 的頭部添加了任務ID
    • 注意它這裡是從 reprocessQueue 隊尾一個個取出,放入 processingOrder 頭部,最終任務在 processingOrder 中的順序跟 reprocessQueue 是一樣的

然後是 drainAcceptorQueue 將接收者佇列 acceptorQueue 中的任務轉移到 pendingTasks:

  • 只要 acceptorQueue 非空,就從隊首取出任務
  • 如果 pendingTasks 已滿,則從 processingOrder 隊首取出第一個任務的ID,並從 pendingTasks 中移除這個任務
  • 否則就將任務新增到 pendingTasks,如果之前不存在相同ID的任務,就將任務ID新增到 processingOrder 隊尾
  • 注意它這裡是從 acceptorQueue 隊首取出任務,放到 processingOrder 隊尾,最終任務在 processingOrder 中的順序跟 acceptorQueue 是一樣的

從這段任務轉移以及後面的使用來看,processingOrder 將決定任務的處理順序,最前面的將最先處理,也說明了 reprocessQueue 的優先順序比 acceptorQueue 更高。而 pendingTasks 是一個 key-value 的佇列,便於快速通過ID讀取任務。

 1 private void drainAcceptorQueue() {
 2     while (!acceptorQueue.isEmpty()) {
 3         // 將 acceptorQueue 的任務轉移到 pendingTasks
 4         appendTaskHolder(acceptorQueue.poll());
 5     }
 6 }
 7 
 8 private void drainReprocessQueue() {
 9     long now = System.currentTimeMillis();
10     while (!reprocessQueue.isEmpty() && !isFull()) {
11         // 從 reprocessQueue 隊尾取出任務
12         TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
13         ID id = taskHolder.getId();
14         if (taskHolder.getExpiryTime() <= now) {
15             // 任務過期
16             expiredTasks++;
17         } else if (pendingTasks.containsKey(id)) {
18             // pendingTasks 已存在
19             overriddenTasks++;
20         } else {
21             // 將 reprocessQueue 佇列的任務放到 pendingTasks
22             pendingTasks.put(id, taskHolder);
23             // 新增到 processingOrder 佇列的頭部,reprocessQueue 是失敗重試的佇列,所以優先順序高一些
24             processingOrder.addFirst(id);
25         }
26     }
27     if (isFull()) {
28         queueOverflows += reprocessQueue.size();
29         // pendingTasks 滿了,就清空 reprocessQueue
30         reprocessQueue.clear();
31     }
32 }
33 
34 private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
35     if (isFull()) {
36         // pendingTasks 滿了就移除一個元素
37         pendingTasks.remove(processingOrder.poll());
38         queueOverflows++;
39     }
40     // 將 acceptorQueue 裡的任務放到 pendingTasks
41     TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
42     if (previousTask == null) {
43         // 原本不存在,將任務ID新增到 processingOrder 佇列的最後
44         processingOrder.add(taskHolder.getId());
45     } else {
46         // 已經存在了,就是覆蓋
47         overriddenTasks++;
48     }
49 }

② 接下來通過 trafficShaper 獲取了一個補償時間,它主要是在發生阻塞或網路異常導致任務提交失敗後,在任務排程週期內做一個時間補償,這塊等分析到提交任務失敗的時候再回來看看。

1 long now = System.currentTimeMillis();
2 if (scheduleTime < now) {
3     // 時間補償,正常情況下 transmissionDelay() 返回 0
4     scheduleTime = now + trafficShaper.transmissionDelay();
5 }

③ 任務打包

接著看 assignBatchWork ,它就是將任務打包成一個批次:

  • 首先呼叫 hasEnoughTasksForNextBatch 判斷是否有足夠的任務來打成一個批次,注意它判斷了最新提交的任務的時間是否超過了延遲時間 maxBatchingDelay(500ms),也就是說批次任務每隔500毫秒執行一次。
  • 能夠打包後,要獲取 batchWorkRequests 訊號量的一個許可證,因為許可證預設數量是 0,那一定是先有地方呼叫了 batchWorkRequests.release() 放入許可證,否則這裡就不會打包了。
  • 然後可以看出,一個批次的任務數量最多是250個
  • 它從 processingOrder 的隊首取出這個批次的任務ID,並從 pendingTasks 中取出任務,如果是過期的任務就直接丟棄了。
  • 然後如果這個批次並沒有任務,他才呼叫 batchWorkRequests.release() 釋放了許可證,否則就把這個批次任務新增到批量工作佇列 batchWorkQueue 中,注意並沒有釋放許可證。
 1 void assignBatchWork() {
 2     // 有足夠的任務做一個批處理
 3     if (hasEnoughTasksForNextBatch()) {
 4         // 獲取許可證
 5         if (batchWorkRequests.tryAcquire(1)) {
 6             long now = System.currentTimeMillis();
 7             // 一批任務最多 250 個
 8             int len = Math.min(maxBatchingSize, processingOrder.size());
 9             List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
10             // 將 pendingTasks 中的任務移動一批到 holders 中
11             // 也就是說,如果佇列中有500個任務,這一批任務最多也是250個
12             while (holders.size() < len && !processingOrder.isEmpty()) {
13                 ID id = processingOrder.poll();
14                 TaskHolder<ID, T> holder = pendingTasks.remove(id);
15                 if (holder.getExpiryTime() > now) {
16                     holders.add(holder);
17                 } else {
18                     expiredTasks++;
19                 }
20             }
21             if (holders.isEmpty()) {
22                 batchWorkRequests.release();
23             } else {
24                 batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
25                 // 新增到批量佇列中
26                 batchWorkQueue.add(holders);
27             }
28         }
29     }
30 }
31 
32 // 是否有足夠的任務做一個批處理
33 private boolean hasEnoughTasksForNextBatch() {
34     if (processingOrder.isEmpty()) {
35         return false;
36     }
37     if (pendingTasks.size() >= maxBufferSize) {
38         return true;
39     }
40 
41     // 從 processingOrder 隊首取一個任務ID,然後從 pendingTasks 讀取這個任務。注意 peek() 只是取出元素,並不會移除隊首的元素
42     TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
43     // 判斷任務提交到現在的時間差是否超過最大批任務延遲時間(500毫秒)
44     long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
45     return delay >= maxBatchingDelay;
46 }

接著看分配單項任務的方法 assignSingleItemWork:

  • 如果 processingOrder 非空且獲取到了 singleItemWorkRequests 訊號量的許可證,就將 processingOrder 佇列剩餘的任務都取出來,放入單項工作佇列 singleItemWorkQueue 中
  • 也就是前面已經打了一批任務(250個)之後,processingOrder 中還有任務,就全部取出來放到 singleItemWorkQueue 佇列中
 1 void assignSingleItemWork() {
 2     if (!processingOrder.isEmpty()) {
 3         if (singleItemWorkRequests.tryAcquire(1)) {
 4             long now = System.currentTimeMillis();
 5             while (!processingOrder.isEmpty()) {
 6                 ID id = processingOrder.poll();
 7                 TaskHolder<ID, T> holder = pendingTasks.remove(id);
 8                 if (holder.getExpiryTime() > now) {
 9                     // 將 pendingTasks 的任務移到 singleItemWorkQueue
10                     singleItemWorkQueue.add(holder);
11                     return;
12                 }
13                 expiredTasks++;
14             }
15             singleItemWorkRequests.release();
16         }
17     }
18 }

5、任務處理器 TaskExecutors

batchWorkQueue 中的批量任務以及 singleItemWorkQueue 中的單項任務都已經準備好了,那是在哪裡傳送到叢集節點的呢,那就是任務執行器 TaskExecutors 了。

① 建立 TaskExecutors

從建立 TaskExecutors 的方法中可以看出:

  • 批量處理任務的類是 BatchWorkerRunnable,它主要就是處理批量任務佇列 batchWorkQueue 中的任務
  • 處理單項任務的類是 SingleTaskWorkerRunnable,它主要就是處理單項任務佇列 singleItemWorkQueue 中的任務
  • TaskExecutors 建立了一個執行緒池,batchExecutors 預設有20個工作執行緒(不太理解他為什麼不用JDK現成的執行緒池。。),singleItemExecutors 預設只有一個工作執行緒。
 1 static <ID, T> TaskExecutors<ID, T> singleItemExecutors(final String name, int workerCount, final TaskProcessor<T> processor, final AcceptorExecutor<ID, T> acceptorExecutor) {
 2     final AtomicBoolean isShutdown = new AtomicBoolean();
 3     final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
 4     registeredMonitors.put(name, metrics);
 5     // workerCount = 1
 6     return new TaskExecutors<>(idx -> new SingleTaskWorkerRunnable<>("TaskNonBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown);
 7 }
 8 
 9 ////////////////////////////////////////////////
10 
11 static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name, int workerCount, final TaskProcessor<T> processor, final AcceptorExecutor<ID, T> acceptorExecutor) {
12     final AtomicBoolean isShutdown = new AtomicBoolean();
13     final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
14     registeredMonitors.put(name, metrics);
15     // BatchWorkerRunnable 批量任務處理
16     return new TaskExecutors<>(idx -> new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown);
17 }
18 
19 ////////////////////////////////////////////////
20 
21 private final List<Thread> workerThreads;
22 
23 TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
24     this.isShutdown = isShutdown;
25     // 工作執行緒集合
26     this.workerThreads = new ArrayList<>();
27 
28     // 建立20個執行緒,相當於是搞了一個執行緒池
29     ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
30     for (int i = 0; i < workerCount; i++) {
31         WorkerRunnable<ID, T> runnable = workerRunnableFactory.create(i);
32         Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName());
33         workerThreads.add(workerThread);
34         workerThread.setDaemon(true);
35         workerThread.start();
36     }
37 }

② BatchWorkerRunnable

看批量處理的任務:

  • 首先 getWork 獲取批量任務,它呼叫 taskDispatcher.requestWorkItems(),實際就是返回了 taskDispatcher 的 batchWorkQueue,並且呼叫 batchWorkRequests.release() 往訊號量放入一個許可證,這樣前面 AcceptorRunner 就可以得到許可證然後去打包批量任務了
  • 如果 batchWorkQueue 中沒有批量任務,可以看到是一直在 while 迴圈等待的,直到拿到一個批量任務。它這個 BatchWorkerRunnable 任務和前面的 AcceptorRunner 任務,感覺通過訊號量的方式就形成了一個等待通知的機制,BatchWorkerRunnable 放入一個許可證,讓 AcceptorRunner 拿到這個許可證去打個批次的任務過來。
  • 拿到這個批次任務後,就呼叫 processor(ReplicationTaskProcessor)來處理任務。
  • 如果任務處理結果是 Congestion(阻塞)、TransientError(傳輸失敗)就要重處理,呼叫了 taskDispatcher.reprocess 將這個批次的任務提交到重處理佇列 reprocessQueue 中。
 1 static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
 2 
 3     BatchWorkerRunnable(String workerName, AtomicBoolean isShutdown, TaskExecutorMetrics metrics, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) {
 4         super(workerName, isShutdown, metrics, processor, acceptorExecutor);
 5     }
 6 
 7     @Override
 8     public void run() {
 9         try {
10             while (!isShutdown.get()) {
11                 // 獲取一個批量任務
12                 List<TaskHolder<ID, T>> holders = getWork();
13                 metrics.registerExpiryTimes(holders);
14                 // TaskHolder 提取 ReplicationTask
15                 List<T> tasks = getTasksOf(holders);
16                 // processor => 任務複製處理器 ReplicationTaskProcessor
17                 ProcessingResult result = processor.process(tasks);
18                 switch (result) {
19                     case Success:
20                         break;
21                     case Congestion:
22                     case TransientError:
23                         // 阻塞或網路失敗就重新處理這批任務
24                         taskDispatcher.reprocess(holders, result);
25                         break;
26                     case PermanentError:
27                         logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
28                 }
29                 metrics.registerTaskResult(result, tasks.size());
30             }
31         } catch (InterruptedException e) {
32             // Ignore
33         } catch (Throwable e) {
34             // Safe-guard, so we never exit this loop in an uncontrolled way.
35             logger.warn("Discovery WorkerThread error", e);
36         }
37     }
38 
39     private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
40         // 獲取批量佇列 batchWorkQueue
41         BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
42         List<TaskHolder<ID, T>> result;
43         do {
44             result = workQueue.poll(1, TimeUnit.SECONDS);
45             // 迴圈等待,直到取到一個批量任務
46         } while (!isShutdown.get() && result == null);
47         return (result == null) ? new ArrayList<>() : result;
48     }
49 
50     private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
51         List<T> tasks = new ArrayList<>(holders.size());
52         for (TaskHolder<ID, T> holder : holders) {
53             tasks.add(holder.getTask());
54         }
55         return tasks;
56     }
57 }
1 BlockingQueue<TaskHolder<ID, T>> requestWorkItem() {
2     singleItemWorkRequests.release();
3     return singleItemWorkQueue;
4 }
5 
6 BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
7     batchWorkRequests.release();
8     return batchWorkQueue;
9 }

③ 任務重處理

可以看到處理失敗後,就是將這批任務新增到重處理佇列 reprocessQueue 中去,然後向時間調整期註冊失敗,這就和前面 AcceptorRunner 處理 reprocessQueue 對應起來了。

1 void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
2     // 新增到重處理佇列 reprocessQueue
3     reprocessQueue.addAll(holders);
4     replayedTasks += holders.size();
5     // 時間調整器註冊失敗
6     trafficShaper.registerFailure(processingResult);
7 }

④ TrafficShaper 

還記得前面 AcceptorRunner 中又這樣一段程式碼,可以看到是通過 trafficShaper 計算了一個延遲時間,這裡就來看看是如何計算的。

 1 long now = System.currentTimeMillis();
 2 if (scheduleTime < now) {
 3     // 時間補償,正常情況下 transmissionDelay() 返回 0
 4     scheduleTime = now + trafficShaper.transmissionDelay();
 5 }
 6 if (scheduleTime <= now) {
 7     // 分配批量工作任務:將 pendingTasks 的任務分一批到(最多250個) batchWorkQueue 佇列中
 8     assignBatchWork();
 9     // 分配單項工作任務:pendingTasks 如果還有剩餘任務,將沒有過期的轉移到 singleItemWorkQueue 佇列中
10     assignSingleItemWork();
11 }

時間調整器 TrafficShaper:

  • registerFailure 就是設定了失敗的最後時間
  • 然後看 transmissionDelay,以阻塞為例,如果上一次阻塞失敗到現在 500 毫秒,那麼 transmissionDelay 返回 500,那麼 transmissionDelay 就大於 now 了,就不會打包任務了。
  • 總結下來就是如果上一次阻塞導致批量任務提交失敗,就延遲1000毫秒後執行。如果上一次網路導致批量任務提交失敗,就延遲100毫秒執行。
 1 TrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) {
 2     // 1000
 3     this.congestionRetryDelayMs = Math.min(MAX_DELAY, congestionRetryDelayMs);
 4     // 100
 5     this.networkFailureRetryMs = Math.min(MAX_DELAY, networkFailureRetryMs);
 6 }
 7 
 8 void registerFailure(ProcessingResult processingResult) {
 9     if (processingResult == ProcessingResult.Congestion) {
10         // 最後一次阻塞導致提交批處理失敗的時間
11         lastCongestionError = System.currentTimeMillis();
12     } else if (processingResult == ProcessingResult.TransientError) {
13         // 最後一次網路原因導致提交批處理失敗的時間
14         lastNetworkFailure = System.currentTimeMillis();
15     }
16 }
17 
18 // 計算傳輸延遲的時間
19 long transmissionDelay() {
20     if (lastCongestionError == -1 && lastNetworkFailure == -1) {
21         return 0;
22     }
23 
24     long now = System.currentTimeMillis();
25     if (lastCongestionError != -1) {
26         // 阻塞延遲時間
27         long congestionDelay = now - lastCongestionError;
28         if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) {
29             return congestionRetryDelayMs - congestionDelay;
30         }
31         lastCongestionError = -1;
32     }
33 
34     if (lastNetworkFailure != -1) {
35         // 網路延遲時間
36         long failureDelay = now - lastNetworkFailure;
37         if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) {
38             return networkFailureRetryMs - failureDelay;
39         }
40         lastNetworkFailure = -1;
41     }
42     return 0;
43 }

⑤ SingleTaskWorkerRunnable

單項任務處理跟批量任務處理的流程是類似的,只不過是一個個的傳送同步操作,處理失敗同樣也會放入重處理佇列中。

一個批量任務250個對於大部分場景來說其實不會觸發單項任務的處理,如果微服務叢集中有很多的例項,eureka 通過不斷的輪詢也能儘量使用批量處理,我覺得單項任務處理更像是對批量任務處理的一種補充。

6、複製任務處理器 ReplicationTaskProcessor

批量任務最終是提交到 ReplicationTaskProcessor 去處理的,可以看到,就是呼叫了 replicationClient 提交了批量任務,提交的介面是 POST peerreplication/batch,那我們就可以從這個入口去看 eureka-server 如何接收批量任務的。

 1 public ProcessingResult process(List<ReplicationTask> tasks) {
 2     // 任務封裝到 ReplicationList
 3     ReplicationList list = createReplicationListOf(tasks);
 4     try {
 5         // 提交批量任務:POST peerreplication/batch/
 6         EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
 7         int statusCode = response.getStatusCode();
 8         if (!isSuccess(statusCode)) {
 9             if (statusCode == 503) {
10                 logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
11                 return ProcessingResult.Congestion;
12             } else {
13                 // Unexpected error returned from the server. This should ideally never happen.
14                 logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
15                 return ProcessingResult.PermanentError;
16             }
17         } else {
18             // 處理批量任務結果
19             handleBatchResponse(tasks, response.getEntity().getResponseList());
20         }
21     } catch (Throwable e) {
22         if (maybeReadTimeOut(e)) {
23             //read timeout exception is more Congestion then TransientError, return Congestion for longer delay
24             return ProcessingResult.Congestion;
25         } else if (isNetworkConnectException(e)) {
26             logNetworkErrorSample(null, e);
27             return ProcessingResult.TransientError;
28         } else {
29             logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
30             return ProcessingResult.PermanentError;
31         }
32     }
33     return ProcessingResult.Success;
34 }

7、接收復制同步請求

很容易找到批量任務提交的介面在 PeerReplicationResource 的 batchReplication 方法中。

可以看到,其實遍歷批量任務,然後根據不同的操作型別,呼叫 XxxResource 介面進行對應的操作。比如註冊,就是呼叫 applicationResource.addInstance 完成例項的註冊。

  1 @Path("/{version}/peerreplication")
  2 @Produces({"application/xml", "application/json"})
  3 public class PeerReplicationResource {
  4 
  5     private static final Logger logger = LoggerFactory.getLogger(PeerReplicationResource.class);
  6 
  7     private static final String REPLICATION = "true";
  8 
  9     private final EurekaServerConfig serverConfig;
 10     private final PeerAwareInstanceRegistry registry;
 11 
 12     @Inject
 13     PeerReplicationResource(EurekaServerContext server) {
 14         this.serverConfig = server.getServerConfig();
 15         this.registry = server.getRegistry();
 16     }
 17 
 18     public PeerReplicationResource() {
 19         this(EurekaServerContextHolder.getInstance().getServerContext());
 20     }
 21 
 22     @Path("batch")
 23     @POST
 24     public Response batchReplication(ReplicationList replicationList) {
 25         try {
 26             ReplicationListResponse batchResponse = new ReplicationListResponse();
 27             for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
 28                 try {
 29                     // dispatch 分發任務
 30                     batchResponse.addResponse(dispatch(instanceInfo));
 31                 } catch (Exception e) {
 32                     batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
 33                     logger.error("{} request processing failed for batch item {}/{}",
 34                             instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
 35                 }
 36             }
 37             return Response.ok(batchResponse).build();
 38         } catch (Throwable e) {
 39             logger.error("Cannot execute batch Request", e);
 40             return Response.status(Status.INTERNAL_SERVER_ERROR).build();
 41         }
 42     }
 43 
 44     private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
 45         ApplicationResource applicationResource = createApplicationResource(instanceInfo);
 46         InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
 47 
 48         String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
 49         String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
 50         String instanceStatus = toString(instanceInfo.getStatus());
 51 
 52         Builder singleResponseBuilder = new Builder();
 53         // 根據不同的型別分別處理
 54         switch (instanceInfo.getAction()) {
 55             case Register:
 56                 singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
 57                 break;
 58             case Heartbeat:
 59                 singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
 60                 break;
 61             case Cancel:
 62                 singleResponseBuilder = handleCancel(resource);
 63                 break;
 64             case StatusUpdate:
 65                 singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
 66                 break;
 67             case DeleteStatusOverride:
 68                 singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
 69                 break;
 70         }
 71         return singleResponseBuilder.build();
 72     }
 73 
 74     /* Visible for testing */ ApplicationResource createApplicationResource(ReplicationInstance instanceInfo) {
 75         return new ApplicationResource(instanceInfo.getAppName(), serverConfig, registry);
 76     }
 77 
 78     /* Visible for testing */ InstanceResource createInstanceResource(ReplicationInstance instanceInfo,
 79                                                                       ApplicationResource applicationResource) {
 80         return new InstanceResource(applicationResource, instanceInfo.getId(), serverConfig, registry);
 81     }
 82 
 83     private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
 84         // addInstance
 85         applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
 86         return new Builder().setStatusCode(Status.OK.getStatusCode());
 87     }
 88 
 89     private static Builder handleCancel(InstanceResource resource) {
 90         // cancelLease
 91         Response response = resource.cancelLease(REPLICATION);
 92         return new Builder().setStatusCode(response.getStatus());
 93     }
 94 
 95     private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) {
 96         Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp);
 97         int responseStatus = response.getStatus();
 98         Builder responseBuilder = new Builder().setStatusCode(responseStatus);
 99 
100         if ("false".equals(config.getExperimental("bugfix.934"))) {
101             if (responseStatus == Status.OK.getStatusCode() && response.getEntity() != null) {
102                 responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
103             }
104         } else {
105             if ((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode())
106                     && response.getEntity() != null) {
107                 responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
108             }
109         }
110         return responseBuilder;
111     }
112 
113     private static Builder handleStatusUpdate(ReplicationInstance instanceInfo, InstanceResource resource) {
114         Response response = resource.statusUpdate(instanceInfo.getStatus(), REPLICATION, toString(instanceInfo.getLastDirtyTimestamp()));
115         return new Builder().setStatusCode(response.getStatus());
116     }
117 
118     private static Builder handleDeleteStatusOverride(ReplicationInstance instanceInfo, InstanceResource resource) {
119         Response response = resource.deleteStatusUpdate(REPLICATION, instanceInfo.getStatus(),
120                 instanceInfo.getLastDirtyTimestamp().toString());
121         return new Builder().setStatusCode(response.getStatus());
122     }
123 
124     private static <T> String toString(T value) {
125         if (value == null) {
126             return