1. 程式人生 > >Netflix Eureka原始碼分析(18)——eureka server網路故障時的的自我保護機制原始碼剖析

Netflix Eureka原始碼分析(18)——eureka server網路故障時的的自我保護機制原始碼剖析

假如說,20個服務例項,結果在1分鐘之內,只有8個服務例項保持了心跳 --> eureka server是應該將剩餘的12個沒有心跳的服務例項都摘除嗎?

這個時候很可能說的是,eureka server自己網路故障了,那些服務沒問題的。只不過eureka server自己的機器所在的網路故障了,導致那些服務的心跳傳送不過來。就導致eureka server本地一直沒有更新心跳。

其實eureka server自己會進入一個自我保護的機制,從此之後就不會再摘除任何服務例項了

登錄檔的evict()方法,EvictionTask,定時排程的任務,60s來一次,會判斷一下服務例項是否故障了,如果故障了,一直沒有心跳,就會將服務例項給摘除。

1、evict()方法內部,先會判斷上一分鐘的心跳次數,是否小於我期望的一分鐘的心跳次數,如果小於,那麼壓根兒就不讓清理任何服務例項

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

     public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        //自我保護機制,直接return,不摘除任何服務例項
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    //呼叫Lease的isExpired()方法,來判斷當前這個服務例項的租約是否過期了,是否失效了是否故障了
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
}
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) {
            // The self preservation mode is disabled, hence allowing the instances to expire.
            //自我儲存模式被禁用,因此允許例項過期。
            return true;
        }
        //numberOfRenewsPerMinThreshold代表期望一分鐘至少有多少次心跳
        //getNumOfRenewsInLastMin() 獲取上一分鐘心跳的總次數
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }
}

2、我期望的一分鐘的心跳次數是怎麼算出來的?

(1)eureka server啟動的時候,就會初始化一次這個值

EurekaBootStrap是啟動的初始化的類

registry.openForTraffic(applicationInfoManager, registryCount);

完成了numberOfRenewsPerMinThreshold這個值,我期望一分鐘得有多少次心跳的值,初始化。剛開始會呼叫syncUp()的方法,從相鄰的eureka server節點,拷貝過來登錄檔,如果是自己本地還沒註冊的服務例項,就在自己本地註冊一下。

會記錄一下從別的eureka server拉取過來的服務例項的數量registryCount,將這個服務例項的數量,就作為自己eureka server本地初始化的這麼一個服務例項的數量

protected void initEurekaServerContext() throws Exception {
        //之前的程式碼處理省略。。。
        //從相鄰的eureka server節點,拷貝過來登錄檔
        int registryCount = registry.syncUp();
        registry.openForTraffic(applicationInfoManager, registryCount);
}

將 服務例項數量 * 2 * 0.85 ,期望心跳次數的計算,居然hard code了。

假設你現在有20個服務例項,每個服務例項每30秒傳送一次心跳,於是一分鐘一個服務例項應該傳送2次心跳,1分鐘內我期望獲取到的心跳的次數,應該是20 * 2 = 40個心跳。

用這個服務例項 * 2 * 0.85 = 20 * 2 * 0.85 = 34,期望的是最少一分鐘20個服務例項,得有34個心跳。根據當前的服務例項的數量,計算出來的一分鐘最少需要的心跳次數。

硬編碼可能會產生的問題:假設現在我們預設的心跳是30秒1次,如果我調整了撐10秒一次心跳了???怎麼辦??這裡的count * 2,就錯了。

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        this.expectedNumberOfRenewsPerMin = count * 2;
        //初始化numberOfRenewsPerMinThreshold的值 服務例項數量 * 2 * 0.85
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        logger.info("Got " + count + " instances from neighboring DS node");
        logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
    }
}

(2)註冊、下線、故障

