1. 程式人生 > >【一起學原始碼-微服務】Nexflix Eureka 原始碼十二:EurekaServer叢集模式原始碼分析

【一起學原始碼-微服務】Nexflix Eureka 原始碼十二:EurekaServer叢集模式原始碼分析

前言

前情回顧

上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。

哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。

今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收穫了點東西。後面還會繼續feign、ribbon、hystrix的原始碼學習,依然文章連載的形式輸出。

本講目錄

本講主要是EurekaServer叢集模式的資料同步講解,主要目錄如下。

目錄如下:

  1. eureka server叢集機制
  2. 註冊、下線、續約的登錄檔同步機制
  3. 登錄檔同步三層佇列機制詳解

技術亮點:

  1. 3層佇列機制實現登錄檔的批量同步需求

說明

原創不易,如若轉載 請標明來源!

部落格地址:一枝花算不算浪漫
微信公眾號:壹枝花算不算浪漫

原始碼分析

eureka server叢集機制

Eureka Server會在註冊、下線、續約的時候進行資料同步,將資訊同步到其他Eureka Server節點。

可以想象到的是,這裡肯定不會是實時同步的,往後繼續看登錄檔的同步機制吧。

註冊、下線、續約的登錄檔同步機制

我們以Eureka Client註冊為例,看看Eureka Server是如何同步給其他節點的。

PeerAwareInstanceRegistryImpl.java

:

public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
  1. 註冊完成後,呼叫replicateToPeers(),注意這裡面有一個引數isReplication,如果是true,代表是其他Eureka Server節點同步的,false則是EurekaClient註冊來的。
  2. replicateToPeers()中一段邏輯,如果isReplication為true則直接跳出,這裡意思是client註冊來的服務例項需要向其他節點擴散,如果不是則不需要去同步
  3. peerEurekaNodes.getPeerEurekaNodes()拿到所有的Eureka Server節點,迴圈遍歷去同步資料,呼叫replicateInstanceActionsToPeers()
  4. replicateInstanceActionsToPeers()方法中根據註冊、下線、續約等去處理不同邏輯

接下來就是真正執行同步邏輯的地方,這裡主要用了三層佇列對同步請求進行了batch操作,將請求打成一批批 然後向各個EurekaServer進行http請求。

登錄檔同步三層佇列機制詳解

到了這裡就是真正進入了同步的邏輯,這裡還是以上面註冊邏輯為主線,接著上述程式碼繼續往下跟:

PeerEurekaNode.java :

public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

這裡會執行batchingDispatcher.process() 方法,我們繼續點進去,然後會進入 TaskDispatchers.createBatchingTaskDispatcher() 方法,檢視其中的匿名內部類中的process()方法:

void process(ID id, T task, long expiryTime) {
        // 將請求都放入到acceptorQueue中
        acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
        acceptedTasks++;
    }

將需要同步的Task資料放入到acceptorQueue佇列中。
接著回到createBatchingTaskDispatcher()方法中,看下AcceptorExecutor,它的建構函式中會啟動一個後臺執行緒:

ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");

this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);

我們繼續跟AcceptorRunner.java:

class AcceptorRunner implements Runnable {
    @Override
    public void run() {
        long scheduleTime = 0;
        while (!isShutdown.get()) {
            try {
                // 處理acceptorQueue佇列中的資料
                drainInputQueues();

                int totalItems = processingOrder.size();

                long now = System.currentTimeMillis();
                if (scheduleTime < now) {
                    scheduleTime = now + trafficShaper.transmissionDelay();
                }
                if (scheduleTime <= now) {
                    // 將processingOrder拆分成一個個batch,然後進行操作
                    assignBatchWork();
                    assignSingleItemWork();
                }

                // If no worker is requesting data or there is a delay injected by the traffic shaper,
                // sleep for some time to avoid tight loop.
                if (totalItems == processingOrder.size()) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ex) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery AcceptorThread error", e);
            }
        }
    }

    private void drainInputQueues() throws InterruptedException {
        do {
            drainAcceptorQueue();

            if (!isShutdown.get()) {
                // If all queues are empty, block for a while on the acceptor queue
                if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                    TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                    if (taskHolder != null) {
                        appendTaskHolder(taskHolder);
                    }
                }
            }
        } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
    }

    private void drainAcceptorQueue() {
        while (!acceptorQueue.isEmpty()) {
            // 將acceptor佇列中的資料放入到processingOrder佇列中去,方便後續拆分成batch
            appendTaskHolder(acceptorQueue.poll());
        }
    }

    private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
        if (isFull()) {
            pendingTasks.remove(processingOrder.poll());
            queueOverflows++;
        }
        TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
        if (previousTask == null) {
            processingOrder.add(taskHolder.getId());
        } else {
            overriddenTasks++;
        }
    }
            
}

認真跟這裡面的程式碼,可以看到這裡是將上面的acceptorQueue放入到processingOrder, 其中processingOrder也是一個佇列。

