1. 程式人生 > >Netflix Eureka原始碼分析(19)——eureka server叢集機制原始碼剖析:登錄檔同步以及高可用

Netflix Eureka原始碼分析(19)——eureka server叢集機制原始碼剖析:登錄檔同步以及高可用

(1)eureka core的BootStrap裡面,有一塊程式碼,是PeerEurekaNodes的程式碼,其實是在處理eureka server叢集資訊的初始化,會執行PeerEurekaNodes.start()方法

public class EurekaBootStrap implements ServletContextListener {

    protected void initEurekaServerContext() throws Exception {

        PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
                registry,
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                applicationInfoManager
        );

        serverContext = new DefaultEurekaServerContext(
                eurekaServerConfig,
                serverCodecs,
                registry,
                peerEurekaNodes,
                applicationInfoManager
        );

        //將serverContext放在了一個holder中
        EurekaServerContextHolder.initialize(serverContext);

        //serverContext初始化
        serverContext.initialize();
    }
}
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {

    @PostConstruct
    @Override
    public void initialize() throws Exception {
        logger.info("Initializing ...");
        //定時更新eureka server叢集的資訊
        peerEurekaNodes.start();
        //eureka server的登錄檔資訊初始化
        registry.init(peerEurekaNodes);
        logger.info("Initialized");
    }
}

解析配置檔案中的其他eureka server的url地址,基於url地址構造一個一個的PeerEurekaNode,一個PeerEurekaNode就代表了一個eureka server。啟動一個後臺的執行緒,預設是每隔10分鐘,會執行一個任務,就是基於配置檔案中的url來重新整理eureka server列表。

@Singleton
public class PeerEurekaNodes {

    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            //基於配置檔案中的url來重新整理eureka server列表
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        //基於配置檔案中的url來重新整理eureka server列表
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  " + node.getServiceUrl());
        }
    }

    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }

        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls);
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls);

        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }

        // Remove peers no long available
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }

        // Add new peers
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }

        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }

}

(2)registry.syncUp()

就是說,當前這個eureka server會從任何一個其他的eureka server拉取登錄檔過來放在自己本地,作為初始的登錄檔。將自己作為一個eureka client,找任意一個eureka server來拉取登錄檔,將拉取到的登錄檔放到自己本地去。

        // Copy registry from neighboring eureka node
        //從相鄰的eureka server節點,拷貝過來登錄檔
        int registryCount = registry.syncUp();

eurekaClient.getApplications();//獲取所有當前註冊的所有服務例項

eureka server自己本身本來就是個eureka client,在初始化的時候,就會去找任意的一個eureka server拉取登錄檔到自己本地來,把這個登錄檔放到自己身上來,作為自己這個eureka server的登錄檔

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            //獲取所有當前註冊的所有服務例項
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            //把拉取的登錄檔服務例項全部註冊到自己身上,作為自己這個eureka server的登錄檔
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
}

(3)註冊、下線、故障、心跳

如何從一臺eureka server同步到另外一臺eureka server上去的

ApplicationResource的addInstance()方法,負責註冊,現在自己本地完成一個註冊,接著會replicateToPeers()方法,這個方法就會將這次註冊請求,同步到其他所有的eureka server上去。。。

如果是某臺eureka client來找eureka server進行註冊,isReplication是false,此時會給其他所有的你配置的eureka server都同步這個註冊請求,此時一定會基於jersey,呼叫其他所有的eureka server的restful介面,去執行這個服務例項的註冊的請求

eureka-core-jersey2的工程,ReplicationHttpClient,此時同步註冊請求給其他eureka server的時候,一定會將isReplication設定為true,這個東西可以確保說什麼呢,其他eureka server接到這個同步的請求,僅僅在自己本地執行,不會再次向其他的eureka server去進行註冊

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        //同步到其他所有的eureka server上去
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

    /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                //每傳送一次心跳,currentBucket就累加一次
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            //如果已經複製了,不要再次複製,因為這會建立一個糟糕的複製
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
}

總結:eureka server叢集機制 流程圖