這個每分鐘期望的心跳的次數,是跟咱們的這個服務例項的數量相關的,服務例項隨著上線和下線、故障,都在不斷的變動著。註冊的時候,每分鐘期望心跳次數 + 2。服務下線的時候,直接每分鐘期望心跳次數 - 2。

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
 
    /**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        // The lease does not exist and hence it is a new registration
        synchronized (lock) {
            if (this.expectedNumberOfRenewsPerMin > 0) {
                // Since the client wants to cancel it, reduce the threshold
                // (1
                // for 30 seconds, 2 for a minute)
                this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                this.numberOfRenewsPerMinThreshold =
                        (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
            }
        }
        logger.debug("No previous lease information found; it is new registration");
    }
}
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        if (super.cancel(appName, id, isReplication)) {
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            return true;
        }
        return false;
    }
}

注意:故障的時候,摘除一個服務例項,居然沒找到更新期望心跳次數的程式碼。bug,如果說有很多的服務例項都是故障下線的,摘除了。結果每分鐘期望的心跳次數並沒有減少,但是實際的服務例項變少了一些,就會導致實際的心跳次數變少,如果說出現較多的服務例項故障被自動摘除的話,很可能會快速導致eureka server進自我保護機制。

實際的心跳次數比期望的心跳次數要小,就不會再摘除任何服務例項了

(3)定時更新

Registry登錄檔,預設是15分鐘,會跑一次定時任務,算一下服務例項的數量,如果從別的eureka server拉取到的服務例項的數量,大於當前的服務例項的數量,會重新計算一下,主要是跟其他的eureka server做一下同步

觸發概率很小

3、實際的上一分鐘的心跳次數是怎麼算出來的

抓大放小,之前我們看原始碼的時候,看到過這個MeasuredRate,當時肯定是看不懂的,因為很多程式碼,都是一個機制相關的。每次一個心跳過來,一定會更新這個MeasuredRate。來計算每一分鐘的心跳的實際的次數。

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    public boolean renew(String appName, String id, boolean isReplication) {
        //每傳送一次心跳,currentBucket就累加一次
        renewsLastMin.increment();
    }
}

MeasuredRate類,好好看看,技術亮點:如何計算每一分鐘內的一個記憶體中的計數的呢?計算每一分鐘內的心跳的次數?

public class MeasuredRate {
    private static final Logger logger = LoggerFactory.getLogger(MeasuredRate.class);
    //上一分鐘心跳的總次數
    private final AtomicLong lastBucket = new AtomicLong(0);
    //每傳送一次心跳,currentBucket就累加一次,每60秒清零一次
    private final AtomicLong currentBucket = new AtomicLong(0);

    private final long sampleInterval;
    private final Timer timer;

    private volatile boolean isActive;

    /**
     * @param sampleInterval in milliseconds
     */
    public MeasuredRate(long sampleInterval) {
        this.sampleInterval = sampleInterval;
        this.timer = new Timer("Eureka-MeasureRateTimer", true);
        this.isActive = false;
    }

    public synchronized void start() {
        if (!isActive) {
            timer.schedule(new TimerTask() {

                @Override
                public void run() {
                    try {
                        // Zero out the current bucket==》把當前的桶清零
                        //1.將currentBucket的值賦給lastBucket,
                        //2.將currentBucket清零
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);

            isActive = true;
        }
    }

    public synchronized void stop() {
        if (isActive) {
            timer.cancel();
            isActive = false;
        }
    }

    /**
     * Returns the count in the last sample interval.
     * 返回上一分鐘心跳的總次數
     */
    public long getCount() {
        return lastBucket.get();
    }

    /**
     * Increments the count in the current sample interval.
     * 每傳送一次心跳,currentBucket就累加一次
     */
    public void increment() {
        currentBucket.incrementAndGet();
    }
}

4、來看看自我保護機制的觸發

如果上一分鐘實際的心跳次數,比我們期望的一分鐘的心跳次數要小,觸發自我保護機制,不允許摘除任何服務例項,此時認為自己的eureka server出現網路故障,大量的服務例項無法傳送心跳過來

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) {
            // The self preservation mode is disabled, hence allowing the instances to expire.
            //自我儲存模式被禁用,因此允許例項過期。
            return true;
        }
        //numberOfRenewsPerMinThreshold代表期望一分鐘至少有多少次心跳
        //getNumOfRenewsInLastMin() 獲取上一分鐘心跳的總次數
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }
}

5、eureka這一塊,自我保護機制,必須從原始碼級別要看懂

因為其實在線上的時候,最坑爹的就是這兒,就是你會發現有些服務例項下線了,但是eureka控制檯老是沒給他摘除,自我保護機制了。線上生產環境,如果你可以的話,你可以選擇將這個自我保護給關了。如果eureka server接收不到心跳的話,各個服務例項也是無法從eureka server拉取登錄檔的。每個服務例項直接基於自己的本地的登錄檔的快取來就可以了。自我保護機制給開啟也可以,從原始碼層面已經知道了,服務故障摘除,自我保護的原始碼,如果你發現線上生產環境,出現了一些問題,你可以從原始碼級別去看一下是怎麼回事。

總結:eureka自我保護機制 流程圖