
Deep Dive into Eureka Server Cluster Synchronization (Part 10)

Cluster Synchronization at Startup

protected void initEurekaServerContext() throws Exception {
    // ... many lines omitted
    // Synchronize registry information
    int registryCount = this.registry.syncUp();
    // ... many lines omitted
}

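Many articles online say that syncUp copies the registry from the other Eureka Server nodes. That is not quite accurate: at this point, syncUp() does not contact any other Eureka Server node. It reads the registry from local memory, namely the embedded Eureka Client's cache (eurekaClient.getApplications()), as the source code shows.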


public int syncUp() {
    // Copy entire entry from neighboring DS node
    // Number of registered instances obtained
    int count = 0;
    // While count == 0, retry (5 times by default, but only when register-with-eureka = true; otherwise 0)
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // From the second attempt on, sleep before each retry (30 seconds by default)
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Read the registered instances from local memory (the embedded client's cache)
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // Check whether the instance may be registered
                    if (isRegisterable(instance)) {
                        // Register it with the current Eureka Server
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

Parameters:

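registrySyncRetries : the number of times a Eureka server tries, at startup, to obtain the registry held by the other servers in the cluster. The default is 5, but only when eureka.client.register-with-eureka = true; if it is false, the value is 0.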

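registrySyncRetryWaitMs : if a Eureka server fails to obtain the other servers' registry at startup, it tries again; this is how long it waits between attempts. The default is 30 * 1000 milliseconds.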

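count : the number of registered instances obtained. While it is 0, the loop retries up to the configured retry count, sleeping 30 seconds before each retry.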
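To make the interplay of these three values concrete, here is a minimal sketch of the retry semantics, not Eureka's code. It assumes a hypothetical IntSupplier named fetchAndRegister in place of reading eurekaClient.getApplications() and registering each instance; the class and method names are illustrative only.

```java
import java.util.function.IntSupplier;

// Hypothetical model of the syncUp() retry loop. fetchAndRegister stands in for
// "read eurekaClient.getApplications() and register every registrable instance",
// returning how many instances were registered in that attempt.
public class SyncUpRetrySketch {

    public static int syncUp(int registrySyncRetries, long registrySyncRetryWaitMs,
                             IntSupplier fetchAndRegister) {
        int count = 0;
        // Same condition as syncUp(): give up after the configured number of
        // attempts, or stop as soon as at least one instance was registered
        for (int i = 0; i < registrySyncRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    // Only retries wait; the first attempt runs immediately
                    Thread.sleep(registrySyncRetryWaitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    break;
                }
            }
            count += fetchAndRegister.getAsInt();
        }
        return count;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Local cache is empty on the first two attempts, then holds 3 instances
        int registered = syncUp(5, 10, () -> attempts[0]++ < 2 ? 0 : 3);
        System.out.println(registered + " instances after " + attempts[0] + " attempts");
        // register-with-eureka = false leaves the retry count at 0: no attempt at all
        System.out.println(syncUp(0, 10, () -> 7));
    }
}
```

Note the consequence of the loop condition: with registrySyncRetries = 0 the body never runs, so a server that does not register with Eureka starts with an empty registry, and with an empty cache the worst-case startup delay is (registrySyncRetries - 1) × registrySyncRetryWaitMs.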

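PS: As covered in an earlier article (https://blog.csdn.net/u012394095/article/details/80882684), a Eureka Client fetches the registry from the Eureka Server automatically by default when it starts. For a Eureka Server to synchronize the registry of the other cluster nodes at startup, the following client settings must be enabled: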


# Whether to register with a Eureka Server as a Eureka Client
eureka.client.register-with-eureka = true
# Whether to fetch the registry from a Eureka Server into local memory
eureka.client.fetch-registry = true

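Only when both settings are enabled does a cluster node, at startup, initialize its Eureka Client configuration and pull the registry from the other Eureka Servers into local memory. Then, while the Eureka Server itself is being initialized, it reads that registry from local memory and registers each instance with itself.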
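The two phases can be modeled in a few lines. This is a hypothetical sketch, not Eureka API: registries are plain maps from application name to instance ids, fetchFromPeer plays the embedded client pulling from a peer (gated by fetch-registry), and syncUp only ever reads that local copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two startup phases: (1) the embedded client pulls the
// registry from a peer into local memory; (2) syncUp() copies that local cache
// into this server's own registry. Nothing here is Eureka API.
public class StartupSyncSketch {

    // Phase 1: the embedded client's local cache (app name -> instance ids)
    public static Map<String, List<String>> fetchFromPeer(Map<String, List<String>> peerRegistry,
                                                          boolean fetchRegistry) {
        Map<String, List<String>> localCache = new HashMap<>();
        if (fetchRegistry) {
            peerRegistry.forEach((app, ids) -> localCache.put(app, new ArrayList<>(ids)));
        }
        return localCache;
    }

    // Phase 2: reads only local memory and registers each instance with this server
    public static int syncUp(Map<String, List<String>> localCache,
                             Map<String, List<String>> serverRegistry) {
        int count = 0;
        for (Map.Entry<String, List<String>> app : localCache.entrySet()) {
            for (String id : app.getValue()) {
                serverRegistry.computeIfAbsent(app.getKey(), k -> new ArrayList<>()).add(id);
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Map<String, List<String>> peer = Map.of("ORDER-SERVICE", List.of("order-1", "order-2"));
        Map<String, List<String>> registry = new HashMap<>();
        int n = syncUp(fetchFromPeer(peer, true), registry);
        System.out.println(n + " " + registry);
    }
}
```

The split mirrors the point made above: syncUp never talks to a peer, so if phase 1 is disabled there is simply nothing in local memory to copy.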

Cluster Synchronization Types

public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

    private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

    public com.netflix.servo.monitor.Timer getTimer() {
        return this.timer;
    }
}

Heartbeat : heartbeat (lease renewal)
Register : registration
Cancel : cancellation (the instance goes offline)
StatusUpdate : set an overridden status
DeleteStatusOverride : delete an overridden status
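Each Action constant carries its own servo Timer, so every replication type is measured separately. As an illustration only, here is a plain-Java stand-in that assumes java.util.concurrent.atomic.LongAdder counters in place of servo's Timer; the enum name ActionSketch and its time method are hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;

// Plain-Java stand-in for the servo Timer each Action constant owns:
// every constant keeps its own call count and accumulated elapsed time.
public enum ActionSketch {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

    private final LongAdder calls = new LongAdder();
    private final LongAdder totalNanos = new LongAdder();

    // Mirrors the start()/stop() pattern in replicateToPeers: the timing is
    // recorded in finally, so a failing call is still measured
    public void time(Runnable work) {
        long start = System.nanoTime();
        try {
            work.run();
        } finally {
            calls.increment();
            totalNanos.add(System.nanoTime() - start);
        }
    }

    public long calls() { return calls.sum(); }

    public long totalNanos() { return totalNanos.sum(); }

    public static void main(String[] args) {
        Register.time(() -> { });
        Register.time(() -> { });
        Heartbeat.time(() -> { });
        System.out.println(Register.calls() + " " + Heartbeat.calls() + " " + Cancel.calls());
    }
}
```

Because enum constants are singletons, a field initialized per constant gives exactly one metric per action type, which is the design the original enum relies on.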

Initiating Synchronization

Take the registration code as an example:


@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register locally
    super.register(info, leaseDuration, isReplication);
    // Once registered, start synchronization here with action type Register
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // If this is a replication request from the cluster, count it toward the last-minute replication count
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // Return immediately if there are no peer nodes or this request is itself a replication from another Eureka Server
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Iterate over the neighboring Eureka Server nodes and replicate to each one
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // Skip this node's own URL
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Send the replication request
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

Steps:

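1. Check whether the cluster node list is empty; if so, return.

2. isReplication indicates whether this is a replication request: isReplication = true means the request was sent by another Eureka Server. Such a request must not be replicated any further, otherwise the nodes would keep replicating to each other in an endless loop.

3. Iterate over the cluster nodes, skipping this node itself.

4. Send the replication request by calling replicateInstanceActionsToPeers.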
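The four steps can be exercised in isolation with a small model. This is a hypothetical sketch: the Peer interface, recordingPeer helper, and URL strings are illustrative, and only the control flow (count incoming replications, stop on isReplication, skip self, fan out) follows the Eureka code above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of replicateToPeers(). Peer and the URLs are hypothetical;
// only the control flow follows the Eureka code.
public class ReplicateToPeersSketch {

    public interface Peer {
        String serviceUrl();
        void replicate(String action, String instanceId);
    }

    private final String myUrl;
    private final List<Peer> peers;
    private int replicationsReceived; // stands in for numberOfReplicationsLastMin

    public ReplicateToPeersSketch(String myUrl, List<Peer> peers) {
        this.myUrl = myUrl;
        this.peers = peers;
    }

    public void replicateToPeers(String action, String instanceId, boolean isReplication) {
        if (isReplication) {
            replicationsReceived++;
        }
        // A request that is already a replication is never forwarded again,
        // otherwise peers would bounce it between each other forever
        if (peers.isEmpty() || isReplication) {
            return;
        }
        for (Peer node : peers) {
            if (node.serviceUrl().equals(myUrl)) {
                continue; // skip this node itself
            }
            node.replicate(action, instanceId);
        }
    }

    public int replicationsReceived() { return replicationsReceived; }

    // Test double that records "url:action:id" for every replication it receives
    public static Peer recordingPeer(String url, List<String> log) {
        return new Peer() {
            public String serviceUrl() { return url; }
            public void replicate(String action, String id) { log.add(url + ":" + action + ":" + id); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Peer> peers = List.of(
                recordingPeer("http://peer1:8761/eureka/", log),
                recordingPeer("http://peer2:8761/eureka/", log),
                recordingPeer("http://peer3:8761/eureka/", log));
        ReplicateToPeersSketch self = new ReplicateToPeersSketch("http://peer1:8761/eureka/", peers);
        self.replicateToPeers("Register", "order-1", false); // request from a client: fan out
        self.replicateToPeers("Register", "order-1", true);  // replica from a peer: stop here
        System.out.println(log);
    }
}
```

Running it shows why the isReplication flag is enough to prevent loops: the peers that receive the fan-out see isReplication = true and never forward it.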

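PS: PeerEurekaNode was mentioned above. For how the PeerEurekaNodes cluster node list is updated and read, see https://blog.csdn.net/u012394095/article/details/80693681. When the server starts, it launches a thread for PeerEurekaNodes that refreshes the cluster node information every 15 minutes.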
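A background refresh of this kind is commonly built on ScheduledExecutorService. The following is a hypothetical sketch of the pattern, not the PeerEurekaNodes implementation: the class name PeerListRefresher, the resolvePeerUrls supplier, and the interval parameter are assumptions for illustration.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical peer-list refresher: one daemon thread re-resolves the peer URLs
// at a fixed delay and atomically swaps in the new list.
public class PeerListRefresher implements AutoCloseable {

    private final AtomicReference<List<String>> peers;
    private final ScheduledExecutorService executor;

    public PeerListRefresher(Supplier<List<String>> resolvePeerUrls, long intervalMs) {
        // Resolve once up front so callers never see an uninitialized list
        this.peers = new AtomicReference<>(List.copyOf(resolvePeerUrls.get()));
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "peer-list-refresher");
            t.setDaemon(true);
            return t;
        });
        executor.scheduleWithFixedDelay(() -> {
            try {
                peers.set(List.copyOf(resolvePeerUrls.get()));
            } catch (RuntimeException e) {
                // Keep the previous list: an uncaught exception would suppress all later runs
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Readers always get a complete, immutable snapshot
    public List<String> currentPeers() { return peers.get(); }

    @Override
    public void close() { executor.shutdownNow(); }
}
```

Swapping an immutable list through an AtomicReference lets replication threads iterate over the peers without locking while the refresher replaces the list underneath them.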


private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: // Cancel (the instance goes offline)
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // Heartbeat
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                // Get the latest local instance info
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register: // Register
                node.register(info);
                break;
            case StatusUpdate: // Set an overridden status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride: // Delete the overridden status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
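The method is essentially a switch from action type to one peer call, wrapped so that a failure for one peer cannot abort the loop over the others. A hypothetical reduction follows; PeerNode, recordingNode, and the String-based status are illustrative stand-ins, not Eureka's PeerEurekaNode API, and this sketch catches RuntimeException where the original catches Throwable.

```java
import java.util.List;

// Hypothetical reduction of replicateInstanceActionsToPeers().
public class ActionDispatchSketch {

    public enum Action { Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride }

    // Illustrative peer interface, loosely modeled on the PeerEurekaNode calls above
    public interface PeerNode {
        void cancel(String id);
        void heartbeat(String id, String overriddenStatus);
        void register(String id);
        void statusUpdate(String id, String newStatus);
        void deleteStatusOverride(String id);
    }

    // Returns false if replication to this peer failed; the error is logged and
    // swallowed, as in the original, so the caller can continue with other peers
    public static boolean dispatch(Action action, String id, String status, PeerNode node) {
        try {
            switch (action) {
                case Cancel:
                    node.cancel(id);
                    break;
                case Heartbeat:
                    node.heartbeat(id, status);
                    break;
                case Register:
                    node.register(id);
                    break;
                case StatusUpdate:
                    node.statusUpdate(id, status);
                    break;
                case DeleteStatusOverride:
                    node.deleteStatusOverride(id);
                    break;
            }
            return true;
        } catch (RuntimeException e) {
            System.err.println("Cannot replicate " + action + " for " + id + ": " + e.getMessage());
            return false;
        }
    }

    // Test double: records each call and rejects null ids to simulate a failing peer
    public static PeerNode recordingNode(List<String> log) {
        return new PeerNode() {
            private void record(String entry, String id) {
                if (id == null) {
                    throw new IllegalArgumentException("missing instance id");
                }
                log.add(entry);
            }
            public void cancel(String id) { record("cancel:" + id, id); }
            public void heartbeat(String id, String s) { record("heartbeat:" + id + ":" + s, id); }
            public void register(String id) { record("register:" + id, id); }
            public void statusUpdate(String id, String s) { record("statusUpdate:" + id + ":" + s, id); }
            public void deleteStatusOverride(String id) { record("deleteStatusOverride:" + id, id); }
        };
    }
}
```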

Here we look only at registration; the other actions work on the same principle.

PeerEurekaNode's register method is as follows:


public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // Batch processing is used by default
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

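By default a batching task dispatcher is used: each task is put into a task queue, and worker threads, created in the style of a ThreadPoolExecutor, take tasks from the queue and execute them together as a batch. The receiving Eureka Server also accepts the whole batch in a single request, which makes synchronization more efficient.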
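The core batching idea fits in a small class. This is a hypothetical sketch built on java.util.concurrent.BlockingQueue, not Eureka's dispatcher (which, per the code in this article, also tracks task expiry and reschedules on congestion); BatchingSketch, nextBatch, drainAll, and startWorker are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of batching: producers enqueue tasks, a worker drains up to
// maxBatchSize at a time and hands each batch to a single processor call.
public class BatchingSketch<T> {

    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final int maxBatchSize;

    public BatchingSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    public void submit(T task) {
        queue.add(task);
    }

    // Takes at most maxBatchSize queued tasks in FIFO order; empty if nothing is queued
    public List<T> nextBatch() {
        List<T> batch = new ArrayList<>(maxBatchSize);
        queue.drainTo(batch, maxBatchSize);
        return batch;
    }

    // Drains everything currently queued, one processor call per batch
    public int drainAll(Consumer<List<T>> processor) {
        int batches = 0;
        for (List<T> b = nextBatch(); !b.isEmpty(); b = nextBatch()) {
            processor.accept(b);
            batches++;
        }
        return batches;
    }

    // Background worker: block for the first task, then top the batch up without waiting
    public Thread startWorker(Consumer<List<T>> processor) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    List<T> batch = new ArrayList<>(maxBatchSize);
                    batch.add(queue.take());
                    queue.drainTo(batch, maxBatchSize - 1);
                    processor.accept(batch);
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for work: stop the worker
            }
        }, "batch-worker");
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

Usage: with a batch size of 2, five submitted tasks are processed as three calls carrying 2, 2, and 1 tasks, so five replication events cost three HTTP round trips instead of five.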

The task processor that executes the batches is com.netflix.eureka.cluster.ReplicationTaskProcessor:


@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    // Build ReplicationInstance objects and put them into a ReplicationList
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // Send the batch request
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            // Handle each result: handleSuccess on success, handleFailure on failure
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}

The batch requests are sent to the endpoint peerreplication/batch/.
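The branches of process() reduce to a classification of each batch outcome. The sketch below is hypothetical and only classifies, it sends nothing; it assumes isSuccess means any 2xx status and that any IOException in the cause chain counts as a network error, both labeled in the comments.

```java
import java.io.IOException;

// Hypothetical condensation of the branches in ReplicationTaskProcessor.process().
public class ReplicationOutcomeSketch {

    public enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    // statusCode is ignored when error != null (the request got no response)
    public static ProcessingResult classify(int statusCode, Throwable error) {
        if (error != null) {
            // Network problems are worth retrying; anything else is not
            return isNetworkConnectException(error)
                    ? ProcessingResult.TransientError
                    : ProcessingResult.PermanentError;
        }
        // Assumption: "success" means any 2xx status code
        if (statusCode >= 200 && statusCode < 300) {
            return ProcessingResult.Success;
        }
        // 503: the peer is busy, so the batch is rescheduled after a delay
        return statusCode == 503 ? ProcessingResult.Congestion : ProcessingResult.PermanentError;
    }

    // Assumption: an IOException anywhere in the cause chain is a network error
    static boolean isNetworkConnectException(Throwable e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof IOException) {
                return true;
            }
        }
        return false;
    }
}
```

The distinction matters because only Congestion and TransientError lead to the batch being tried again; a PermanentError discards the tasks, as the log message in process() says.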

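handleBatchResponse(tasks, response.getEntity().getResponseList()) iterates over the individual results, calling handleSuccess for each success and handleFailure for each failure. For example, if a heartbeat returns status code 404, the instance is registered again: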


ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
    @Override
    public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
        return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
    }

    @Override
    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        super.handleFailure(statusCode, responseEntity);
        if (statusCode == 404) {
            // The peer does not know this instance: register it again
            register(info);
        } else if (config.shouldSyncWhenTimestampDiffers()) {
            // Otherwise, if the peer returned its own copy, sync the instance when the timestamps differ
            InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
            if (peerInstanceInfo != null) {
                syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
            }
        }
    }
};
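The 404 branch is a self-healing step: a peer that has lost an instance (for example after restarting) gets it back on the next replicated heartbeat. A hypothetical simulation follows; FakePeer is an in-memory stand-in for a peer server, 90 000 ms is used only as an example lease duration, and the timestamp-sync branch is deliberately omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the heartbeat task's 404 branch. FakePeer is an
// in-memory stand-in for a peer Eureka Server; it is not Eureka API.
public class HeartbeatRecoverySketch {

    public static final class FakePeer {
        private final Map<String, Long> leaseExpiry = new HashMap<>();

        // 200 if the peer knows the instance (and renews its lease), 404 otherwise
        public int heartbeat(String id, long nowMs, long durationMs) {
            if (!leaseExpiry.containsKey(id)) {
                return 404;
            }
            leaseExpiry.put(id, nowMs + durationMs);
            return 200;
        }

        public void register(String id, long nowMs, long durationMs) {
            leaseExpiry.put(id, nowMs + durationMs);
        }

        public boolean hasLease(String id) {
            return leaseExpiry.containsKey(id);
        }
    }

    // Mirrors handleFailure(): a 404 means the peer has no lease for the
    // instance, so the full instance is registered again
    public static int replicateHeartbeat(FakePeer peer, String id, long nowMs, long durationMs) {
        int status = peer.heartbeat(id, nowMs, durationMs);
        if (status == 404) {
            peer.register(id, nowMs, durationMs);
        }
        return status;
    }

    public static void main(String[] args) {
        FakePeer peer = new FakePeer(); // e.g. a peer that restarted and lost its registry
        System.out.println(replicateHeartbeat(peer, "order-1", 0, 90_000));
        System.out.println(replicateHeartbeat(peer, "order-1", 30_000, 90_000));
    }
}
```

The first replicated heartbeat returns 404 and triggers the re-registration; the second one succeeds with 200 because the peer now holds a lease.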

Eureka Server Receiving Synchronization

Entry point: com.netflix.eureka.resources.PeerReplicationResource


@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        // Iterate over the tasks in the request
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                // Dispatch each task and collect its result so all results are returned together
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.IN