1. 程式人生 > >第三章:Spring Cloud Eureka(Spring cloud微服務實戰)下

第三章:Spring Cloud Eureka(Spring cloud微服務實戰)下

本章主要內容:

1.原始碼分析

2.配置詳解

原始碼分析

    我們從Eureka的客戶端看它如何完成通訊行為的。

    我們將一個普通的Spring Boot應用註冊到Eureka Server 或者是 從Eureka Server 中獲取服務列表時,主要做了兩個事情:

  •     在應用主類中配置了@EnableDiscoveryClient 註解
  •     在application.properties中用eureka.client.serviceUrl.defaultZone引數指定了服務註冊中心的位置。

我們來看一下@EnableDiscoveryClient 註解的原始碼

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

}

該註解主要是用來開啟DiscoveryClient的例項

org.springframework.cloud.client.discovery.DiscoveryClient是Spring Cloud 的介面,定義了用來發現服務的常用抽象方法,通過該介面可以有效地遮蔽服務治理的實現細節,所以使用Spring Cloud構建的微服務應用可以方便切換不同服務治理框架,不用改動程式程式碼,只需要新增一些針對服務治理框架的配置即可。

package org.springframework.cloud.client.discovery;

import java.util.List;
import org.springframework.cloud.client.ServiceInstance;

public interface DiscoveryClient
{

    public abstract String description();

    public abstract ServiceInstance getLocalServiceInstance();

    public abstract List getInstances(String s);

    public abstract List getServices();
}

org.springframework.cloud.netflix.eureka.EnableDiscoveryClient是對DiscoveryClient介面的實現,實現的是對Eureka發現服務的封裝。真正實現發現服務的是com.netflix.discovery.DiscoveryClient類

DiscoveryClient類主要用於幫助與Eureka Server互相協作。

Eureka Client 負責下面的任務:

  1. 向Eureka Server註冊服務例項
  2. 向Eureka Server 服務租約
  3. 當服務關閉期間,向Eureka Server 取消租約
  4. 查詢Eureka Server中的服務例項列表

Eureka Cient 還需要配置一個Eureka Server的URL列表

先分析一下Eureka Server的URL列表:

com.netflix.discovery.endpoint.EndpointUtils

    public static List getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone)
    {
        List orderedUrls = new ArrayList();
        String region = getRegion(clientConfig);
        String availZones[] = clientConfig.getAvailabilityZones(clientConfig.getRegion());
        if(availZones == null || availZones.length == 0)
        {
            availZones = new String[1];
            availZones[0] = "default";
        }
        logger.debug("The availability zone for the given region {} are {}", region, Arrays.toString(availZones));
        int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
        List serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
        if(serviceUrls != null)
            orderedUrls.addAll(serviceUrls);
        for(int currentOffset = myZoneOffset != availZones.length - 1 ? myZoneOffset + 1 : 0; currentOffset != myZoneOffset;)
        {
            serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
            if(serviceUrls != null)
                orderedUrls.addAll(serviceUrls);
            if(currentOffset == availZones.length - 1)
                currentOffset = 0;
            else
                currentOffset++;
        }

        if(orderedUrls.size() < 1)
            throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
        else
            return orderedUrls;
    }

從上面的函式中發現,客戶端一共載入了兩個內容,一個是Region,一個是Zone。

    public static String getRegion(EurekaClientConfig clientConfig)
    {
        String region = clientConfig.getRegion();
        if(region == null)
            region = "default";
        region = region.trim().toLowerCase();
        return region;
    }

getRegion函式從配置中讀取了一個Region返回,所以一個微服務應用只可以屬於一個Region。預設是default。通過eureka.client.region屬性定義region。

    public String[] getAvailabilityZones(String region)
    {
        String value = (String)availabilityZones.get(region);
        if(value == null)
            value = "defaultZone";
        return value.split(",");
    }

