1. 程式人生 > >Eureka高可用之Eureka Server複製機制:PeerAwareInstanceRegistryImpl

Eureka高可用之Eureka Server複製機制:PeerAwareInstanceRegistryImpl

還是先提出幾個疑問,看本篇文章前最好看過Eureka高可用之Client重試機制:RetryableEurekaHttpClient,要知道Eureka Client只會向一個Server節點進行註冊(心跳、狀態改變等類似),註冊失敗時才會嘗試下一個server節點。當然正是由於這種機制,才會有Eureka Server的複製行為,個人認為,Eureka Client向每個Eureka Server都發送註冊、心跳等事件,會更好的保證一致性

1、如果有4個Eureka Server叢集節點,一個Client向其中一個Server進行註冊(或者心跳、狀態改變事件等),那麼這個server是怎樣通知剩餘3個server叢集節點的?

2、一個Eureka Server在收到其他server節點發送的複製資訊時,它是怎麼樣處理的,它會把收到的複製資訊繼續向其它節點複製嗎?

先看一下PeerAwareInstanceRegistryImpl的繼承類圖,它繼承了抽象的例項註冊,實現了複製例項註冊(能意識到臨節點的例項註冊),那麼它就具備註冊例項資訊,還能複製給臨節點的功能

直接去PeerAwareInstanceRegistryImpl裡面看程式碼,看看是如何將例項資訊複製給臨節點的(只列出register方法,heartBeat等類似),在收到client的註冊資訊時,isReplication為false,而當收到其他Eureka Server節點的複製資訊時,isReplication為true

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        //預設租約90s,如果使用者更改了心跳週期等,使用使用者自定義的租約
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    //呼叫父類的註冊,註冊到本地雙層Map中
    super.register(info, leaseDuration, isReplication);
    //本地註冊完成後,向其他節點複製,注意isReplication這個屬性
    //用來判斷是client發來的註冊,還是其他Eureka Server臨節點複製過來的註冊
    //如果是複製過來的註冊資訊,那麼就不再向其他Eureka Server節點進行傳播複製
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

super.register(info, leaseDuration, isReplication)先不介紹,這個方法是把例項資訊儲存到自己的記憶體中,重點看replicateToPeers方法,在這個方法中,有個for迴圈遍歷了所有的Eureka Server臨節點,然後向他們複製這些資訊

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            //記錄每分鐘收到的複製次數
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            //如果沒有Eureka Server臨節點,或者是別的Eureka Server複製過來的資訊
            //那麼就不再向其他臨節點進行復制,
            //也就是說既然收到了複製過來的資訊,那麼其他eureka server節點也會收到
            //所以就沒必要再去傳送一遍複製了,return。
            return;
        }

        //遍歷所有的Eureka Server鄰節點,向它們複製register、cancel等資訊
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            //如果這個節點url是自己的,那麼不向自身複製
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

一個Eureka Server在收到了Client的註冊等資訊時,會挨個通知其他Eureka Server臨節點,複製的流程圖也就是下面這個樣子

那麼再來看看for迴圈裡面的replicateInstanceActionsToPeers方法,在迴圈裡面,根據請求型別action的不同,呼叫不同PeerEurekaNode的方法,重點還是這個異常處理。

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        //由於此方法是迴圈複製操作,如果發生異常不進行處理,直接丟擲,那麼就不會向後面的節點複製了
        //比如有10個Eureka Server節點,再向第2個複製的時候丟擲異常,後面8個節點就收不到複製資訊
        //這個地方,只是做log,並沒有丟擲異常
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}