1. 程式人生 > >【一起學原始碼-微服務】Ribbon 原始碼三:Ribbon與Eureka整合原理分析

【一起學原始碼-微服務】Ribbon 原始碼三:Ribbon與Eureka整合原理分析

前言

前情回顧

上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfigurationRibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。

本講目錄

這一講我們會進一步往下探究Ribbon和Eureka是如何結合的。

通過上一講ILoadBalancer 我們已經可以拿到一個服務所有的服務節點資訊了,這裡面是怎麼把服務的名稱轉化為對應的具體host請求資訊的呢?

通過這一講 我們來一探究竟

目錄如下:

  1. EurekaClientAutoConfiguration.getLoadBalancer()回顧
  2. 再次梳理Ribbon初始化過程
  3. ServerList實現類初始化過程
  4. getUpdatedListOfServers()獲取登錄檔列表分析
  5. ribbon如何更新自己儲存的登錄檔資訊?

說明

原創不易,如若轉載 請標明來源!

部落格地址:一枝花算不算浪漫
微信公眾號:壹枝花算不算浪漫

原始碼閱讀

EurekaClientAutoConfiguration.getLoadBalancer()回顧

上一講我們已經深入的講解過getLoadBalancer() 方法的實現,每個serviceName都對應一個自己的SpringContext上下文資訊,然後通過ILoadBalancer.class

從上下文資訊中獲取預設的LoadBalancer:ZoneAwareLoadBalancer, 我們看下這個類的建構函式:

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                             IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                             ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}

繼續跟父類DynamicServerListLoadBalancer的初始化方法:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    volatile ServerList<T> serverListImpl;

    volatile ServerListFilter<T> filter;

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }
}

構造方法中有個restOfInit()方法,進去後又會有updateListOfServers() 方法,看方法名就知道這個肯定是和server登錄檔相關的,繼續往後看,servers = serverListImpl.getUpdatedListOfServers();,這裡直接呼叫getUpdatedListOfServers()就獲取到了所有的登錄檔資訊。

可以看到ServerList有四個實現類,這個到底是該呼叫哪個實現類的getUpdatedListOfServers()方法呢?接著往下看。

再次梳理Ribbon初始化過程

第二講我們已經見過Ribbon的初始化過程,並畫了圖整理,這裡針對於之前的圖再更新一下:

這裡主要是增加了RibbonEurekaAutoConfigurationEurekaRibbonClientConfiguration兩個配置類的初始化。

ServerList實現類初始化過程

上面已經梳理過 Ribbon初始化的過程,其中在EurekaRibbonClientConfiguration 會初始化RibbonServerList,程式碼如下:

@Configuration
    public class EurekaRibbonClientConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config, eurekaClientProvider);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
    }
}

這裡實際的ServerList實際就是DiscoveryEnabledNIWSServerList,我們看下這個類:

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{

}

public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {

}

所以可以看出來ServerList 實際就是在這裡進行初始化的,上面那個serverListImpl.getUpdatedListOfServers();即為呼叫DiscoveryEnabledNIWSServerList.getUpdatedListOfServers() 方法了,繼續往下看。

getUpdatedListOfServers()獲取登錄檔分析

直接看DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()原始碼:

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
}

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }

    EurekaClient eurekaClient = eurekaClientProvider.get();
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                if (ii.getStatus().equals(InstanceStatus.UP)) {

                    if(shouldUseOverridePort){
                        if(logger.isDebugEnabled()){
                            logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                        }

                        // copy is necessary since the InstanceInfo builder just uses the original reference,
                        // and we don't want to corrupt the global eureka copy of the object which may be
                        // used by other clients in our system
                        InstanceInfo copy = new InstanceInfo(ii);

                        if(isSecure){
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        }else{
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }

                    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                    des.setZone(DiscoveryClient.getZone(ii));
                    serverList.add(des);
                }
            }
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
            }
        }
    }
    return serverList;
}

看到這裡程式碼就已經很明顯了,我們來解讀下這段程式碼:

  1. 通過eurekaClientProvider獲取對應EurekaClient
  2. 通過vipAdress(實際就是serviceName)獲取對應登錄檔集合資訊
  3. 將註冊資訊組裝成DiscoveryEnabledServer列表