getAvailabilityZones函式,預設是defaultZone,可以看到Region與ZOne是一對多的關係,Zone可以設定多個,用逗號分隔。

在獲取了Region和Zone的資訊後,才開始真正載入Eureka Server的具體地址。根據傳入的引數按一定演算法載入位於哪一個Zone配置的serviceUrls。

int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
List serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);

看一下clientConfig.getEurekaServerServiceUrls的實現:

    public List getEurekaServerServiceUrls(String myZone)
    {
        String serviceUrls = (String)serviceUrl.get(myZone);
        if(serviceUrls == null || serviceUrls.isEmpty())
            serviceUrls = (String)serviceUrl.get("defaultZone");
        if(!StringUtils.isEmpty(serviceUrls))
        {
            String serviceUrlsSplit[] = StringUtils.commaDelimitedListToStringArray(serviceUrls);
            List eurekaServiceUrls = new ArrayList(serviceUrlsSplit.length);
            String as[] = serviceUrlsSplit;
            int i = as.length;
            for(int j = 0; j < i; j++)
            {
                String eurekaServiceUrl = as[j];
                if(!endsWithSlash(eurekaServiceUrl))
                    eurekaServiceUrl = (new StringBuilder()).append(eurekaServiceUrl).append("/").toString();
                eurekaServiceUrls.add(eurekaServiceUrl);
            }

            return eurekaServiceUrls;
        } else
        {
            return new ArrayList();
        }
    }

當在微服務應用中使用Ribbon實現服務呼叫時,對於Zone的設定可以在負載均衡時實現區域親和特性:Ribbon的預設策略會優先訪問客戶端處於同一個Zone的服務端例項,只有當同一個Zone中沒有可用服務端例項的時候才會訪問其他Zone中的例項。

服務註冊

