1. 程式人生 > >springcloud 和apollo配置中心一起使用時踩到的一個坑

springcloud 和apollo配置中心一起使用時踩到的一個坑

場景描述:

今天收到線上一個服務的instance deregistered的告警郵件,在springboot admin上確實那個instance不線上了。於是去伺服器上jps看了一下,程序正常的,但是/health端點資訊顯示eureka為down。檢視error.log,並沒有錯誤日誌,檢視warn.log,發現裡面有些許的警告日誌如下:

[2018-09-20T19:12:24.159+08:00] WARN: [TID: N/A] [Apollo-Config-1] DiscoveryClient.java:1291 - Saw local status change event StatusChangeEvent [timestamp=1537441944159, current=DOWN, previous=UP] [2018-09-20T19:12:24.160+08:00] WARN: [TID: N/A] [Apollo-Config-1] DiscoveryClient.java:1291 - Saw local status change event StatusChangeEvent [timestamp=1537441944160, current=UP, previous=DOWN] [2018-09-20T19:12:24.160+08:00] WARN: [TID: N/A] [DiscoveryClient-InstanceInfoReplicator-0] DiscoveryClient.java:1291 - Saw local status change event StatusChangeEvent [timestamp=1537441944160, current=DOWN, previous=UP][2018-09-20T19:12:24.160+08:00] WARN: [TID: N/A] [DiscoveryClient-InstanceInfoReplicator-0] InstanceInfoReplicator.java:98 - Ignoring onDemand update due to rate limiter

開啟InstanceInfoReplicator.java查看了一下原始碼,發現是由於單位分鐘內向eureka server同步次數超過限制,導致有一個StatusChangeEvent被忽略掉了。

從上面的警告日誌也可以看出一些端倪,就是在很短的時間內,收到了2個StatusChangeEvent事件,一次是從UP-》DOWN,一次是從DOWN->UP。apollo自身也使用了eureka,所以apollo也處理了這個事件兩次(第一條和第二條日誌,在Apollo-Config執行緒組中).然後springcloud內部也處理了這個事件(在DiscoveryClient-InstanceInfoReplicator執行緒組中),但是隻處理了一個從UP->DOWN的事件,另外一個從DOWN->UP的事件由於rate limiter限制導致被忽略了,最終導致這個instance的health永遠是down狀態。

以下是處理statusChangeEvent的程式碼片段,來自com.netflix.discovery.DiscoveryClient#initScheduledTasks方法:

 private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

中的StatusChangeListener的notify方法。裡面呼叫了instanceInfoReplicator.onDemandUpdate()方法,嘗試通過令牌桶演算法來限流:

 public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            if (!scheduler.isShutdown()) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        logger.debug("Executing on-demand update of local InstanceInfo");
    
                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                            latestPeriodic.cancel(false);
                        }
    
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
                logger.warn("Ignoring onDemand update due to stopped scheduler");
                return false;
            }
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

警告日誌中的最後一行日誌就是來自這個方法。

原始碼裡面burstSize寫死了等於2,沒法改變,allowedRatePerMinute可以通過:

eureka.client.instance-info-replication-interval-seconds

引數來改變,預設是30秒同步一次。

每分鐘限制同步多少次是在InstanceInfoReplicator類的建構函式中計算的:

    InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
        this.discoveryClient = discoveryClient;
        this.instanceInfo = instanceInfo;
        this.scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                        .setDaemon(true)
                        .build());

        this.scheduledPeriodicRef = new AtomicReference<Future>();

        this.started = new AtomicBoolean(false);
        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
        this.replicationIntervalSeconds = replicationIntervalSeconds;
        this.burstSize = burstSize;

        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
        logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
    }

this.allowedRatePerMinute = 60* this.burstSize / this.replicationIntervalSeconds ,預設為: 60*2/30=4。所以導致了第四個StatusChangedEvent被丟棄了。

那我建議在和apollo一起使用時,將eureka.client.instance-info-replication-interval-seconds改成24,即每分鐘最多可以執行60*2/24=5次複製instance資訊的請求,這樣就可能避免該問題的發生。