再回到DynamicServerListLoadBalancer.updateListOfServers() 中,這裡獲取到對應的DiscoveryEnabledServer list後呼叫updateAllServerList()方法,一路跟蹤這裡最終會呼叫BaseLoadBalancer.setServersList()

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {

    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());
            
    public void setServersList(List lsrv) {
        Lock writeLock = allServerLock.writeLock();
        logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
        
        ArrayList<Server> newServers = new ArrayList<Server>();
        writeLock.lock();
        try {
            ArrayList<Server> allServers = new ArrayList<Server>();
            for (Object server : lsrv) {
                if (server == null) {
                    continue;
                }

                if (server instanceof String) {
                    server = new Server((String) server);
                }

                if (server instanceof Server) {
                    logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                    allServers.add((Server) server);
                } else {
                    throw new IllegalArgumentException(
                            "Type String or Server expected, instead found:"
                                    + server.getClass());
                }

            }
            boolean listChanged = false;
            if (!allServerList.equals(allServers)) {
                listChanged = true;
                if (changeListeners != null && changeListeners.size() > 0) {
                   List<Server> oldList = ImmutableList.copyOf(allServerList);
                   List<Server> newList = ImmutableList.copyOf(allServers);                   
                   for (ServerListChangeListener l: changeListeners) {
                       try {
                           l.serverListChanged(oldList, newList);
                       } catch (Exception e) {
                           logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                       }
                   }
                }
            }
            if (isEnablePrimingConnections()) {
                for (Server server : allServers) {
                    if (!allServerList.contains(server)) {
                        server.setReadyToServe(false);
                        newServers.add((Server) server);
                    }
                }
                if (primeConnections != null) {
                    primeConnections.primeConnectionsAsync(newServers, this);
                }
            }
            // This will reset readyToServe flag to true on all servers
            // regardless whether
            // previous priming connections are success or not
            allServerList = allServers;
            if (canSkipPing()) {
                for (Server s : allServerList) {
                    s.setAlive(true);
                }
                upServerList = allServerList;
            } else if (listChanged) {
                forceQuickPing();
            }
        } finally {
            writeLock.unlock();
        }
    }
}

這個過程最後用一張圖總結為:

ribbon如何更新自己儲存的登錄檔資訊?

上面已經講了 Ribbon是如何通過serviceName拉取到登錄檔的,我們知道EurekaClient預設是30s拉取一次登錄檔資訊的,因為Ribbon要關聯登錄檔資訊,那麼Ribbon該如何更新自己儲存的登錄檔資訊呢?

繼續回到DynamicSeverListLoadBalancer.restOfInit()方法中:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

    protected volatile ServerListUpdater serverListUpdater;

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }
}

重點檢視enableAndInitLearnNewServersFeature()方法,從名字我們就可以看出來這意思為啟用和初始化學習新服務的功能,這裡實際上就啟動serverListUpdater中的一個執行緒。

在最上面Ribbon初始化的過程中我們知道,在RibbonClientConfiguration中預設初始化的ServerListUpdaterPollingServreListUpdater,我們繼續跟這個類的start方法:

@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

這裡只要是執行updateAction.doUpdate();,然後後面啟動了一個排程任務,預設30s執行一次。

繼續往後跟doUpdate()方法:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
}

這裡又呼叫了之前通過serviceName獲取對應註冊服務列表的方法了。

總結到一張圖如下:

總結

本文主要是重新梳理了Ribbon的初始化過程,主要是幾個Configure初始化的過程,然後是Ribbon與Eureka的整合,這裡也涉及到了登錄檔的更新邏輯。

看到這裡真是被Spring的各種AutoConfigure繞暈了,哈哈,但是最後分析完 還是覺得挺清晰的,對於複雜的業務畫張流程圖還挺容易理解的。

申明

本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫

相關推薦

一起原始碼-服務Ribbon 原始碼RibbonEureka整合原理分析

前言 前情回顧 上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration 到RibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。 本講目錄 這一講我們會進一步往下

一起原始碼-服務Feign 原始碼Feign結合Ribbon實現負載均衡的原理分析

前言 前情回顧 上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。 動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裡面是serviceName等資訊)和d

一起原始碼-服務Feign 原始碼原始碼初探,通過Demo Debug Feign原始碼

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Feign 原始碼Feign動態代理構造過程

前言 前情回顧 上一講主要看了@EnableFeignClients中的registerBeanDefinitions()方法,這裡面主要是 將EnableFeignClients註解對應的配置屬性注入,將FeignClient註解對應的屬性注入。 最後是生成FeignClient對應的bean,注入到Spr

一起原始碼-服務Hystrix 原始碼Hystrix基礎原理Demo搭建

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一個系列文章講解了Feign的原始碼,主要是Feign動態代理實現的原理,及配合Ribbon實現負載均衡的機制。 這裡我們講解一個新的元件Hystrix,也是和Fe

