前言介紹

瞭解到了SpringCloud,大家都應該知道註冊中心,而對於我們從過去到現在,SpringCloud中用的最多的註冊中心就是Eureka了,所以深入Eureka的原理和原始碼,接下來我們要進行講解下eureka的原始碼分析,由此應運而產生的本章節的內容。

基本原理

  1. Eureka Server提供服務註冊服務,各個節點啟動後,會在Eureka Server中進行註冊,這樣Eureka Server中的服務登錄檔中將會儲存所有可用服務節點的資訊,服務節點的資訊可以在介面中直觀的看到

  2. Eureka Client 是一個Java 客戶端,用於簡化與Eureka Server的互動,客戶端同時也具備一個內建的、使用輪詢負載演算法的負載均衡器

  3. 在應用啟動後,將會向Eureka Server傳送心跳(預設週期為30秒),如果Eureka Server在多個心跳週期(預設3個週期)沒有收到某個節點的心跳,Eureka Server 將會從服務登錄檔中把這個服務節點移除(預設90秒)

  4. Eureka Server之間將會通過複製的方式完成資料的同步;

  5. Eureka Client具有快取的機制,即使所有的Eureka Server都掛掉的話,客戶端依然可以利用快取中的資訊消費其它服務的API;

啟動流程分析

從EurekaServer 啟動的流程日誌入手分析:

2021-01-21 18:14:17.635  INFO 5288 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'environmentManager': registering with JMX server as MBean [org.springframework.cloud.context.environment:name=environmentManager,type=EnvironmentManager]
2021-01-21 18:14:17.650 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'restartEndpoint': registering with JMX server as MBean [org.springframework.cloud.context.restart:name=restartEndpoint,type=RestartEndpoint]
2021-01-21 18:14:17.661 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'refreshScope': registering with JMX server as MBean [org.springframework.cloud.context.scope.refresh:name=refreshScope,type=RefreshScope]
2021-01-21 18:14:17.674 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'configurationPropertiesRebinder': registering with JMX server as MBean [org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=335b5620,type=ConfigurationPropertiesRebinder]
2021-01-21 18:14:17.683 INFO 5288 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'refreshEndpoint': registering with JMX server as MBean [org.springframework.cloud.endpoint:name=refreshEndpoint,type=RefreshEndpoint]
2021-01-21 18:14:17.926 INFO 5288 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2021-01-21 18:14:17.927 INFO 5288 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application unknown with eureka with status UP
2021-01-21 18:14:17.927 INFO 5288 --- [ Thread-10] o.s.c.n.e.server.EurekaServerBootstrap : Setting the eureka configuration..
2021-01-21 18:14:17.948 INFO 5288 --- [ Thread-10] o.s.c.n.e.server.EurekaServerBootstrap : isAws returned false
2021-01-21 18:14:17.949 INFO 5288 --- [ Thread-10] o.s.c.n.e.server.EurekaServerBootstrap : Initialized server context
2021-01-21 18:14:17.949 INFO 5288 --- [ Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl : Got 1 instances from neighboring DS node
2021-01-21 18:14:17.949 INFO 5288 --- [ Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl : Renew threshold is: 1
2021-01-21 18:14:17.949 INFO 5288 --- [ Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl : Changing status to UP
2021-01-21 18:14:17.958 INFO 5288 --- [ Thread-10] e.s.EurekaServerInitializerConfiguration : Started Eureka Server
2021-01-21 18:14:18.019 INFO 5288 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8761 (http)
2021-01-21 18:14:18.020 INFO 5288 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 8761
2021-01-21 18:14:18.023 INFO 5288 --- [ main] c.s.cloud.EurekaServerApplication : Started EurekaServerApplication in 8.299 seconds (JVM running for 8.886)

Eureka微服務已啟動.

日誌列印“Setting the eureka configuration..”,eureka開始進行配置,說不定也許就是Eureka Server流程啟動的開始呢?我們抱著懷疑的心態進入這行日誌列印的EurekaServerBootstrap類去看看。

EurekaServerBootstrap類

看看,看這個類的名字,見名知意,應該就是 EurekaServer 的啟動類了:

protected void initEurekaEnvironment() throws Exception {
log.info("Setting the eureka configuration..");

initEurekaEnvironment方法

我們看到日誌在initEurekaEnvironment方法中被打印出來,然後我順著這個方法尋找該方法被呼叫的地方;

public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
contextInitialized

接著發現contextInitialized這個方法裡面呼叫了initEurekaEnvironment 方法,接著我們再往上層尋找被呼叫的地方;

EurekaServerInitializerConfiguration

接著我們看到 EurekaServerInitializerConfiguration 類中有個 start 方法,該方法建立了一個執行緒來後臺執行 EurekaServer 的初始化流程;

進入 EurekaServerInitializerConfiguration 方法,看看這個所謂的 EurekaServer 初始化配置做了哪些事情?

@Override
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));//讀取相關的eureka的配置資訊
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig())); //釋出相關eureka server配置的事件操作
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
  • 看到 log.info("Started Eureka Server"); 這行程式碼,相信大家已經釋然了,這裡就是所謂的啟動了 EurekaServer 了,其實也就是 eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext) 初始化了一些我們未知的東西;

  • 當列印完啟動Eureka Server日誌後,呼叫了兩次 publish 方法,該方法最終呼叫的是this.applicationContext.publishEvent(event) 方法,目的是利用Spring中ApplicationContext對事件傳遞性質,事件釋出者(applicationContext)來發布事件(event),但是缺少的是監聽者,其實你仔細搜尋下程式碼,發現好像沒有地方對 EurekaServerStartedEvent、EurekaRegistryAvailableEvent 進行監聽。

  • 然後找到 EurekaServerStartedEvent 所在的目錄下,EurekaInstanceCanceledEvent、EurekaInstanceRegisteredEvent、EurekaInstanceRenewedEvent、EurekaRegistryAvailableEvent、EurekaServerStartedEvent 有這麼幾個事件的類,服務下線事件、服務註冊事件、服務續約事件、註冊中心啟動事件、Eureka Server啟動事件,這麼幾個事件都沒有被監聽。

  • 像這樣 @EventListener public void listen(EurekaInstanceCanceledEvent event) {下線邏輯 },新增EventListener 監聽註解,就可以在我們自己的程式碼邏輯中收到這個事件的回調了,所以想想SpringCloud還是挺機制的,提供回撥介面讓我們自己實現自己的業務邏輯,真心不錯;

  • 那麼反過來想想,為啥會無緣無故 start 方法就被呼叫了呢?那麼反向繼續向上找呼叫 start 方法的地方,結果找到了DefaultLifecycleProcessor類的doStart方法呼叫了 bean.start();

