1. 程式人生 > >深入Spring Cloud原始碼設計-eureka一篇就夠

深入Spring Cloud原始碼設計-eureka一篇就夠

What is eureka ?

Eureka is a REST (Representational State Transfer) based service that is primarily used in the AWS cloud for locating services for the purpose of load balancing and failover of middle-tier servers. We call this service, the Eureka Server. Eureka also comes with a Java-based client component,the Eureka Client, which makes interactions with the service much easier. The client also has a built-in load balancer that does basic round-robin load balancing.

eureka 發音 [juˈri:kə] 伊瑞克

翻譯後可簡單概括為:

Eureka是一個基於 REST 的服務,用於定位服務,以實現雲端中間層服務發現和故障轉移的中介軟體。它有兩個重要組成部分,Eureka服務端和基於JAVA的客戶端元件。

eureka核心模組

eureka基本架構

由上圖可以看出,eureka主要有三種角色:

  1. Eureka Server
    eureka服務端,主要用來做服務的註冊與發現
  2. Service Provider
    服務的實際提供方,會將服務註冊到Eureka Server上
  3. Service Consumer
    服務消費方,從Eureka Server獲取服務列表,向Service Provider發起真實呼叫請求

TIP:這三個角色是邏輯上的劃分,可能在使用時,這幾個角色可以是同一個例項;
一個Service Provider既可以是Service Consumer,也可以是Service Provider

這裡寫圖片描述

上圖進一步展示了3個角色之間的互動。

  1. ServiceProvider會向Eureka Server做Register(服務註冊)、Renew(服務續約)、Cancel(服務下線)等操作。
  2. EurekaServer之間會做註冊服務的同步,從而保證狀態一致
  3. ServiceConsumer會向Eureka Server獲取註冊服務列表,並消費服務

eureka原始碼閱讀入口

eureka的服務端

A: EurekaBootStrap.java 實現了ServletContextListener介面,當專案啟動時會初始化該類,觸發contextInitialized方法的執行。

B: contextInitialized中呼叫了initEurekaServerContext方法。

C:
initEurekaServerContext方法依次呼叫了三個方法
PeerAwareInstanceRegistryImpl初始化
PeerEurekaNodes初始化
DefaultEurekaServerContext初始化,該類有@Singleton註解,並含有@PostConstruct註解(構造方法執行後執行)的initialize方法

D:
initialize呼叫了兩個方法
PeerEurekaNodes的start方法
PeerAwareInstanceRegistryImpl的init方法

E:
init方法依次呼叫三個方法
- initializedResponseCache方法,該方法繼續呼叫ResponseCacheImpl類,最後使用的是guava cache
- scheduleRenewalThresholdUpdateTask使用Timer做定時任務定時更新新註冊服務配置引數更新
- initRemoteRegionRegistry初始化註冊中心

eureka的客戶端

DiscoveryClient.java這個類中含有了client側的很多操作:
register()
renew()
unregister()
fetchRegistry() 等等

register()方法呼叫過程

最為主要的register方法,是在DiscoveryClient初始化的過程中被呼叫,如下DiscoveryClient構造方法中呼叫了initScheduledTasks()

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        ...

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

    }

initScheduledTasks 方法中啟動了InstanceInfoReplicator執行緒,
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())

   private void initScheduledTasks() {
            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
          ...
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

InstanceInfoReplicator實現了Runnable介面是一個執行緒,其run方法邏輯如下,可以看到其呼叫了 discoveryClient.register();

 public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

再來看看DiscoveryClient的register方法

  /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

renew ()方法呼叫過程

Renew(服務續約)操作由Service Provider定期呼叫,類似於heartbeat。主要是用來告訴Eureka Server Service Provider還活著,避免服務被剔除掉

  /**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

renew方法在HeartbeatThread執行緒中被呼叫

   /**
     * The heartbeat task that renews the lease in the given intervals.
     */
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

HeartbeatThread執行緒在initScheduledTasks方法中被呼叫

 private void initScheduledTasks() {
   ...
            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
...

initScheduledTasks是在DiscoveryClient建構函式初始化過程中被呼叫。

通過register方法和renew方法的分析,相信大家已經摸索出eureka程式碼的套路,其他的方法,大家可以自己去深入理解了。

參考資料