一起原始碼-服務Hystrix 原始碼Hystrix核心流程Hystix非降級邏輯流程梳理

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的Hystr

一起原始碼-服務Hystrix 原始碼Hystrix核心流程Hystix降級、熔斷等原理剖析

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講解了Hystrix在配合feign的過程中,一個正常的請求邏輯該怎樣處理,這裡涉及到執行緒池的建立、HystrixCommand的執行等邏輯。 如圖所示:

一起原始碼-服務Ribbon 原始碼Ribbon概念理解及Demo除錯

前言 前情回顧 前面文章已經梳理清楚了Eureka相關的概念及原始碼,接下來開始研究下Ribbon的實現原理。 我們都知道Ribbon在spring cloud中擔當負載均衡的角色, 當兩個Eureka Client互相呼叫的時候,Ribbon能夠做到呼叫時的負載,保證多節點的客戶端均勻接收請求。(這個有點類

一起原始碼-服務Ribbon 原始碼通過Debug找出Ribbon初始化流程及ILoadBalancer原理分析

前言 前情回顧 上一講講了Ribbon的基礎知識,通過一個簡單的demo看了下Ribbon的負載均衡,我們在RestTemplate上加了@LoadBalanced註解後,就能夠自動的負載均衡了。 本講目錄 這一講主要是繼續深入RibbonLoadBalancerClient和Ribbon+Eureka整合的

一起原始碼-服務Ribbon 原始碼進一步探究Ribbon的IRule和IPing

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Ribbon原始碼Ribbon原始碼解讀彙總篇~

前言 想說的話 【一起學原始碼-微服務-Ribbon】專欄到這裡就已經全部結束了,共更新四篇文章。 Ribbon比較小巧,這裡是直接 讀的spring cloud 內嵌封裝的版本,裡面的各種configuration確實有點繞,不過看看第三講Ribbon初始化的過程總結圖就會清晰很多。 緊接著會繼續整理學習F

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之配置檔案載入以及面向介面的配置項讀取

前言 上篇文章已經介紹了 為何要讀netflix eureka原始碼了,這裡就不再概述,下面開始正式原始碼解讀的內容。 如若轉載 請標明來源:一枝花算不算浪漫 程式碼總覽 還記得上文中,我們通過web.xml找到了eureka server入口的類EurekaBootStrap,這裡我們就先來簡單地看下: /

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之EurekaServer上下文EurekaClient建立

前言 上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些程式碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於介面對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Cli

一起原始碼-服務Nexflix Eureka 原始碼在眼花繚亂的程式碼中,EurekaClient是如何註冊的?

前言 上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。 這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於Eureka

一起原始碼-服務Nexflix Eureka 原始碼通過單元測試來Debug Eureka註冊過程

前言 上一講eureka client是如何註冊的,一直跟到原始碼傳送http請求為止,當時看eureka client註冊時如此費盡,光是找一個regiter的地方就找了半天,那麼client端傳送了http請求給server端,server端是如何處理的呢? 帶著這麼一個疑問 就開始今天原始碼的解讀了。

一起原始碼-服務Nexflix Eureka 原始碼EurekaClient登錄檔抓取 精妙設計分析

前言 前情回顧 上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceI

一起原始碼-服務Nexflix Eureka 原始碼服務續約原始碼分析

前言 前情回顧 上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。 本講目錄 今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著

一起原始碼-服務Nexflix Eureka 原始碼服務下線及例項摘除,一個client下線到底多久才會被其他例項感知?

前言 前情回顧 上一講我們講了 client端向server端傳送心跳檢查,也是預設每30鍾傳送一次,server端接收後會更新登錄檔的一個時間戳屬性,然後一次心跳(續約)也就完成了。 本講目錄 這一篇有兩個知識點及一個疑問,這個疑問是在工作中真真實實遇到過的。 例如我有服務A、服務B,A、B都註冊在同一個註

一起原始碼-服務Nexflix Eureka 原始碼十一EurekaServer自我保護機制竟然有這麼多Bug?

前言 前情回顧 上一講主要講了服務下線,已經註冊中心自動感知宕機的服務。 其實上一講已經包含了很多EurekaServer自我保護的程式碼,其中還發現了1.7.x(1.9.x)包含的一些bug,但這些問題在master分支都已修復了。 服務下線會將服務例項從登錄檔中刪除,然後放入到recentQueue中,下

一起原始碼-服務Nexflix Eureka 原始碼十二EurekaServer叢集模式原始碼分析

前言 前情回顧 上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。 哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。 今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收