DefaultLifecycleProcessor

EurekaServerInitializerConfiguration.start 方法是如何被觸發的?

private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
// 打上斷點
Lifecycle bean = lifecycleBeans.remove(beanName);
if (bean != null && !this.equals(bean)) {
String[] dependenciesForBean = this.beanFactory.getDependenciesForBean(beanName);
for (String dependency : dependenciesForBean) {
doStart(lifecycleBeans, dependency, autoStartupOnly);
}
if (!bean.isRunning() &&
(!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
if (logger.isDebugEnabled()) {
logger.debug("Starting bean '" + beanName + "' of type [" + bean.getClass() + "]");
}
try {
bean.start();
}
catch (Throwable ex) {
throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
}
if (logger.isDebugEnabled()) {
logger.debug("Successfully started bean '" + beanName + "'");
}
}
}
}

看到在 bean.isRunning 等一系列狀態的判斷下才去呼叫 bean.start() 方法的,我們再往上尋找被呼叫地方;

public void start() {
// 打上斷點
if (this.members.isEmpty()) {
return;
}
if (logger.isInfoEnabled()) {
logger.info("Starting beans in phase " + this.phase);
}
Collections.sort(this.members);
for (LifecycleGroupMember member : this.members) {
if (this.lifecycleBeans.containsKey(member.name)) {
doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
}
}
}

該類是DefaultLifecycleProcessor中內部類LifecycleGroup的一個方法,再往上尋找呼叫方;

