SpringCloud之Eureka的定時任務詳解(Server)
1.EurekaServer內定時更新叢集內其他Server節點
public class PeerEurekaNodes {
/**
* Eureka-Server 叢集節點陣列
*/
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
/**
* Eureka-Server 服務地址陣列
*/
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
/**
* 啟動 Eureka-Server 叢集節點集合(複製)
*/
public void start() {
......
// 初始化 叢集節點資訊
updatePeerEurekaNodes(resolvePeerUrls());
// 初始化 初始化固定週期更新叢集節點資訊的任務
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// 每隔10分鐘更新叢集節點
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
......
}
/**
* Resolve peer URLs. 獲取Server叢集的所有serviceUrl,不包括自身
*
* @return peer URLs with node's own URL filtered out
*/
protected List<String> resolvePeerUrls() {
// 獲得 Eureka-Server 叢集服務地址陣列
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
// 獲取相同Region下的所有serviceUrl
List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
// 移除自己(避免向自己同步)
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
}
EurekaServer在初始化時會根據配置的Server叢集url地址,來例項化叢集內其他Server節點的互動例項,用於叢集間的資料同步。
EurekaServer在獲取Server URL時用的還是EurekaClient,也就是環境裡eureka.client開頭的配置,客戶端配置一模一樣。
其實EurekaServer集成了EurekaClient,Client的配置都可以用到Server上,只不過很多可以略去,比如是否需要向Server傳送心跳registerWithEureka,可以設定為false等。
Client裡的serviceUrl的含義是可以向哪些Server節點註冊和拉取服務資訊;而Server裡的serviceUrl的含義是叢集裡有哪些Server節點,當自身節點有服務操作是需要向哪些節點同步。
Client正常情況下只合Server叢集中的一個互動,而Server在有服務操作時會同步至所有其他的節點。
應用的配置資訊是可能發生變化的,所以Client和Server才需要定時的重新整理叢集節點資訊,關閉那些不再連線的Server節點,初始化新增的節點。
2.每隔一分鐘統計最近一分鐘內所有Client的續約次數,也就是接收到的心跳次數,以此來作為是否觸發服務資訊回收的依據之一
public class MeasuredRate {
/**
* 間隔, 預設60S
*/
private final long sampleInterval;
public synchronized void start() {
if (!isActive) {
// 每隔一分鐘執行一次定時任務, 更新最新的總的續約次數, 這樣就能計算一分鐘內續約的次數, 以此來判斷續約次數是否低於閾值
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// Zero out the current bucket. 將 currentBucket 賦值給lastBucket, 然後重置 currentBucket
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate", e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
}
EurekaServer每隔1分鐘執行一次服務資訊回收,回收那些超過90S沒有傳送心跳,也就是續約的服務資訊,當然前提是EurekaServer開啟租約過期功能,且未觸發自我保護臨界值。
所謂自我保護,就是指最近一分鐘內的續約總數 > 預估的續約總數 * 0.85。 近似的來講,也就是一分鐘內有超過85%的應用資訊傳送了心跳。如果這個條件未滿足,那麼不會執行服務回收操作。
比如當Server節點的網路不穩定,丟失了部分心跳資訊,如果超過了15,那麼就不會觸發自我保護,停止服務資訊的回收,而這也是我們希望服務發現元件應該具備的功能,強調可用性。
從這點上看其和Zookeeper的服務發現機制有很大不同。
3.EurekaServer每隔60S執行一次服務資訊的回收
/**
* 租約過期任務
*/
/* visible for testing */
class EvictionTask extends TimerTask {
/**
* 上一次執行清理任務的時間
*/
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);
@Override
public void run() {
try {
// 獲取 補償時間毫秒數, 計算這次執行距離上次執行的時間差,與60S的距離
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
// 清理過期租約邏輯
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
/**
* compute a compensation time defined as the actual time this task was executed since the prev iteration,
* vs the configured amount of time for execution. This is useful for cases where changes in time (due to
* clock skew or gc for example) causes the actual eviction task to execute later than the desired time
* according to the configured cycle.
*/
long getCompensationTimeMs() {
long currNanos = getCurrentTimeNano();
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0L) {
return 0L;
}
// 此次執行與上次執行的時間差
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
// 檢視時間間隔是否比60S大
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
// 如果未超過60S, 返回; 否則返回超過的時間差
return compensationTime <= 0L ? 0L : compensationTime;
}
long getCurrentTimeNano() { // for testing
return System.nanoTime();
}
}
EurekaServer每隔60S執行一次服務資訊的回收任務,移除那些超過90S未更新租約資訊的服務。
當然能夠回收的前提是開啟了租約回收功能,而且未觸發自我保護。所謂的自我保護機制,就是最近一分鐘內的實際續約次數比例超過期望總數的85%,如果未超過,那麼認為是Server出現了問題,不進行服務回收。
4.定時更新續約次數的期望值和自我保護的臨界值
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
/**
* Schedule the task that updates <em>renewal threshold</em> periodically.
* The renewal threshold would be used to determine if the renewals drop
* dramatically because of network partition and to protect expiring too
* many instances at a time.
*/
private void scheduleRenewalThresholdUpdateTask() {
// 15分鐘後更新續約閾值,之後每隔15分分鐘執行一次
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
/**
* 更新續約閾值,也就是每分鐘期望續約的次數,以及觸發自我保護的最低續約次數
* Updates the <em>renewal threshold</em> based on the current number of
* renewals. The threshold is a percentage as specified in
* {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
* received per minute {@link #getNumOfRenewsInLastMin()}.
*/
private void updateRenewalThreshold() {
try {
// 計算 應用例項數
Applications apps = eurekaClient.getApplications();
int count = 0;
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
// 計算 expectedNumberOfRenewsPerMin 、 numberOfRenewsPerMinThreshold 引數
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold of if the self preservation is disabled.
// 不會一次性的把續約次數將至85%以下,也就是隻有在存活應用資訊數量超過總數的85%時才能更新,這樣就不會修改續約的自我保護的臨界值
if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold) || (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold = (int)((count * 2) * serverConfig.getRenewalPercentThreshold());
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
}
服務自身取消,會相應的降低續約期望總數和自我保護臨界值,每取消一個,數值均減2。但因為續約時間超時而被動移除的服務資訊,不會相應的減少期望總值和臨界值。
如果不定時的更新期望總值和臨界值,那麼當服務逐漸的因心跳超時而被移除時,很容易就觸發保護臨界值,之後就不能再移除那些心跳超時的服務資訊。
但是在更新總值和臨界值時,如果當前Server處於自我保護狀態,那麼也不能強制的改變臨界值,這會強制的退出自我保護狀態。所以更新總值和臨界值的前提是當前Server不處於自我保護狀態,也就是上一分鐘的續約總數的比例超過85%。
5.服務資訊增量快取更新任務
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
/**
* 最近租約變更記錄佇列
*/
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
......
// 30S後每隔30S執行一次, 移除3分鐘前發生的續約記錄
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(), // 30S
serverConfig.getDeltaRetentionTimerIntervalInMs()); // 30S
}
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
RecentlyChangedItem item = it.next();
// 如果某個續約任務是3分鐘前發生的,那麼移除它
if (item.getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
}
EurekaClient在初始化時進行一次全量拉取,之後每隔30S執行一次增量拉取,也就是會返回recentlyChangedQueue裡的記錄,EurekaClient根據記錄的操作型別和服務資訊,相應的更新自身持有的可用服務資訊。
recentlyChangedQueue 是一個有序佇列,當Client向Server執行操作時,比如註冊,狀態變更,取消等(續約不會記錄),那麼會記錄操作的時間,型別和相應的服務資訊。
通過增量資訊來保持同步,能夠極大的減少Server和Client之間的資料的傳輸,降低IO消耗。
6.每隔30S執行一次,更新只讀響應快取
public class ResponseCacheImpl implements ResponseCache {
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
......
// 僅使用只讀快取, 因此每隔30S執行更新快取任務
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
}
/**
* 快取更新任務, 每隔30S執行一次
*
* @return
*/
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) { // 迴圈 readOnlyCacheMap 的快取鍵
if (logger.isDebugEnabled()) {
Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) { // 不一致時,進行替換
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
}
EurekaServer會快取資料資訊,根據Key的不同值快取相應的結果,當Client獲取資訊時,優先用只讀快取的資料返回,如果只讀快取不存在,那麼從讀寫快取處獲取,然後存入只讀快取,最後返回結果。
讀寫快取藉助guava的CacheBuilder來實現快取淘汰,在寫入180S後失效,這樣當只讀快取定期更新時,如果發現讀寫快取的值和只讀快取的不一致時,進行替換。
當Client進行相應操作,比如註冊,狀態變更,取消等操作時,會時對應的快取立即失效,保證Client獲取到的是有效的服務資訊。