1. 程式人生 > >eureka原始碼解讀之服務端

eureka原始碼解讀之服務端

剖析eureka服務端啟動流程

服務端啟動類-入口處

@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {
	
	 public static void main(String[] args) {
	        new SpringApplicationBuilder(EurekaServerApplication.class).properties("spring.config.name:regist").web(true).run(args);
	    }

}

從上面的程式碼可以看出,eureka服務端的啟動時依賴於@EnableEurekaServer這個註解的,這個註解幹了什麼事情呢?引入EurekaServerMarkerConfiguration配置,這個是一個標記性質的配置,主要是生成一個EurekaServerMarkerConfiguration.Marker物件,那麼為什麼要生成這個物件

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
//eureka服務端的初始化配置依賴於EurekaServerMarkerConfiguration.Marker物件的存在與否
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
	/**
	 * List of packages containing Jersey resources required by the Eureka server
	 */
	private static String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
			"com.netflix.eureka" };

	@Autowired
	private ApplicationInfoManager applicationInfoManager;

	@Autowired
	private EurekaServerConfig eurekaServerConfig;

	@Autowired
	private EurekaClientConfig eurekaClientConfig;
	....
}

從上面的程式碼可以看出當EurekaServerMarkerConfiguration.Marker物件存在時,才自動初始化eureka服務端


簡單看看eureka server初始化配置有哪些

/*包含 Eureka server對外提供的restful資源的Jersey resources 包列表*/
private static String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
			"com.netflix.eureka" };

/**所有以EurekaConstants.DEFAULT_PREFIX為字首的請求都轉到Jersey
	 * Register the Jersey filter
	 */
	@Bean
	public FilterRegistrationBean jerseyFilterRegistration(
			javax.ws.rs.core.Application eurekaJerseyApp) {
		FilterRegistrationBean bean = new FilterRegistrationBean();
		bean.setFilter(new ServletContainer(eurekaJerseyApp));
		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
		bean.setUrlPatterns(
				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

		return bean;
	}

	/**
	 * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
	 * required by the Eureka server.
	 */
	@Bean
	public javax.ws.rs.core.Application jerseyApplication(Environment environment,
			ResourceLoader resourceLoader) {

		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
				false, environment);

		// Filter to include only classes that have a particular annotation.
		//
		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

		// Find classes in Eureka packages (or subpackages)
		// 掃描EUREKA_PACKAGES 包下的Jersey資源 包含StatusResource,ApplicationsResource,InstancesResource。。。
		Set<Class<?>> classes = new HashSet<Class<?>>();
		for (String basePackage : EUREKA_PACKAGES) {
			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
			for (BeanDefinition bd : beans) {
				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
						resourceLoader.getClassLoader());
				classes.add(cls);
			}
		}

		// Construct the Jersey ResourceConfig
		//
		Map<String, Object> propsAndFeatures = new HashMap<String, Object>();
		propsAndFeatures.put(
				// Skip static content used by the webapp
				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

		DefaultResourceConfig rc = new DefaultResourceConfig(classes);
		rc.setPropertiesAndFeatures(propsAndFeatures);

		return rc;
	}

上面的配置定義了eureka server 對外提供的服務埠,包含例項的註冊,例項的續約,例項的取消,已註冊應用列表的獲取,變更應用資訊的獲取,其他節點的資訊同步等介面,後面詳細分析


這個配置顯而易見,配置eureka server的web UI 介面,可控制是否啟用web 視覺化端點

	@Bean
	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
	public EurekaController eurekaController() {
		return new EurekaController(this.applicationInfoManager);
	}

例項化服務例項具體操作實現,封裝了例項的取消,註冊,續約等操作

	@Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient,
				this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}

封裝eureka節點資訊,以及eureka節點資訊更新操作

    @Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
			ServerCodecs serverCodecs) {
		return new PeerEurekaNodes(registry, this.eurekaServerConfig,
				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
	}


初始化eureka server環境上下文資訊,引導eureka server啟動

    @Bean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}

	@Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
			EurekaServerContext serverContext) {
		return new EurekaServerBootstrap(this.applicationInfoManager,
				this.eurekaClientConfig, this.eurekaServerConfig, registry,
				serverContext);
	}