private void startBeans(boolean autoStartupOnly) {
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();
for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {
Lifecycle bean = entry.getValue();
if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
int phase = getPhase(bean);
LifecycleGroup group = phases.get(phase);
if (group == null) {
group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
phases.put(phase, group);
}
group.add(entry.getKey(), bean);
}
}
if (phases.size() > 0) {
List<Integer> keys = new ArrayList<Integer>(phases.keySet());
Collections.sort(keys);
for (Integer key : keys) {
phases.get(key).start();
}
}
}
  • startBeans 屬於 DefaultLifecycleProcessor 類的一個私有方法,startBeans 方法第一行就是獲取 getLifecycleBeans() 生命週期Bean物件,由此可見似乎 Eureka Server 之所以會被啟動,是不是實現了某個介面或者重寫了某個方法,才會導致由於容易在初始化的過程中因呼叫某些特殊方法或者某些類才啟動的,因此我們回頭去看看 EurekaServerInitializerConfiguration 這個類;

  • 結果發現 EurekaServerInitializerConfiguration 這個類實現了 SmartLifecycle 這麼樣的一個介面,而 SmartLifecycle 介面又繼承了 Lifecycle 生命週期介面類,所以真想已經重見天日了,原來是實現了 Lifecycle 這樣的一個介面,然後實現了 start 方法,因此 Eureka Server 就這麼稀裡糊塗的就被莫名其妙的啟動起來了?

我們之前僅僅只是通過了日誌來逆向分析,但是我們是不是忘了我們本應該標誌是Eureka Server的這個註解了呢?沒錯,我們在分析的過程中

已經將 @EnableEurekaServer 這個註解遺忘了,那麼我們現在先回到這個註解類來看看;

EnableEurekaServer

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerConfiguration.class)
public @interface EnableEurekaServer {}

我們不難發現 EnableEurekaServer 類上有個 @Import 註解,引用了一個 class 檔案,由此我們進入觀察;

EurekaServerConfiguration 類看看,看名稱的話,理解的意思大概就是 Eureka Server 配置類;

  • 果不其然,這個類有很多 @Bean、@Configuration 註解過的方法,那是不是我們可以認為剛才 3.1~3.4 的推論是不是就是由於被例項化了這麼一個 Bean,然後就慢慢的呼叫到了 start 方法了呢?

  • 搜尋 “Bootstrap” 字樣,還真發現了有這麼一個方法;

@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
  • 既然有這麼一個 Bean,那麼是不是和剛開始順著日誌逆向分析也是有一定道理的,沒有這麼一個Bean的存在,那麼 DefaultLifecycleProcessor.startBeans 方法中 getLifecycleBeans 的這個也就沒那麼順暢被找到了呢?不過我的猜想是這樣的,本人沒有將原始碼下載下來,將 eurekaServerBootstrap 方法中的 @Bean 註解註釋掉試試,不過推理起來也八九不離十,這個疑問懸念就留給大家嘗試嘗試吧;

  • 既然找到了一個 @Bean 註解過的方法,那我們再找找其他的一些被註解過的方法,比如一些通用全域性用的類似詞眼,比如 Context,Bean,Init、Server 之類的;

@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
} @Bean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new PeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
} @Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
} @Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
  • DefaultEurekaServerContext.initialize 初始化了一些東西,現在還不知道幹啥用的,先放這裡,打上斷點;

  • PeerEurekaNodes.start 方法,又是一個 start 方法,但是該類沒有實現任何類,姑且先放這裡,打上斷點;

  • InstanceRegistry.register 方法,而且還有幾個呢,可能是客戶端註冊用的,也先放這裡,都打上斷點,或者將 這個類的所有方法都斷點上,斷點打完後發現有註冊的,有續約的,有登出的;

  • 打完這些斷點後,感覺沒有思路了,索性就斷點跑一把,看看有什麼新的發現點;

【分析一】:DefaultEurekaServerContext.initialize 方法被呼叫了,證實了剛才想法,EurekaServerConfiguration 不是白寫的,還是有它的作用的;

