Understanding Spring Cloud Source Code in Depth: Eureka
1. What Eureka does
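In Spring Cloud, Eureka is the service registry. It plays the role that zk plays in a dubbo+zk setup, but it is much simpler. zk can do far more: distributed locks and distributed queues, for example, are built on zk's four node types plus the watch mechanism over long-lived connections. Eureka works differently. It is built on HTTP REST: service information is stored in a ConcurrentHashMap, each service reads that map after it starts so that it knows about all the other services, and services then call each other over HTTP using that information. Eureka has two parts. One is the service provider (the client, from Eureka's point of view) and the other is the server. Each client reads its own instance information and registers it with the server, so the server's job is to accept the information that clients report about themselves.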
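2. Eureka client source code analysis
When reading the Spring source code, we usually start from the spring.handlers file in the META-INF folder of the Spring source package, go straight to the XXXHandler source file, and then follow how the various Spring XML tags are parsed. For the cloud source code, we instead open spring.factories under META-INF and analyze the classes listed there.
Following that approach, we first find the EurekaClientAutoConfiguration configuration class in the spring.factories file of the eureka-client (1.4.0) package. A Eureka client has four main jobs: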
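- 2.1 Read the project's IP, instance_id, and port, and register them with the server
- 2.2 Service deregistration
- 2.3 Heartbeat
- 2.4 Fetch information about other services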
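2.1 Service registration
With that in mind, we first look for where application.properties is read. The eurekaInstanceConfigBean() method reads the configuration file into an EurekaInstanceConfigBean object, and @Bean registers it in the IoC container. The EurekaInstanceConfigBean object holds the client's IP, instance_id, port, and so on. The following code wraps the EurekaInstanceConfigBean: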
@Bean
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {// the eurekaInstanceConfigBean mentioned above is an implementation of EurekaInstanceConfig
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
Next comes service registration:
@Bean(destroyMethod = "shutdown")
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
manager.getInfo(); // force initialization
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
We go straight into the super constructor. The code above initScheduledTasks(); creates some thread pools, and initScheduledTasks itself starts a heartbeat thread:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
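In renew(), a 404 response triggers a call to register(). The heartbeat interval is configurable, and the setting can be found where the timers are set up. Tracing the source shows that, besides renew(), register() is also called from the run method of the InstanceInfoReplicator thread. By default this timer first runs 40 seconds after startup and then repeats every 30 seconds. Its job is to call register() when the instance's configuration changes. It also calls register() on first startup: because refreshInstanceInfo() is called, isInstanceInfoDirty becomes true, so the initial registration goes through this path as well. After that, register() is not called again except in special cases, which include a change of IP and changes to certain configuration parameters, as the following code shows: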
public void refreshDataCenterInfoIfRequired() {
String existingAddress = instanceInfo.getHostName();
String newAddress;
if (config instanceof RefreshableInstanceConfig) {
// Refresh data center info, and return up to date address
newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
} else {
newAddress = config.getHostName(true);
}
String newIp = config.getIpAddress();
if (newAddress != null && !newAddress.equals(existingAddress)) {
logger.warn("The address changed from : {} => {}", existingAddress, newAddress);
// :( in the legacy code here the builder is acting as a mutator.
// This is hard to fix as this same instanceInfo instance is referenced elsewhere.
// We will most likely re-write the client at sometime so not fixing for now.
InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
builder.setHostName(newAddress).setIPAddr(newIp).setDataCenterInfo(config.getDataCenterInfo());
instanceInfo.setIsDirty();// sets isInstanceInfoDirty to true and lastDirtyTimestamp to the current time
}
}
public void refreshLeaseInfoIfRequired() {
LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
if (leaseInfo == null) {
return;
}
int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {// the configuration parameters changed
LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(currentLeaseRenewal)
.setDurationInSecs(currentLeaseDuration)
.build();
instanceInfo.setLeaseInfo(newLeaseInfo);
instanceInfo.setIsDirty();
}
}
That is how the Eureka client registers.
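The dirty-flag idea behind these refresh methods can be sketched in isolation. This is a minimal illustration and not Eureka's InstanceInfoReplicator: the class DirtyFlagSketch, its fields, and the counter that stands in for the HTTP register call are all hypothetical, and the instance is assumed to start dirty so that the first tick registers.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the client-side instance metadata.
class DirtyFlagSketch {
    private String hostName;
    private volatile boolean dirty;
    // Counts how often the (simulated) register call was made.
    final AtomicInteger registerCalls = new AtomicInteger();

    DirtyFlagSketch(String hostName) {
        this.hostName = hostName;
        this.dirty = true; // assumption: start dirty so the first tick registers
    }

    // Same idea as refreshDataCenterInfoIfRequired(): mark dirty only on a real change.
    void refreshHostName(String resolved) {
        if (resolved != null && !resolved.equals(hostName)) {
            hostName = resolved;
            dirty = true;
        }
    }

    // One run of the periodic task: register only if something changed.
    boolean tick() {
        if (!dirty) {
            return false;
        }
        registerCalls.incrementAndGet(); // stands in for the HTTP register request
        dirty = false;
        return true;
    }
}
```

With this shape, a periodic task can call the refresh methods and then tick() on every run, and the server only sees a register request when the address or lease settings actually changed.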
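2.2 Service deregistration
Look at the EurekaClient interface: it has a shutdown method. The method is annotated with @PreDestroy, so it runs when the container shuts down and the bean is destroyed.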
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();// stops the heartbeat, instance info replication, cache refresh, and other timers
// If APPINFO was registered
if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);// set the status to DOWN
unregister();// tell the server that this client is going offline
}
logger.info("Completed shut down of DiscoveryClient");
}
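The compareAndSet guard at the top of this block is what makes shutdown safe to call more than once. A minimal sketch of that pattern follows; ShutdownSketch and the recorded step names are hypothetical and only stand in for the real calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    // Records the order of the simulated shutdown steps.
    final List<String> steps = new ArrayList<>();

    void shutdown() {
        // Only the first caller flips false -> true; later calls do nothing.
        if (isShutdown.compareAndSet(false, true)) {
            steps.add("cancelScheduledTasks");
            steps.add("setStatusDown");
            steps.add("unregister");
        }
    }
}
```

Calling shutdown() twice on the same object records the three steps only once, which matters because both the container and application code may trigger it.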
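2.3 Heartbeat
Section 2.1 covered registration and touched on renew(): a 404 response leads to registration, while a 200 means the heartbeat succeeded. By default a heartbeat is sent every 30 seconds.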
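The 404/200 decision can be sketched against an in-memory stand-in for the server. Everything here is hypothetical (HeartbeatSketch, FakeServer, and the assumption that a successful registration answers 204); it only illustrates the control flow of renew(), not the real transport.

```java
import java.util.HashSet;
import java.util.Set;

class HeartbeatSketch {
    // Hypothetical in-memory server that only knows ids that have registered.
    static class FakeServer {
        final Set<String> known = new HashSet<>();

        int sendHeartBeat(String id) {
            return known.contains(id) ? 200 : 404;
        }

        int register(String id) {
            known.add(id);
            return 204; // assumption: success is reported as 204 No Content
        }
    }

    // Mirrors the shape of renew(): unknown instance -> re-register, otherwise expect 200.
    static boolean renew(FakeServer server, String id) {
        int status = server.sendHeartBeat(id);
        if (status == 404) {
            return server.register(id) == 204;
        }
        return status == 200;
    }
}
```

The first renew() for an unknown id falls through to register(), and every later call is a plain heartbeat.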
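2.4 Fetching services
When a Eureka client starts, it registers with the Eureka server. Other clients also need to learn that it has started so they can read its information. This exchange is also timer-driven: in initScheduledTasks(); a CacheRefreshThread is started with a default interval of 30 seconds. Look at fetchRegistry(boolean forceFullRegistryFetch); in that thread. There are two kinds of fetch, a full fetch and a delta (incremental) fetch. The full fetch is getAndStoreFullRegistry():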
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())// REST request to the server for instance information
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));// stored in DiscoveryClient's localRegionApps, which is an AtomicReference
} else {
}
}
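The fetchRegistryGeneration check in this method keeps a slow, outdated response from overwriting a newer one. The sketch below isolates that compare-and-set guard; FetchGenerationSketch and its String payload are hypothetical simplifications of the real Applications object.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class FetchGenerationSketch {
    final AtomicLong generation = new AtomicLong(0);
    final AtomicReference<String> localApps = new AtomicReference<>("empty");

    // Read the generation before issuing the (simulated) HTTP request.
    long begin() {
        return generation.get();
    }

    // Store the result only if no other fetch completed in the meantime.
    boolean store(long seenGeneration, String fetched) {
        if (fetched == null) {
            return false; // nothing usable came back
        }
        if (generation.compareAndSet(seenGeneration, seenGeneration + 1)) {
            localApps.set(fetched);
            return true;
        }
        return false; // a concurrent fetch won the race; discard this result
    }
}
```

If two fetches start from the same generation, whichever stores first wins and the other result is dropped instead of replacing fresher data.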
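3. Eureka server source code analysis
While analyzing the client we saw that it registers and fetches information through HTTP REST requests, so the Eureka server must have a structure similar to a Spring MVC project. Find the EurekaServerAutoConfiguration class and look at the jerseyApplication() method, which puts a jerseyApplication object into the container. Its logic resembles how Spring scans for @Component: it scans for the @Path and @Provider annotations, wraps each match as a BeanDefinition, and collects the classes into the Application's set. A filter then intercepts URLs and maps them to the corresponding resource class, which plays the role of a Controller.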
@Bean
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();// the core is a filter
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));// intercepts all requests under /eureka
return bean;
}
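That is a first look at Jersey. From the client analysis we can infer that the server has these functions:
- Accept requests (getting to know Jersey)
- Accept and handle client registration/heartbeat/deregistration requests
- Service eviction
The following features concern the high availability of the Eureka server itself:
- Self-preservation
- Synchronizing information between servers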
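3.1 How the server accepts requests
Jersey and how Eureka integrates it were covered above, so we won't repeat that here.
3.2 Accepting and handling client registration/heartbeat/deregistration requests
Server accepts a client registration
From the Eureka wiki at https://github.com/Netflix/eureka/wiki/Eureka-REST-operations we know that registration calls the POST /eureka/v2/apps/appID endpoint. ApplicationsResource delegates to ApplicationResource's addInstance() method, which leads to the register() method: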
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;// default lease duration: 90 seconds
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);// register the instance; examined in detail below
// replicate to the other server nodes
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();// thread-safe map of service instances: the outer key is the application name in the cloud project, and the inner map uses instanceId as the key and a Lease object as the value
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();// taken here and released in the finally block below
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());// appName is spring.application.name from the cloud configuration
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);// when the first instance of an application registers, an empty instance map is put into registry
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// this id is the instanceId
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// store the new lease
gMap.put(registrant.getId(), lease);
} finally {
read.unlock();
}
}
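The two-level map and the putIfAbsent step can be tried out on their own. In this minimal sketch, RegistrySketch is a hypothetical class and a plain String stands in for Lease<InstanceInfo>; it shows only the map handling, not locking, counters, or replication.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistrySketch {
    // Outer key: application name. Inner key: instance id. Value: stand-in for a lease.
    final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String lease) {
        Map<String, String> instances = registry.get(appName);
        if (instances == null) {
            Map<String, String> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the existing map if another thread created it first.
            instances = registry.putIfAbsent(appName, fresh);
            if (instances == null) {
                instances = fresh;
            }
        }
        instances.put(instanceId, lease); // re-registering the same id overwrites the lease
    }
}
```

Registering two instances of the same application yields one outer entry with two inner entries, and registering the same id again replaces its lease.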
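Server accepts a client renewal (heartbeat)
The endpoint is InstanceResource#renewLease(). Renewal simply maintains the instance's status, updates the last-updated time, and then replicates to the other servers. Go straight to the renew() method: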
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);// get the instance map for this application
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);// get the lease for this instance
}
if (leaseToRenew == null) {//error
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
...
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
instanceInfo.setStatus(overriddenInstanceStatus);// update the instance status
}
}
renewsLastMin.increment();
leaseToRenew.renew();// update the last-updated time
return true;
}
}
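What "updating the last-updated time" buys us can be shown with a deliberately simplified lease. LeaseSketch is hypothetical and is not Eureka's Lease class: it takes the clock as a parameter so the behavior is deterministic, and it treats a lease as expired once more than the duration has passed since the last renewal.

```java
class LeaseSketch {
    private final long durationMs;
    private long lastRenewedAt;

    LeaseSketch(long durationMs, long now) {
        this.durationMs = durationMs;
        this.lastRenewedAt = now;
    }

    // A heartbeat simply pushes the reference time forward.
    void renew(long now) {
        lastRenewedAt = now;
    }

    // Expired once more than durationMs has elapsed since the last renewal.
    boolean isExpired(long now) {
        return now > lastRenewedAt + durationMs;
    }
}
```

With a 90-second duration and heartbeats every 30 seconds, a client can miss two heartbeats in a row before its lease counts as expired in this model.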
Server accepts a client deregistration request
The deregistration endpoint is InstanceResource#cancelLease(). Go straight to the internalCancel() method:
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);// get all instances of this application
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);// remove the deregistered instance from the map
}
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
}
}
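3.3 Service eviction
When a client has not sent a request to the server for a long time (90 seconds by default), the client is considered down. Anyone who has read the Spring source knows that its key method is AbstractApplicationContext#refresh(), which covers everything from scanning XML/Java files and annotations, to dependency injection into the IoC container, to destroying beans. Near the end is finishRefresh();, which Spring calls once all of that work is done. It calls down through DefaultLifecycleProcessor#onRefresh(), then #startBeans(true);, then #start();, then doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);, and finally start(). This is where the start() method of every bean that implements the Lifecycle interface is invoked. EurekaServerAutoConfiguration imports EurekaServerInitializerConfiguration, which implements Lifecycle, and its start method initializes a separate EurekaServerContext. Inside initEurekaServerContext(), registry.openForTraffic(applicationInfoManager, registryCount); is executed. Its last statement calls AbstractInstanceRegistry#postInit(), which starts a timer that runs EvictionTask, and through it evict(), every 60 seconds by default.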
public void evict(long additionalLeaseMs) {
if (!isLeaseExpirationEnabled()) {// while self-preservation is in effect, expired leases are not evicted; self-preservation is enabled by default
logger.debug("DS: lease expiration is currently disabled.");
return;
}
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);// expired, so add it to the expiredLeases list
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);// remove the instance from the local registry and invalidate the cache
}
}
}
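The eviction cap and the random selection can be checked with plain numbers. In this sketch, EvictionSketch and its method names are hypothetical, and 0.85 is used as an assumed value for the renewal percent threshold. With 100 registered instances, at most 100 - (int)(100 * 0.85) = 15 are evicted in one pass, however many have expired.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class EvictionSketch {
    // Upper bound on evictions per pass, same arithmetic as in evict().
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int keep = (int) (registrySize * renewalPercentThreshold);
        return registrySize - keep;
    }

    // Picks min(expired, limit) items at random with a partial Knuth shuffle.
    static <T> List<T> pickToEvict(List<T> expired, int registrySize,
                                   double renewalPercentThreshold, Random random) {
        List<T> pool = new ArrayList<>(expired); // copy so the caller's list is untouched
        int toEvict = Math.min(pool.size(), evictionLimit(registrySize, renewalPercentThreshold));
        List<T> chosen = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
            chosen.add(pool.get(i));
        }
        return chosen;
    }
}
```

Picking at random rather than in iteration order spreads evictions across applications, so one application does not lose all of its instances in a single pass.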
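3.4 Self-preservation mode
When clients stop sending renewals (heartbeats) for a long time, the server runs an eviction pass once a minute by default. The evict() code in 3.3 calls isLeaseExpirationEnabled():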
/**
* Expected maximum number of renewals per minute. Formula: number of currently registered instances x 2
*/
protected volatile int expectedNumberOfRenewsPerMin ;
/**
* Expected minimum number of renewals per minute. Formula: expectedNumberOfRenewsPerMin * renewal percent threshold ( eureka.renewalPercentThreshold )
*/
protected volatile int numberOfRenewsPerMinThreshold ;
@Override
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {// self-preservation is enabled by default; false means it has been turned off
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
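return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;// if renewals in the last minute exceed the expected minimum, leases may expire; otherwise self-preservation holds evictions back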
}
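The two formulas in the comments above can be worked through with concrete numbers. The sketch below is hypothetical (SelfPreservationSketch and its method names are not Eureka's), and 0.85 is an assumed value for the renewal percent threshold. With 10 registered instances, the expected maximum is 10 x 2 = 20 renewals per minute and the threshold is (int)(20 * 0.85) = 17.

```java
class SelfPreservationSketch {
    // Two heartbeats per instance per minute, following the formula in the comment above.
    static int expectedRenewsPerMin(int instances) {
        return instances * 2;
    }

    static int renewsPerMinThreshold(int instances, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instances) * renewalPercentThreshold);
    }

    // Same decision shape as isLeaseExpirationEnabled().
    static boolean leaseExpirationEnabled(boolean selfPreservationOn, int instances,
                                          double renewalPercentThreshold, int renewsLastMin) {
        if (!selfPreservationOn) {
            return true; // self-preservation switched off: always allow expiry
        }
        int threshold = renewsPerMinThreshold(instances, renewalPercentThreshold);
        return threshold > 0 && renewsLastMin > threshold;
    }
}
```

So with 10 instances, 18 renewals in the last minute still allow eviction, while 17 or fewer switch the server into self-preservation and evictions stop.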
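3.5 Synchronizing information between servers
For the operations described above (registration, renewal, and deregistration), once the local registry has been updated, PeerAwareInstanceRegistryImpl#replicateToPeers() is called to replicate the change to the other servers.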
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {//
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
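The Eureka second-hop propagation problem: if Eureka A is configured to point to B, and B to C, then when a client registers with server A, servers A and B will have the client's information but C will not. The code explains why:
When the client registers with server A, A stores the client's information and replicates it to server B. B receives that request with isReplication set to true, so replicateToPeers returns early and B does not forward it to C.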
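The A, B, C scenario can be reproduced with a small in-memory model. ReplicationSketch and Node are hypothetical classes that keep only the isReplication check from replicateToPeers; the peer lists model the one-way configuration described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReplicationSketch {
    static class Node {
        final String name;
        final Set<String> registry = new HashSet<>();
        final List<Node> peers = new ArrayList<>(); // servers this node is configured to replicate to

        Node(String name) {
            this.name = name;
        }

        void register(String instanceId, boolean isReplication) {
            registry.add(instanceId);
            if (isReplication) {
                return; // a replicated request is never forwarded again
            }
            for (Node peer : peers) {
                peer.register(instanceId, true);
            }
        }
    }
}
```

Wiring A to B and B to C and then registering an instance on A leaves it in A and B only. Under this model, every server has to list all of its peers directly for all of them to receive every registration.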