分析一下EurekaServerBootstrap初始化了哪些資訊

在EurekaServerInitializerConfiguration中可以看到實現了SmartLifecycle介面,由spring管理EurekaServer的生命週期

/**
 * @author Dave Syer
 */
@Configuration
@CommonsLog
public class EurekaServerInitializerConfiguration
		implements ServletContextAware, SmartLifecycle, Ordered {

	@Autowired
	private EurekaServerConfig eurekaServerConfig;

	private ServletContext servletContext;

	@Autowired
	private ApplicationContext applicationContext;

	@Autowired
	private EurekaServerBootstrap eurekaServerBootstrap;

	private boolean running;

	private int order = 1;

	@Override
	public void setServletContext(ServletContext servletContext) {
		this.servletContext = servletContext;
	}

	@Override
	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//TODO: is this class even needed now?
					//初始化上下文資訊
					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");
                    //釋出初始化OK事件
					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					EurekaServerInitializerConfiguration.this.running = true;
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}

	@Override
	public void stop() {
		//生命週期結束,處理後續善後問題
		this.running = false;
		eurekaServerBootstrap.contextDestroyed(this.servletContext);
	}
。。。

}

@see EurekaServerBootstrap.java

public void contextInitialized(ServletContext context) {
		try {
	      	//初始化環境資訊
			initEurekaEnvironment();
			initEurekaServerContext();

			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}

	protected void initEurekaEnvironment() throws Exception {
		log.info("Setting the eureka configuration..");

		/** 資料中心*/
		String dataCenter = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_DATACENTER);
		if (dataCenter == null) {
			log.info(
					"Eureka data center value eureka.datacenter is not set, defaulting to default");
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
		}
		/**設定環境 預設 test*/
		String environment = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_ENVIRONMENT);
		if (environment == null) {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
			log.info(
					"Eureka environment value eureka.environment is not set, defaulting to test");
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
		}
	}

	protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);

		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
			this.awsBinder.start();
		}

		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		//從臨近的eureka節點拷貝服務註冊資訊
		int registryCount = this.registry.syncUp();
		//初始化過期節點清除定時任務
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);

		// Register all monitoring statistics.
		EurekaMonitors.registerAllStats();
	}

 @see PeerAwareInstanceRegistryImpl.java
 /** 從臨近的對等節點獲取註冊的服務資訊,並註冊到自己的登錄檔裡*/
 @Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
        //根據設定的重試次數,獲取並設定登錄檔資訊,預設可重試5次,可見EurekaServerAutoConfiguration中的配置
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    //每次重試間隔30秒  預設
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            //這裡並不是傳送請求去獲取,從程式碼中可看到資料來自localRegionApps.get();那麼這個資料又是怎麼來的?
            Applications apps = eurekaClient.getApplications();
            //獲取到了註冊資料則註冊到自身登錄檔中
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

跟著上面發現的問題,繼續分析Applications apps = eurekaClient.getApplications();這裡的資料來自何處

通過檢視eureka server 服務端的pom.xml
可找到添加了以下依賴
<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-netflix-eureka-client</artifactId>
</dependency>

在EurekaClientAutoConfiguration中可以發現注入了CloudEurekaClient 這個bean,這個例項處理了很多細節,這個在閱讀客戶端原始碼時,會更具體分析
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
	return new CloudEurekaClient(manager, config, this.optionalArgs,
			this.context);
}

在CloudEurekaClient 繼承了DiscoveryClient,在DiscoveryClient的建構函式中可以看到如下程式碼片段

/*獲取eureka節點服務註冊例項資訊*/
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    fetchRegistryFromBackup();
}
/*這裡便會請求其他eureka 節點,獲取服務註冊資訊,所以前面那個地方才會有資料*/
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

上面這個問題搞清楚之後,繼續分析下一個問題,對於過期未續約服務例項節點的處理,定時清理任務是如何處理的

    @see PeerAwareInstanceRegistryImpl.java
    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        //每分鐘期待接收到的續約數量 比如有17個註冊例項,那麼每分鐘期待的續約就是34個 ,30秒回續約一次,閾值就是 34 * 0.85 = 28 ,取整啦
        this.expectedNumberOfRenewsPerMin = count * 2;
        //每分鐘需要接收的續約請求閾值【期待續約的總量 * 配置的閾值百分比 預設 0.85】
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        logger.info("Got " + count + " instances from neighboring DS node");
        logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        //aws暫時不管
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        //這裡便會初始化清理的定時任務
        super.postInit();
    }
    
   protected void postInit() {
        //啟用記錄最後一分鐘續約的數量,用於處理自我保護模式的開啟問題
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }

開始看清理過期節點任務之前,先看一下怎麼記錄續約次數的

public class MeasuredRate {
    private static final Logger logger = LoggerFactory.getLogger(MeasuredRate.class);
    private final AtomicLong lastBucket = new AtomicLong(0);
    private final AtomicLong currentBucket = new AtomicLong(0);

    private final long sampleInterval;
    private final Timer timer;

    private volatile boolean isActive;

    /**
     * @param sampleInterval in milliseconds
     */
    public MeasuredRate(long sampleInterval) {
        this.sampleInterval = sampleInterval;
        this.timer = new Timer("Eureka-MeasureRateTimer", true);
        this.isActive = false;
    }

    public synchronized void start() {
        if (!isActive) {
            //啟用記錄最後一分鐘的續約數量任務 sampleInterval 可以從程式碼看到是 1分鐘
            timer.schedule(new TimerTask() {

                @Override
                public void run() {
                    try {
                        // 每分鐘都會從頭開始計算
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);

            isActive = true;
        }
    }

    public synchronized void stop() {
        if (isActive) {
            timer.cancel();
            isActive = false;
        }
    }

    /**
     * Returns the count in the last sample interval.
     */
    public long getCount() {
        return lastBucket.get();
    }

    /**
     * 每次續約都會自增1
     */
    public void increment() {
        currentBucket.incrementAndGet();
    }
}

通過查詢呼叫關係,可以看到increment()方法被呼叫的地方AbstractInstanceRegistry裡的renew方法,正是續約的方法

知曉了續約數量的記錄後再往下看過期節點清理任務

class EvictionTask extends TimerTask {

        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

        @Override
        public void run() {
            try {
                //獲取補償時間,比如gc消耗的時間,或者時鐘偏差
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }

        /**
         * compute a compensation time defined as the actual time this task was executed since the prev iteration,
         * vs the configured amount of time for execution. This is useful for cases where changes in time (due to
         * clock skew or gc for example) causes the actual eviction task to execute later than the desired time
         * according to the configured cycle.
         */
        long getCompensationTimeMs() {
            //當前時間
            long currNanos = getCurrentTimeNano();
            //最後一次執行時間
            long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
            if (lastNanos == 0l) {
                return 0l;
            }
            //當前時間與最後一次時間的時間差
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            //時間差與執行時間間隔的差值即為需要補償的時間
            long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0l ? 0l : compensationTime;
        }

        long getCurrentTimeNano() {  // for testing
            return System.nanoTime();
        }

    }
 /*實際的過期節點清理邏輯*/
 public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        /*
        *首先收集所有過期的例項集合,然後以隨機的順序清除,
        *如果不這麼做的話,當大批量清除時,還沒等到自我保護起作用,
        *可能就已經把所有應用都剔除掉啦,隨機化逐出,
        *將影響均勻分攤在了整個應用裡
        */
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    //找出過期的節點,最後一次續約時間加上過期時間再加上補償時間是否小於當前時間,小則過期啦
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
    
先看是否允許清理
@see PeerAwareInstanceRegistryImpl.java
@Override
public boolean isLeaseExpirationEnabled() {
    #如果自我保護模式不開啟,則直接允許清理過期節點
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    
    //如果保護模式開啟啦,則如果最後一分鐘續約的數量大於計算的閾值,則允許,否則不允許清除
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

@Override
public boolean isSelfPreservationModeEnabled() {
    return serverConfig.shouldEnableSelfPreservation();
}

從登錄檔移除過期節點
protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            read.lock();
            //資料統計
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                //從登錄檔中移除
                leaseToCancel = gMap.remove(id);
            }
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                //這裡將狀態變更了的例項資訊新增到 recentlyChangedQueue 佇列,這個是幹嘛?什麼目的?
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                
                //無效responseCache快取,這快取幹嘛用的?
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally {
            read.unlock();
        }
    }

針對上面的兩個疑問,繼續看一下原始碼,分析一下為什麼

recentlyChangedQueue 儲存最近變更的例項資訊

由於例項狀態的變更相對來說,是不會頻繁變更的,所以如果客戶端要保持與eureka服務端的註冊資訊的同步,
需要通過心跳來實現,如果每次心跳都全量獲取例項資訊,叢集龐大的話,效率較低,存在大量重複的資訊,
浪費頻寬,所以通過增量獲取已變更的例項資訊會是一個更好的選擇,通過原始碼可追溯到eureka服務端提供了增量獲取變更的註冊例項資訊的端點

@see ApplicationsResource.java

/**
*獲取有關所有增量更改的資訊[註冊,取消,狀態更改和過期],一般對登錄檔的更改很少,因此只獲得 *delta比獲得完整的登錄檔更有效率,由於delta資訊在一段時間內被快取,因此請求
*可以在配置的快取重新整理時間內多次返回相同的資料
*
*/
@Path("delta")
@GET
public Response getContainerDifferential(
    @PathParam("version") String version,
    @HeaderParam(HEADER_ACCEPT) String acceptHeader,
    @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
    @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
    @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

// If the delta flag is disabled in discovery or if the lease expiration
// has been disabled, redirect clients to get all instances
//是否禁止增量獲取
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
    return Response.status(Status.FORBIDDEN).build();
}

String[] regions = null;
if (!isRemoteRegionRequested) {
    EurekaMonitors.GET_ALL_DELTA.increment();
} else {
    regions = regionsStr.toLowerCase().split(",");
    Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
    EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}

CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
    keyType = Key.KeyType.XML;
    returnMediaType = MediaType.APPLICATION_XML;
}

Key cacheKey = new Key(Key.EntityType.Application,
        ResponseCacheImpl.ALL_APPS_DELTA,
        keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
//從快取獲取增量資料 這裡就使用了responseCache
if (acceptEncoding != null
        && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
    return Response.ok(responseCache.getGZIP(cacheKey))
            .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
            .header(HEADER_CONTENT_TYPE, returnMediaType)
            .build();
} else {
    return Response.ok(responseCache.get(cacheKey))
            .build();
}
}

看一下增量資訊的來源
@see AbstractInstanceRegistry.java
public Applications getApplicationDeltas() {
        GET_ALL_CACHE_MISS_DELTA.increment();
        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDelta().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        try {
            write.lock();
            //終於看到了recentlyChangedQueue佇列,原來是用來處理增量更新的資訊同步的
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            logger.debug("The number of elements in the delta queue is :"
                    + this.recentlyChangedQueue.size());
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                Object[] args = {instanceInfo.getId(),
                        instanceInfo.getStatus().name(),
                        instanceInfo.getActionType().name()};
                logger.debug(
                        "The instance id %s is found with status %s and actiontype %s",
                        args);
                Application app = applicationInstancesMap.get(instanceInfo
                        .getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(decorateInstanceInfo(lease));
            }

            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();

            if (!disableTransparentFallback) {
                Applications allAppsInLocalRegion = getApplications(false);

                for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                    Applications applications = remoteRegistry.getApplicationDeltas();
                    for (Application application : applications.getRegisteredApplications()) {
                        Application appInLocalRegistry =
                                allAppsInLocalRegion.getRegisteredApplications(application.getName());
                        if (appInLocalRegistry == null) {
                            apps.addApplication(application);
                        }
                    }
                }
            }

            Applications allApps = getApplications(!disableTransparentFallback);
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }

responseCache 這個比較簡單,直接看ResponseCacheImpl實現基本就明白了,裡面會有一個定時任務專門處理
只讀快取的更新,預設30s更新一次,如果關閉使用只讀快取,則每次拿的都是最新的

recentlyChangedQueue這個裡面的變更資訊資料多久會刪掉呢

 @see AbstractInstanceRegistry.java
 protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
        this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

        this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
        
        //清理最近的變更資訊佇列裡過期的資料 30s執行一次
        this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    }
    
    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {
     
            @Override
            public void run() {
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    //移除超過3分鐘的資料
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }

        };
    }

服務的註冊,續約下一章再寫