AcceptorRunner.javarun()方法中,還會呼叫assignBatchWork()方法,這裡面就是將processingOrder打成一個個batch,接著看程式碼:

void assignBatchWork() {
            if (hasEnoughTasksForNextBatch()) {
                if (batchWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    int len = Math.min(maxBatchingSize, processingOrder.size());
                    List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                    while (holders.size() < len && !processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            holders.add(holder);
                        } else {
                            expiredTasks++;
                        }
                    }
                    if (holders.isEmpty()) {
                        batchWorkRequests.release();
                    } else {
                        batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                        // 將批量資料放入到batchWorkQueue中
                        batchWorkQueue.add(holders);
                    }
                }
            }
        }

        private boolean hasEnoughTasksForNextBatch() {
            if (processingOrder.isEmpty()) {
                return false;
            }
            // 預設maxBufferSize為250
            if (pendingTasks.size() >= maxBufferSize) {
                return true;
            }

            TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
            // 預設maxBatchingDelay為500ms
            long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
            return delay >= maxBatchingDelay;
        }

這裡加入batch的規則是:maxBufferSize 預設為250
maxBatchingDelay 預設為500ms,打成一個個batch後就開始傳送給server端。至於怎麼傳送 我們接著看 PeerEurekaNode.java, 我們在最開始呼叫register()方法就是呼叫PeerEurekaNode.register(), 我們來看看它的構造方法:

PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                     HttpReplicationClient replicationClient, EurekaServerConfig config,
                                     int batchSize, long maxBatchingDelayMs,
                                     long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
    this.registry = registry;
    this.targetHost = targetHost;
    this.replicationClient = replicationClient;

    this.serviceUrl = serviceUrl;
    this.config = config;
    this.maxProcessingDelayMs = config.getMaxTimeForReplication();

    String batcherName = getBatcherName();
    ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
    this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
            batcherName,
            config.getMaxElementsInPeerReplicationPool(),
            batchSize,
            config.getMaxThreadsForPeerReplication(),
            maxBatchingDelayMs,
            serverUnavailableSleepTimeMs,
            retrySleepTimeMs,
            taskProcessor
    );
}

這裡會例項化一個ReplicationTaskProcessor.java, 我們跟進去,發下它是實現TaskProcessor的,所以一定會執行此類中的process()方法,執行方法如下:

public ProcessingResult process(List<ReplicationTask> tasks) {
    ReplicationList list = createReplicationListOf(tasks);
    try {
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}

這裡面是將List<ReplicationTask> tasks 通過submitBatchUpdate() 傳送給server端。
server端在PeerReplicationResource.batchReplication()去處理,實際上就是迴圈呼叫ApplicationResource.addInstance() 方法,又回到了最開始註冊的方法。

到此 EurekaServer同步的邏輯就結束了,這裡主要是三層佇列的資料結構很繞,通過一個batchList去批量同步資料的。

注意這裡還有一個很重要的點,就是Client註冊時呼叫addInstance()方法,這裡到了server端PeerAwareInstanceRegistryImpl會執行同步其他EurekaServer邏輯。

而EurekaServer同步註冊介面仍然會呼叫addInstance()方法,這裡難不成就死迴圈呼叫了?當然不是,addInstance()中也有個引數:isReplication, 在最後呼叫server端方法的時候如下:registry.register(info, "true".equals(isReplication));

我們知道,EurekaClient在註冊的時候isReplication傳遞為空,所以這裡為false,而Server端同步的時候呼叫:

PeerReplicationResource:

private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
        applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
        return new Builder().setStatusCode(Status.OK.getStatusCode());
    }

這裡的REPLICATION 為true

另外在AbstractJersey2EurekaHttpClient中傳送register請求的時候,有個addExtraHeaders()方法,如下圖:

如果是使用的Jersey2ReplicationClient傳送的,那麼header中的x-netflix-discovery-replication配置則為true,在後面執行註冊的addInstance()方法中會接收這個引數的:

總結

仍然一圖流,文中解析的內容都包含在這張圖中了:

申明

本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫

相關推薦

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer叢集模式原始碼分析

前言 前情回顧 上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。 哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。 今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之配置檔案載入以及面向介面的配置項讀取

前言 上篇文章已經介紹了 為何要讀netflix eureka原始碼了,這裡就不再概述,下面開始正式原始碼解讀的內容。 如若轉載 請標明來源:一枝花算不算浪漫 程式碼總覽 還記得上文中,我們通過web.xml找到了eureka server入口的類EurekaBootStrap,這裡我們就先來簡單地看下: /

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之EurekaServer上下文EurekaClient建立

前言 上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些程式碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於介面對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Cli

一起原始碼-服務Nexflix Eureka 原始碼在眼花繚亂的程式碼中,EurekaClient是如何註冊的?

前言 上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。 這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於Eureka

