
A Deep Dive into the Spring Cloud Source Code: Eureka

1. Eureka functional overview

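In Spring Cloud, Eureka plays the role of the service registry, much like ZooKeeper (zk) does in a dubbo+zk setup, but it is far simpler than zk. ZooKeeper can do a great deal: distributed locks and distributed queues, for example, are all built on its four node types plus the watch mechanism over long-lived connections. Eureka is different. It is built on HTTP REST: service information is stored in a ConcurrentHashMap, each service reads this map when it starts so that all services know about each other, and services then call one another over HTTP using that information. Eureka has two parts. One is the service provider (which, from Eureka's point of view, is the client) and the other is the server. The client reads its own service information and registers it with the server; the server, clearly, is what receives the information that clients supply about themselves.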

2. Eureka client source analysis

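When reading the Spring source, we usually start from the spring.handlers file in the META-INF folder of the Spring source package, jump straight to the XXXHandler source file, and then work through how each Spring XML tag is parsed. For the Spring Cloud source, we instead look for spring.factories under META-INF and analyze the classes listed there.
Following that approach, we first find the EurekaClientAutoConfiguration configuration class in the spring.factories file of the eureka-client (1.4.0) package. A Eureka client has four main responsibilities: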

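  • 2.1 Read the project's IP, instance_id and port, and register them with the server
  • 2.2 Service deregistration
  • 2.3 Heartbeat
  • 2.4 Fetch information about other services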

2.1 Service registration

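With that in mind, the first thing to locate is where application.properties is read. The eurekaInstanceConfigBean() method loads the configuration into an EurekaInstanceConfigBean object, and its @Bean annotation registers that object in the IoC container. EurekaInstanceConfigBean holds the client's IP, instance_id, port and so on. The following code wraps the EurekaInstanceConfigBean: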

        @Bean
        // EurekaInstanceConfigBean (described above) is an implementation of EurekaInstanceConfig
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }

Next comes service registration:

        @Bean(destroyMethod = "shutdown")
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
            manager.getInfo(); // force initialization
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }

Following the super constructor call (into DiscoveryClient), the code before initScheduledTasks() creates several thread pools, and initScheduledTasks() itself starts a heartbeat thread:

private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

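In renew(), a 404 response causes register() to be called. The heartbeat interval is configurable; it is set where the timers are scheduled. Tracing the source shows that, besides renew(), register() is also called from the run() method of the InstanceInfoReplicator thread. By default this task first runs after a 40-second initial delay (later runs use the instance-info replication interval, 30 seconds by default), and it is scheduled when the service starts. Its job is to call register() whenever the instance's configuration changes. It also runs on first start: because refreshInstanceInfo() is called, isInstanceInfoDirty becomes true, so the initial registration goes through this path too. After that, register() is only reached in special cases, such as a change of IP address or of certain configuration parameters, as the following code shows: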

 public void refreshDataCenterInfoIfRequired() {
        String existingAddress = instanceInfo.getHostName();

        String newAddress;
        if (config instanceof RefreshableInstanceConfig) {
            // Refresh data center info, and return up to date address
            newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
        } else {
            newAddress = config.getHostName(true);
        }
        String newIp = config.getIpAddress();

        if (newAddress != null && !newAddress.equals(existingAddress)) {
            logger.warn("The address changed from : {} => {}", existingAddress, newAddress);

            // :( in the legacy code here the builder is acting as a mutator.
            // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
            // We will most likely re-write the client at sometime so not fixing for now.
            InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
            builder.setHostName(newAddress).setIPAddr(newIp).setDataCenterInfo(config.getDataCenterInfo());
            instanceInfo.setIsDirty(); // sets isInstanceInfoDirty to true and lastDirtyTimestamp to the current time
        }
    }

    public void refreshLeaseInfoIfRequired() {
        LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
        if (leaseInfo == null) {
            return;
        }
        int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
        int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
        // the lease configuration parameters have changed
        if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {
            LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(currentLeaseRenewal)
                    .setDurationInSecs(currentLeaseDuration)
                    .build();
            instanceInfo.setLeaseInfo(newLeaseInfo);
            instanceInfo.setIsDirty();
        }
    }

That covers registration on the Eureka client side.
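The dirty-flag idea described above can be sketched in a few lines. This is a minimal illustration, not Eureka's InstanceInfoReplicator: the class DirtyFlagReplicatorSketch and its methods are hypothetical names, and the only "configuration" tracked is a single lease renewal interval.

```java
// Hypothetical sketch: re-register only when the instance info is dirty.
class DirtyFlagReplicatorSketch {
    private boolean dirty;
    private int leaseRenewalIntervalSecs;
    private int registerCalls;

    DirtyFlagReplicatorSketch(int initialIntervalSecs) {
        this.leaseRenewalIntervalSecs = initialIntervalSecs;
        this.dirty = true; // the very first run must register
    }

    // Compare the configured value with the stored one; mark dirty on change.
    private void refresh(int configuredIntervalSecs) {
        if (configuredIntervalSecs != leaseRenewalIntervalSecs) {
            leaseRenewalIntervalSecs = configuredIntervalSecs;
            dirty = true;
        }
    }

    // One run of the periodic task. Returns true if it (re-)registered.
    boolean runOnce(int configuredIntervalSecs) {
        refresh(configuredIntervalSecs);
        if (!dirty) {
            return false;
        }
        registerCalls++; // stands in for the REST register call
        dirty = false;
        return true;
    }

    int getRegisterCalls() {
        return registerCalls;
    }
}
```

With this sketch, the first run registers, an unchanged configuration does nothing, and a changed interval triggers exactly one more registration.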

2.2 Service deregistration

The EurekaClient interface declares a shutdown() method. Its implementation is annotated with @PreDestroy, so it is triggered when the container shuts down:

  if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks(); // stop the heartbeat, instance-info replication and cache-refresh timers

            // If APPINFO was registered
            if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); // set the status to DOWN
                unregister(); // tell the server that this client is going offline
            }

            logger.info("Completed shut down of DiscoveryClient");
        }

2.3 Heartbeat

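Section 2.1 already touched on renew() while analyzing registration: a 404 response triggers registration, while a 200 response means the heartbeat succeeded. By default a heartbeat is sent every 30 seconds.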
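The 404-then-register behaviour can be simulated without any network. The sketch below is an assumption-laden stand-in: HeartbeatSketch, its in-memory set of "server-side" ids, and the status codes it returns are hypothetical and only mirror the control flow of renew() shown in 2.1.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the renew() control flow: heartbeat, and on 404 register again.
class HeartbeatSketch {
    // Stand-in for the server's view of which instance ids are registered.
    private final Set<String> serverSide = new HashSet<>();
    private int registerCalls;

    // Simulated heartbeat: 200 if the server knows the instance, 404 otherwise.
    int sendHeartbeat(String instanceId) {
        return serverSide.contains(instanceId) ? 200 : 404;
    }

    // Simulated registration; always succeeds in this sketch.
    boolean register(String instanceId) {
        registerCalls++;
        serverSide.add(instanceId);
        return true;
    }

    boolean renew(String instanceId) {
        int status = sendHeartbeat(instanceId);
        if (status == 404) {
            return register(instanceId); // unknown to the server: register instead
        }
        return status == 200;
    }

    int getRegisterCalls() {
        return registerCalls;
    }
}
```

The first renew() for an unknown id falls through to register(); later calls are plain heartbeats and do not register again.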

2.4 Fetching the registry

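When a Eureka client starts, it registers with the Eureka server. Other clients also need to learn that this instance has started so that they can read its information. Fetching information about other services is timer-driven as well: initScheduledTasks() starts a CacheRefreshThread with a default interval of 30 seconds. Looking at the fetchRegistry(boolean forceFullRegistryFetch) method it calls, there are two kinds of fetch: a full fetch and a delta (incremental) fetch. The full fetch is implemented by getAndStoreFullRegistry():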

  private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) // REST request to the server for instance information
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps)); // stored in DiscoveryClient's localRegionApps AtomicReference
        } else {   
        }
    }
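The compareAndSet on fetchRegistryGeneration guards against a slow fetch overwriting a newer result. A minimal sketch of that pattern follows; FetchGenerationSketch and its String payload are hypothetical, chosen only to make the race visible in a single thread.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: only the first fetch to finish for a given generation may store its result.
class FetchGenerationSketch {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<String> localApps = new AtomicReference<>("empty");

    // Remember the generation seen when the fetch started.
    long beginFetch() {
        return generation.get();
    }

    // Store the fetched data only if nobody else advanced the generation meanwhile.
    boolean store(long observedGeneration, String fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            localApps.set(fetched);
            return true;
        }
        return false; // a competing fetch finished first; drop this stale result
    }

    String current() {
        return localApps.get();
    }
}
```

If two fetches start at the same generation, whichever calls store() first wins and the other result is discarded.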

3. Eureka server source analysis

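The client analysis showed that the client registers and fetches information through HTTP REST requests, so the Eureka server must have a structure similar to a Spring MVC project. In the EurekaServerAutoConfiguration class, the jerseyApplication() method places a Jersey Application object in the container. Its logic resembles the way Spring scans for @Component: it scans for classes annotated with @Path and @Provider, wraps them as bean definitions, and collects them in the Application's set. A filter then intercepts request URLs and maps them to the corresponding resource class (the equivalent of a Controller).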

    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean bean = new FilterRegistrationBean(); // at its core this is a servlet filter
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); // intercept every request under /eureka

        return bean;
    }

That is a brief introduction to Jersey. From the client analysis we can infer that the server provides these functions:

  • Accepting requests (getting to know Jersey)
  • Receiving and handling client register / heartbeat / cancel requests
  • Service eviction

The following functions concern the high availability of the Eureka server itself:

  • Self-preservation
  • Information synchronization between servers

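3.1 How the server accepts requests

Jersey and the way Eureka integrates it were introduced above, so there is nothing more to add here.

3.2 Receiving and handling client register / heartbeat / cancel requests

Server receives a client registration

The Eureka wiki at https://github.com/Netflix/eureka/wiki/Eureka-REST-operations tells us that registering with the server is done by calling POST /eureka/v2/apps/appID. ApplicationsResource delegates to the addInstance() method of ApplicationResource, which leads to the register() method: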

@Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; // default lease duration: 90 seconds
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication); // the actual registration, examined below
        // replicate to the other Eureka servers
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    // Thread-safe map of service instances. The outer key is the application name of the
    // Spring Cloud project; the inner map is keyed by instanceId with a Lease as its value.
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        read.lock(); // acquire the registry's read lock so the unlock in finally is balanced
        try {
            // the app name is spring.application.name from the Spring Cloud configuration
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                // first instance of this app: put an empty lease map into the registry
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // this id is the instanceId

            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // store the new lease
            gMap.put(registrant.getId(), lease);

        } finally {
            read.unlock();
        }
    }
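To make the two-level map easier to picture, here is a small standalone sketch of the same putIfAbsent pattern. RegistrySketch is a hypothetical class; it stores a registration timestamp instead of a Lease<InstanceInfo> and leaves out locking, metrics and replication.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an appName -> (instanceId -> registrationTime) registry.
class RegistrySketch {
    private final ConcurrentHashMap<String, Map<String, Long>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, long nowMs) {
        Map<String, Long> leases = registry.get(appName);
        if (leases == null) {
            Map<String, Long> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the existing map if another thread won the race
            leases = registry.putIfAbsent(appName, fresh);
            if (leases == null) {
                leases = fresh;
            }
        }
        leases.put(instanceId, nowMs); // a repeated registration simply overwrites
    }

    // Remove one instance; returns false if it was not registered.
    boolean cancel(String appName, String instanceId) {
        Map<String, Long> leases = registry.get(appName);
        return leases != null && leases.remove(instanceId) != null;
    }

    int instanceCount(String appName) {
        Map<String, Long> leases = registry.get(appName);
        return leases == null ? 0 : leases.size();
    }
}
```

On Java 8 and later, registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()) would express the same get-or-create step in one call.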

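Server receives a client renewal (heartbeat)

The endpoint is InstanceResource#renewLease(). Renewal simply maintains the instance status, updates the last-updated time, and then replicates to the other servers. Here is the renew() method: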

  public boolean renew(String appName, String id, boolean isReplication) {
            RENEW.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); // lease map for this application
            Lease<InstanceInfo> leaseToRenew = null;
            if (gMap != null) {
                leaseToRenew = gMap.get(id); // lease of this instance
            }
            if (leaseToRenew == null) { // unknown instance: the renewal fails
                return false;
            } else {
                InstanceInfo instanceInfo = leaseToRenew.getHolder();
                if (instanceInfo != null) {
                    // touchASGCache(instanceInfo.getASGName());
                    InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                            instanceInfo, leaseToRenew, isReplication);
                    ...
                    if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                        Object[] args = {
                                instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId()
                        };
                        instanceInfo.setStatus(overriddenInstanceStatus); // update the instance status
                    }
                }
                renewsLastMin.increment();
                leaseToRenew.renew(); // update the last-updated timestamp
                return true;
            }
        }

Server receives a client cancel request

The cancel endpoint is InstanceResource#cancelLease(). Here is the internalCancel() method it ends up in:

protected boolean internalCancel(String appName, String id, boolean isReplication) {
            try {
                Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); // all instances of this application
                Lease<InstanceInfo> leaseToCancel = null;
                if (gMap != null) {
                    leaseToCancel = gMap.remove(id); // remove the cancelled instance from the map
                }
                synchronized (recentCanceledQueue) {
                    recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
                }
                InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
                if (instanceStatus != null) {
                    logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
                }
                if (leaseToCancel == null) {
                    return false;
                } else {
                    leaseToCancel.cancel();
                    InstanceInfo instanceInfo = leaseToCancel.getHolder();
                    String vip = null;
                    String svip = null;
                    if (instanceInfo != null) {
                        instanceInfo.setActionType(ActionType.DELETED);
                        recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                        instanceInfo.setLastUpdatedTimestamp();
                        vip = instanceInfo.getVIPAddress();
                        svip = instanceInfo.getSecureVipAddress();
                    }
                    invalidateCache(appName, vip, svip);
                    logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                    return true;
                }
            } finally {
            }
        }

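3.3 Service eviction

When a client has not sent a request to the server for a long time (90 seconds by default), the client is assumed to be down. Anyone who has read the Spring source knows that one of its most important methods is AbstractApplicationContext#refresh(). It covers everything from scanning XML/Java files and annotations, through dependency injection into the IoC container, to destroying beans, and it ends with finishRefresh(), which Spring calls once all other work is done. That call goes through DefaultLifecycleProcessor#onRefresh(), then startBeans(true), then start(), then doStart(this.lifecycleBeans, member.name, this.autoStartupOnly), and finally start(). This is where the start() method of every bean implementing the Lifecycle interface gets called. EurekaServerAutoConfiguration imports EurekaServerInitializerConfiguration, which implements Lifecycle, and its start() method initializes a separate EurekaServerContext. Inside initEurekaServerContext(), registry.openForTraffic(applicationInfoManager, registryCount) is executed; its last statement calls AbstractInstanceRegistry#postInit(), which starts a timer that invokes EvictionTask#evict() every 60 seconds.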

  public void evict(long additionalLeaseMs) {
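            // Lease expiration is disabled while self-preservation is active, so nothing is evicted.
            // Self-preservation is enabled by default.
            if (!isLeaseExpirationEnabled()) {
                logger.debug("DS: lease expiration is currently disabled.");
                return;
            }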

            List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
            for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
                Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
                if (leaseMap != null) {
                    for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                        Lease<InstanceInfo> lease = leaseEntry.getValue();
                        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                            expiredLeases.add(lease); // expired: collect it in the expiredLeases list
                        }
                    }
                }
            }
            // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
            // triggering self-preservation. Without that we would wipe out full registry.
            int registrySize = (int) getLocalRegistrySize();
            int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
            int evictionLimit = registrySize - registrySizeThreshold;

            int toEvict = Math.min(expiredLeases.size(), evictionLimit);
            if (toEvict > 0) {
                logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

                Random random = new Random(System.currentTimeMillis());
                for (int i = 0; i < toEvict; i++) {
                    // Pick a random item (Knuth shuffle algorithm)
                    int next = i + random.nextInt(expiredLeases.size() - i);
                    Collections.swap(expiredLeases, i, next);
                    Lease<InstanceInfo> lease = expiredLeases.get(i);

                    String appName = lease.getHolder().getAppName();
                    String id = lease.getHolder().getId();
                    EXPIRED.increment();
                    logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                    internalCancel(appName, id, false); // remove the lease from the local registry and invalidate the cache
                }
            }
        }
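The eviction limit and the random pick are easier to follow with concrete numbers. The sketch below reproduces only that arithmetic and the partial shuffle; EvictionSketch and its method names are hypothetical, and 0.85 is used as the renewal percent threshold on the assumption that the default is unchanged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the eviction cap and the partial Knuth shuffle used to pick victims.
class EvictionSketch {
    // At most (size - size * threshold) leases may be evicted in one pass.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Pick min(expired.size(), limit) distinct items uniformly at random.
    static <T> List<T> pickVictims(List<T> expired, int limit, Random random) {
        List<T> pool = new ArrayList<>(expired); // copy so the caller's list is untouched
        int toEvict = Math.min(pool.size(), limit);
        List<T> victims = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            // swap a random not-yet-chosen element into position i
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
            victims.add(pool.get(i));
        }
        return victims;
    }
}
```

For a registry of 100 instances and a threshold of 0.85, the limit is 15, so even if 40 leases have expired only 15 are evicted in that pass. Spreading the evictions randomly avoids wiping out one application entirely.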

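3.4 Self-preservation mode

When a client stops sending renewals (heartbeats) for a long time, the server runs eviction once a minute by default. The evict() code in 3.3 calls an isLeaseExpirationEnabled() method: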

     /**
     * Expected maximum number of renewals per minute.
     * Formula: number of currently registered instances x 2
     */
     protected volatile int expectedNumberOfRenewsPerMin ;
     /**
     * Expected minimum number of renewals per minute.
     * Formula: expectedNumberOfRenewsPerMin * renewal percent threshold ( eureka.renewalPercentThreshold )
     */
     protected volatile int numberOfRenewsPerMinThreshold ;
  @Override
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) { // self-preservation is on by default; false means it has been turned off
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
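        // Expiration is allowed only while the renewals received in the last minute
        // exceed the expected minimum, i.e. the clients as a whole still look alive.
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;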
    }
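A worked example of the two formulas may help. The sketch below is illustrative only: SelfPreservationSketch is a hypothetical class, the factor 2 follows the formula in the comment above (it corresponds to one heartbeat every 30 seconds), and 0.85 is assumed as the renewal percent threshold.

```java
// Hypothetical sketch of the self-preservation arithmetic.
class SelfPreservationSketch {
    // Each instance is expected to renew twice per minute (one heartbeat per 30 s).
    static int expectedRenewsPerMin(int registeredInstances) {
        return registeredInstances * 2;
    }

    // Minimum renewals per minute below which the server stops expiring leases.
    static int renewsPerMinThreshold(int registeredInstances, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(registeredInstances) * renewalPercentThreshold);
    }

    // Same decision as isLeaseExpirationEnabled(), with the inputs passed in explicitly.
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            int threshold, int renewsInLastMin) {
        if (!selfPreservationEnabled) {
            return true; // mode switched off: always allow expiration
        }
        return threshold > 0 && renewsInLastMin > threshold;
    }
}
```

With 10 registered instances the server expects 20 renewals per minute and the threshold is 17. Receiving 18 renewals keeps eviction running; receiving 17 or fewer puts the server into self-preservation and eviction stops.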

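3.5 Information synchronization between servers

For the operations discussed above (registration, eviction, renewal and so on), once the local state has been modified the server calls PeerAwareInstanceRegistryImpl#replicateToPeers() to synchronize the change to the other servers.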

 private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {//
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
 /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

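The Eureka second-hop propagation problem: suppose Eureka server A is configured with B as its peer, and B is configured with C. When a client registers with A, servers A and B end up with the client's information but C does not. The code explains why. When the client registers with A, A stores the information and replicates it to B once. That request reaches B with isReplication set to true, so replicateToPeers() on B returns early and nothing is forwarded to C.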
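The A, B, C scenario can be reproduced with a toy model. PeerNodeSketch below is hypothetical and keeps only the one rule that matters here: a request that arrives as a replication is stored but never replicated again.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of one-hop replication between Eureka-like peers.
class PeerNodeSketch {
    final String name;
    final Set<String> instances = new HashSet<>();
    final List<PeerNodeSketch> peers = new ArrayList<>();

    PeerNodeSketch(String name) {
        this.name = name;
    }

    void register(String instanceId, boolean isReplication) {
        instances.add(instanceId); // always update the local registry first
        if (isReplication) {
            return; // already a replica: forwarding again could loop forever
        }
        for (PeerNodeSketch peer : peers) {
            if (peer == this) {
                continue; // never replicate to yourself
            }
            peer.register(instanceId, true);
        }
    }
}
```

Wiring A to B and B to C, then registering a client on A with isReplication=false, leaves the instance on A and B only. For C to see it as well, A would have to list C among its own peers.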