@PostConstruct
@Override
public void initialize() throws Exception {
logger.info("Initializing ...");
peerEurekaNodes.start();
registry.init(peerEurekaNodes);
logger.info("Initialized");
} 【分析二】:進入 initialize 方法中 peerEurekaNodes.start(); public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
} }
};
// 註釋:間隔 600000 毫秒,即 10分鐘 間隔執行一次服務叢集資料同步;
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: " + node.getServiceUrl());
}
}
  • start 方法中會看到一個定時排程的任務,updatePeerEurekaNodes(resolvePeerUrls()); 間隔 600000 毫秒,即 10分鐘 間隔執行一次服務叢集資料同步;

  • 然後斷點放走放下走,進入 initialize 方法中 registry.init(peerEurekaNodes);

@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
// 註釋:初始化 Eureka Server 響應快取,預設快取時間為30s
initializedResponseCache();
// 註釋:定時任務,多久重置一下心跳閾值,900000 毫秒,即 15分鐘 的間隔時間,會重置心跳閾值
scheduleRenewalThresholdUpdateTask();
// 註釋:初始化遠端註冊
initRemoteRegionRegistry(); try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}

快取也配置好了,定時任務也配置好了,似乎應該沒啥了,那麼我們把斷點放開,看看下一步會走到哪裡?

EurekaServerInitializerConfiguration.start。

  • 先是 DefaultLifecycleProcessor.doStart 方法進斷點,然後才是 EurekaServerInitializerConfiguration.start 方法進斷點;

  • 再一次證明剛剛的逆向分析僅僅只是缺了個從頭EnableEurekaServer分析罷了,但是最終方法論分析思路還是對的,由於開始分析過這裡,然而我們就跳過,繼續放開斷點向後繼續看看;

InstanceRegistry.openForTraffic

  • 【這不就是我們剛才在 “步驟3.7之分析七” 打的斷點麼?看下堆疊資訊,正是 “步驟3.2之分析一” 中 initEurekaServerContext 方法中有這麼一句 this.registry.openForTraffic(this.applicationInfoManager, registryCount); 呼叫到了,因果輪迴,程式碼千變萬化,打上斷點還有有好處的,結果還是回到了開始日誌逆向分析的地方。

c進入 super.openForTraffic 方法;

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
// 註釋:每30秒續約一次,那麼每分鐘續約就是2次,所以才是 count * 2 的結果;
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got " + count + " instances from neighboring DS node");
logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
// 註釋:修改 Eureka Server 為上電狀態,就是說設定 Eureka Server 已經處於活躍狀態了,那就是意味著 EurekaServer 基本上說可以正常使用了;
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// 註釋:定時任務,60000 毫秒,即 1分鐘 的間隔時間,Eureke Server定期進行失效節點的清理
super.postInit();
}
  • 這裡主要設定了服務狀態,以及開啟了定時清理失效節點的定時任務,每分鐘掃描一次;

繼續放開斷點,來到了日誌列印 “main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 8761” 的EurekaDiscoveryClientConfiguration 類中 onApplicationEvent 方法。

@EventListener(EmbeddedServletContainerInitializedEvent.class)
public void onApplicationEvent(EmbeddedServletContainerInitializedEvent event) {
// TODO: take SSL into account when Spring Boot 1.2 is available
int localPort = event.getEmbeddedServletContainer().getPort();
if (this.port.get() == 0) {
log.info("Updating port to " + localPort);
this.port.compareAndSet(0, localPort);
start();
}
}
  • 設定埠,當看到 Updating port to 8761 這樣的日誌打印出來的話,說明 Eureka Server 整個啟動也就差不多Over了。現在回頭看看,發現分析了不少的方法和流程,有種感覺被掏空的感覺了。

總結 EurekaServer 啟動時候大概幹了哪些事情?

1、初始化Eureka環境,Eureka上下文;

2、初始化EurekaServer的快取

3、啟動了一些定時任務,比如充值心跳閾值定時任務,清理失效節點定時任務;

4、更新EurekaServer上電狀態,更新EurekaServer埠;