一起原始碼-服務Nexflix Eureka 原始碼通過單元測試來Debug Eureka註冊過程

前言 上一講eureka client是如何註冊的,一直跟到原始碼傳送http請求為止,當時看eureka client註冊時如此費盡,光是找一個regiter的地方就找了半天,那麼client端傳送了http請求給server端,server端是如何處理的呢? 帶著這麼一個疑問 就開始今天原始碼的解讀了。

一起原始碼-服務Nexflix Eureka 原始碼EurekaClient登錄檔抓取 精妙設計分析

前言 前情回顧 上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceI

一起原始碼-服務Nexflix Eureka 原始碼服務續約原始碼分析

前言 前情回顧 上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。 本講目錄 今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著

一起原始碼-服務Nexflix Eureka 原始碼服務下線及例項摘除,一個client下線到底多久才會被其他例項感知?

前言 前情回顧 上一講我們講了 client端向server端傳送心跳檢查,也是預設每30鍾傳送一次,server端接收後會更新登錄檔的一個時間戳屬性,然後一次心跳(續約)也就完成了。 本講目錄 這一篇有兩個知識點及一個疑問,這個疑問是在工作中真真實實遇到過的。 例如我有服務A、服務B,A、B都註冊在同一個註

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer自我保護機制竟然有這麼多Bug?

前言 前情回顧 上一講主要講了服務下線,已經註冊中心自動感知宕機的服務。 其實上一講已經包含了很多EurekaServer自我保護的程式碼,其中還發現了1.7.x(1.9.x)包含的一些bug,但這些問題在master分支都已修復了。 服務下線會將服務例項從登錄檔中刪除,然後放入到recentQueue中,下

一起原始碼-服務Nexflix Eureka 原始碼十三Eureka原始碼解讀完結撒花篇~!

前言 想說的話 【一起學原始碼-微服務-Netflix Eureka】專欄到這裡就已經全部結束了。 實話實說,從最開始Eureka Server和Eureka Client初始化的流程還是一臉悶逼,到現在Eureka各種操作都瞭然於心了。 本專欄從12.17開始寫,一直到今天12.30(文章在平臺是延後釋出的

一起原始碼-服務Ribbon 原始碼Ribbon概念理解及Demo除錯

前言 前情回顧 前面文章已經梳理清楚了Eureka相關的概念及原始碼,接下來開始研究下Ribbon的實現原理。 我們都知道Ribbon在spring cloud中擔當負載均衡的角色, 當兩個Eureka Client互相呼叫的時候,Ribbon能夠做到呼叫時的負載,保證多節點的客戶端均勻接收請求。(這個有點類

一起原始碼-服務Ribbon 原始碼通過Debug找出Ribbon初始化流程及ILoadBalancer原理分析

前言 前情回顧 上一講講了Ribbon的基礎知識,通過一個簡單的demo看了下Ribbon的負載均衡,我們在RestTemplate上加了@LoadBalanced註解後,就能夠自動的負載均衡了。 本講目錄 這一講主要是繼續深入RibbonLoadBalancerClient和Ribbon+Eureka整合的

一起原始碼-服務Ribbon 原始碼Ribbon與Eureka整合原理分析

前言 前情回顧 上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration 到RibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。 本講目錄 這一講我們會進一步往下

一起原始碼-服務Ribbon 原始碼進一步探究Ribbon的IRule和IPing

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Ribbon原始碼Ribbon原始碼解讀彙總篇~

前言 想說的話 【一起學原始碼-微服務-Ribbon】專欄到這裡就已經全部結束了,共更新四篇文章。 Ribbon比較小巧,這裡是直接 讀的spring cloud 內嵌封裝的版本,裡面的各種configuration確實有點繞,不過看看第三講Ribbon初始化的過程總結圖就會清晰很多。 緊接著會繼續整理學習F

一起原始碼-服務Feign 原始碼原始碼初探,通過Demo Debug Feign原始碼

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Feign 原始碼Feign動態代理構造過程

前言 前情回顧 上一講主要看了@EnableFeignClients中的registerBeanDefinitions()方法,這裡面主要是 將EnableFeignClients註解對應的配置屬性注入,將FeignClient註解對應的屬性注入。 最後是生成FeignClient對應的bean,注入到Spr

一起原始碼-服務Feign 原始碼Feign結合Ribbon實現負載均衡的原理分析

前言 前情回顧 上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。 動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裡面是serviceName等資訊)和d

一起原始碼-服務Hystrix 原始碼Hystrix基礎原理與Demo搭建

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一個系列文章講解了Feign的原始碼,主要是Feign動態代理實現的原理,及配合Ribbon實現負載均衡的機制。 這裡我們講解一個新的元件Hystrix,也是和Fe

一起原始碼-服務Hystrix 原始碼Hystrix核心流程Hystix非降級邏輯流程梳理

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的Hystr