1. 程式人生 > >SpringCloud之Eureka的定時任務詳解(Server)

SpringCloud之Eureka的定時任務詳解(Server)

1.EurekaServer內定時更新叢集內其他Server節點

public class PeerEurekaNodes {

    /**
     * Eureka-Server 叢集節點陣列
     */
    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    /**
     * Eureka-Server 服務地址陣列
     */
    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();

    /**
     * 啟動 Eureka-Server 叢集節點集合(複製)
     */
public void start() { ...... // 初始化 叢集節點資訊 updatePeerEurekaNodes(resolvePeerUrls()); // 初始化 初始化固定週期更新叢集節點資訊的任務 Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch
(Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; // 每隔10分鐘更新叢集節點 taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); ...... } /** * Resolve peer URLs. 獲取Server叢集的所有serviceUrl,不包括自身 * * @return
peer URLs with node's own URL filtered out */
protected List<String> resolvePeerUrls() { // 獲得 Eureka-Server 叢集服務地址陣列 InstanceInfo myInfo = applicationInfoManager.getInfo(); String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo); // 獲取相同Region下的所有serviceUrl List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo)); // 移除自己(避免向自己同步) int idx = 0; while (idx < replicaUrls.size()) { if (isThisMyUrl(replicaUrls.get(idx))) { replicaUrls.remove(idx); } else { idx++; } } return replicaUrls; } }

EurekaServer在初始化時會根據配置的Server叢集url地址,來例項化叢集內其他Server節點的互動例項,用於叢集間的資料同步。

EurekaServer在獲取Server URL時用的還是EurekaClient,也就是環境裡eureka.client開頭的配置,客戶端配置一模一樣。

其實EurekaServer集成了EurekaClient,Client的配置都可以用到Server上,只不過很多可以略去,比如是否需要向Server傳送心跳registerWithEureka,可以設定為false等。

Client裡的serviceUrl的含義是可以向哪些Server節點註冊和拉取服務資訊;而Server裡的serviceUrl的含義是叢集裡有哪些Server節點,當自身節點有服務操作是需要向哪些節點同步。
Client正常情況下只合Server叢集中的一個互動,而Server在有服務操作時會同步至所有其他的節點。

應用的配置資訊是可能發生變化的,所以Client和Server才需要定時的重新整理叢集節點資訊,關閉那些不再連線的Server節點,初始化新增的節點。

2.每隔一分鐘統計最近一分鐘內所有Client的續約次數,也就是接收到的心跳次數,以此來作為是否觸發服務資訊回收的依據之一

public class MeasuredRate {

    /**
     * 間隔, 預設60S
     */
    private final long sampleInterval;

    public synchronized void start() {
        if (!isActive) {
            // 每隔一分鐘執行一次定時任務, 更新最新的總的續約次數, 這樣就能計算一分鐘內續約的次數, 以此來判斷續約次數是否低於閾值
            timer.schedule(new TimerTask() {

                @Override
                public void run() {
                    try {
                        // Zero out the current bucket. 將 currentBucket 賦值給lastBucket, 然後重置 currentBucket
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);

            isActive = true;
        }
    }
}

EurekaServer每隔1分鐘執行一次服務資訊回收,回收那些超過90S沒有傳送心跳,也就是續約的服務資訊,當然前提是EurekaServer開啟租約過期功能,且未觸發自我保護臨界值。

所謂自我保護,就是指最近一分鐘內的續約總數 > 預估的續約總數 * 0.85。 近似的來講,也就是一分鐘內有超過85%的應用資訊傳送了心跳。如果這個條件未滿足,那麼不會執行服務回收操作。

比如當Server節點的網路不穩定,丟失了部分心跳資訊,如果超過了15,那麼就不會觸發自我保護,停止服務資訊的回收,而這也是我們希望服務發現元件應該具備的功能,強調可用性。
從這點上看其和Zookeeper的服務發現機制有很大不同。

3.EurekaServer每隔60S執行一次服務資訊的回收

/**
 * 租約過期任務
 */
/* visible for testing */
class EvictionTask extends TimerTask {

    /**
     * 上一次執行清理任務的時間
     */
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);

    @Override
    public void run() {
        try {
            // 獲取 補償時間毫秒數, 計算這次執行距離上次執行的時間差,與60S的距離
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            // 清理過期租約邏輯
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }

    /**
     * compute a compensation time defined as the actual time this task was executed since the prev iteration,
     * vs the configured amount of time for execution. This is useful for cases where changes in time (due to
     * clock skew or gc for example) causes the actual eviction task to execute later than the desired time
     * according to the configured cycle.
     */
    long getCompensationTimeMs() {
        long currNanos = getCurrentTimeNano();
        long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0L) {
            return 0L;
        }
        // 此次執行與上次執行的時間差
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        // 檢視時間間隔是否比60S大
        long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
        // 如果未超過60S, 返回; 否則返回超過的時間差
        return compensationTime <= 0L ? 0L : compensationTime;
    }

    long getCurrentTimeNano() {  // for testing
        return System.nanoTime();
    }

}

EurekaServer每隔60S執行一次服務資訊的回收任務,移除那些超過90S未更新租約資訊的服務。

當然能夠回收的前提是開啟了租約回收功能,而且未觸發自我保護。所謂的自我保護機制,就是最近一分鐘內的實際續約次數比例超過期望總數的85%,如果未超過,那麼認為是Server出現了問題,不進行服務回收。

4.定時更新續約次數的期望值和自我保護的臨界值

public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    /**
     * Schedule the task that updates <em>renewal threshold</em> periodically.
     * The renewal threshold would be used to determine if the renewals drop
     * dramatically because of network partition and to protect expiring too
     * many instances at a time.
     */
    private void scheduleRenewalThresholdUpdateTask() {
        // 15分鐘後更新續約閾值,之後每隔15分分鐘執行一次
        timer.schedule(new TimerTask() {
                           @Override
                           public void run() {
                               updateRenewalThreshold();
                           }
                       }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
    }

    /**
     * 更新續約閾值,也就是每分鐘期望續約的次數,以及觸發自我保護的最低續約次數
     * Updates the <em>renewal threshold</em> based on the current number of
     * renewals. The threshold is a percentage as specified in
     * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
     * received per minute {@link #getNumOfRenewsInLastMin()}.
     */
    private void updateRenewalThreshold() {
        try {
            // 計算 應用例項數
            Applications apps = eurekaClient.getApplications();
            int count = 0;
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }
            // 計算 expectedNumberOfRenewsPerMin 、 numberOfRenewsPerMinThreshold 引數
            synchronized (lock) {
                // Update threshold only if the threshold is greater than the
                // current expected threshold of if the self preservation is disabled.
                // 不會一次性的把續約次數將至85%以下,也就是隻有在存活應用資訊數量超過總數的85%時才能更新,這樣就不會修改續約的自我保護的臨界值
                if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold) || (!this.isSelfPreservationModeEnabled())) {
                    this.expectedNumberOfRenewsPerMin = count * 2;
                    this.numberOfRenewsPerMinThreshold = (int)((count * 2) * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
        } catch (Throwable e) {
            logger.error("Cannot update renewal threshold", e);
        }
    }
}

服務自身取消,會相應的降低續約期望總數和自我保護臨界值,每取消一個,數值均減2。但因為續約時間超時而被動移除的服務資訊,不會相應的減少期望總值和臨界值。

如果不定時的更新期望總值和臨界值,那麼當服務逐漸的因心跳超時而被移除時,很容易就觸發保護臨界值,之後就不能再移除那些心跳超時的服務資訊。

但是在更新總值和臨界值時,如果當前Server處於自我保護狀態,那麼也不能強制的改變臨界值,這會強制的退出自我保護狀態。所以更新總值和臨界值的前提是當前Server不處於自我保護狀態,也就是上一分鐘的續約總數的比例超過85%。

5.服務資訊增量快取更新任務

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * 最近租約變更記錄佇列
     */
    private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();

    protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
        ......
        // 30S後每隔30S執行一次, 移除3分鐘前發生的續約記錄
        this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            serverConfig.getDeltaRetentionTimerIntervalInMs(),  // 30S
            serverConfig.getDeltaRetentionTimerIntervalInMs()); // 30S
    }

    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {

            @Override
            public void run() {
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    RecentlyChangedItem item = it.next();
                    // 如果某個續約任務是3分鐘前發生的,那麼移除它
                    if (item.getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }

        };
    }
}

EurekaClient在初始化時進行一次全量拉取,之後每隔30S執行一次增量拉取,也就是會返回recentlyChangedQueue裡的記錄,EurekaClient根據記錄的操作型別和服務資訊,相應的更新自身持有的可用服務資訊。

recentlyChangedQueue 是一個有序佇列,當Client向Server執行操作時,比如註冊,狀態變更,取消等(續約不會記錄),那麼會記錄操作的時間,型別和相應的服務資訊。

通過增量資訊來保持同步,能夠極大的減少Server和Client之間的資料的傳輸,降低IO消耗。

6.每隔30S執行一次,更新只讀響應快取

public class ResponseCacheImpl implements ResponseCache {

    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        ......
        // 僅使用只讀快取, 因此每隔30S執行更新快取任務
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
        }
    }

    /**
     * 快取更新任務, 每隔30S執行一次
     *
     * @return
     */
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) { // 迴圈 readOnlyCacheMap 的快取鍵
                    if (logger.isDebugEnabled()) {
                        Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) { // 不一致時,進行替換
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    }
                }
            }
        };
    }
}

EurekaServer會快取資料資訊,根據Key的不同值快取相應的結果,當Client獲取資訊時,優先用只讀快取的資料返回,如果只讀快取不存在,那麼從讀寫快取處獲取,然後存入只讀快取,最後返回結果。

讀寫快取藉助guava的CacheBuilder來實現快取淘汰,在寫入180S後失效,這樣當只讀快取定期更新時,如果發現讀寫快取的值和只讀快取的不一致時,進行替換。

當Client進行相應操作,比如註冊,狀態變更,取消等操作時,會時對應的快取立即失效,保證Client獲取到的是有效的服務資訊。