接著看DiscoveryClient如何實現服務註冊的:

    private void initScheduledTasks()
    {
        if(clientConfig.shouldFetchRegistry())
        {
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread()), registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        if(clientConfig.shouldRegisterWithEureka())
        {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info((new StringBuilder()).append("Starting heartbeat executor: renew interval is: ").append(renewalIntervalInSecs).toString());
            scheduler.schedule(new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()), renewalIntervalInSecs, TimeUnit.SECONDS);
            instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
            statusChangeListener = new com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener() {

                public String getId()
                {
                    return "statusChangeListener";
                }

                public void notify(StatusChangeEvent statusChangeEvent)
                {
                    if(com.netflix.appinfo.InstanceInfo.InstanceStatus.DOWN == statusChangeEvent.getStatus() || com.netflix.appinfo.InstanceInfo.InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus())
                        DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
                    else
                        DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
                    instanceInfoReplicator.onDemandUpdate();
                }

                final DiscoveryClient this$0;

            
            {
                this.this$0 = DiscoveryClient.this;
                super();
            }
            };
            if(clientConfig.shouldOnDemandUpdateStatusChange())
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else
        {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

可以看到在if(clientConfig.shouldRegisterWithEureka())裡有一個InstanceInfoReplicator的例項,它會執行一個定時任務,該類的run()函式如下:

    public void run()
    {
        discoveryClient.refreshInstanceInfo();
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if(dirtyTimestamp != null)
        {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp.longValue());
        }
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
        break MISSING_BLOCK_LABEL_140;
        Throwable t;
        t;
        logger.warn("There was a problem with the instance info replicator", t);
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
        break MISSING_BLOCK_LABEL_140;
        Exception exception;
        exception;
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
        throw exception;
    }

真正觸發呼叫註冊的地方就在discoveryClient.register();內容如下:

    boolean register()
        throws Throwable
    {
        logger.info((new StringBuilder()).append("DiscoveryClient_").append(appPathIdentifier).append(": registering service...").toString());
        EurekaHttpResponse httpResponse;
        try
        {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        }
        catch(Exception e)
        {
            logger.warn("{} - registration failed {}", new Object[] {
                (new StringBuilder()).append("DiscoveryClient_").append(appPathIdentifier).toString(), e.getMessage(), e
            });
            throw e;
        }
        if(logger.isInfoEnabled())
            logger.info("{} - registration status: {}", (new StringBuilder()).append("DiscoveryClient_").append(appPathIdentifier).toString(), Integer.valueOf(httpResponse.getStatusCode()));
        return httpResponse.getStatusCode() == 204;
    }

註冊操作是通過REST請求的方式進行的。同時可以看到發起註冊請求的時候,傳入了一個com.netflix.appinfo.InstanceInfo物件,該物件就是註冊時客戶端給服務端的元資料。

服務獲取與服務續約

DiscoveryClient的initScheduledTasks函式中,還有兩個定時任務,分別是服務獲取和服務續約:

cacheRefresh和heartbeat

服務註冊中心處理

Eureka Server對於各類REST請求的定義都位於com.netflix.eureka.resources包下

配置詳解

Eureka客戶端的配置主要分為兩個方面:

1.服務註冊相關的配置資訊,包括服務註冊中心的地址、服務獲取的間隔時間、可用區域等。

2.服務例項相關的配置資訊,包括服務例項的名稱、IP地址、埠號、健康檢查路徑等。

服務註冊類配置

指定註冊中心

    通過eureka.client.serviceUrl引數實現。它的配置值儲存在HashMap中,並且設定有一組預設值,預設值的key為defaultZone、Value為http://localhost:8761/eureka/

    通常配置為:eureka.client.serviceUrl.defaultZone=http://localhost:1111/eureka/

    為了服務註冊中心的安全考慮,可以加入安全校驗,配置如下:

    http://<username>:<password>@localhost:1111/eureka/

    username為安全校驗資訊的使用者名稱,password為該使用者的密碼。

其他配置

EurekaClientConfigBean定義了常用的配置引數,這些引數都以eureka.client為字首

引數名說明預設值
enabled啟用Enable客戶端true
registryFetchIntervalSeconds從Eureka伺服器獲取註冊資訊的間隔時間,單位是秒30
instanceInfoReplicationIntervalSeconds更新例項資訊的變化到Eureka伺服器的間隔時間,單位是秒30
initialInstanceInfoReplicationIntervalSeconds初始化例項資訊到Eureka服務端的間隔時間,單位是秒40
eurekaServiceUrlPollIntervalSeconds輪詢Eureka服務端地址更改的間隔時間,單位是秒300
eurekaServerReadTimeoutSeconds讀取Eureka Server資訊的超時時間,單位是秒   8
eurekaServerConnectTimeoutSeconds連線Eureka Server的超時時間,單位是秒5
eurekaServerTotalConnections從Eureka客戶端到所有Eureka服務端主機的連線總數200
eurekaServerTotalConnectionsPerHost從Eureka 客戶端到每個Eureka服務端主機的連線總數50
eurekaConnectionIdleTimeoutSecondsEureka服務端連結的空閒關閉時間,單位是秒30
heartbeatExecutorThreadPoolSize心跳連線池的初始化執行緒數2
heartbeatExecutorExponentialBackOffBound心跳超時重試延遲時間的最大乘數值10
cacheRefreshExecutorThreadPoolSize重新整理快取執行緒池的初始化執行緒數2
cacheRefreshExecutorExponentialBackOffBound快取重新整理重試延遲時間的最大乘數值10
useDnsForFetchingServiceUrls使用DNS來獲取Eureka服務端的serviceURLfalse
registerWithEureka是否要將自身的例項資訊註冊到Eureka服務端true
preferSameZoneEureka是否偏好使用處於相同Zone的Eureka服務端true
filterOnlyUpInstances獲取例項時是否過濾, 僅保留UP狀態的例項true
fetchRegistry是否從Eureka服務端獲取註冊資訊true

服務例項類配置

例項名配置:

預設使用的主機名,可以通過spring.application.name 或者spring.application.instance_id設定