1. 程式人生 > >Eureka初始化(1)

Eureka初始化(1)

1. 基於1.X版本的原始碼ExampleEurekaService的main函式啟動。通過配置中介軟體Archaius獲取全域性通用配置例項,例項化MyDataCenterInstanceConfig,父類PropertiesInstanceConfig以及AbstractInstanceConfig,拼接名稱空間以及獲取對應檔案配置例項,返回例項配置instanceConfig

public PropertiesInstanceConfig(String namespace, DataCenterInfo info) {
        super(info);

        this.namespace = namespace.endsWith(".")
                ? namespace
                : namespace + ".";

        appGrpNameFromEnv = ConfigurationManager.getConfigInstance()
                .getString(FALLBACK_APP_GROUP_KEY, Values.UNKNOWN_APPLICATION);

        this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);
    }

構造例項資訊instanceInfo,配置有客戶端的心跳時間,失效時間以及本例項的其他資訊。最後把例項配置和例項資訊儲存在應用資訊管理類ApplicationInfoManager中返回。

 /**
  * Creates the provider and stores the instance configuration for later use.
  *
  * @param config the instance configuration kept in the {@code config} field
  */
 @Inject
    public EurekaConfigBasedInstanceInfoProvider(EurekaInstanceConfig config) {
        this.config = config;
    }

    /**
     * Returns the {@link InstanceInfo} for this instance, building it from {@code config} on the
     * first call and returning the cached object afterwards. The method is synchronized, so the
     * build happens at most once.
     *
     * @return the cached instance info, with lease information attached
     */
    @Override
    public synchronized InstanceInfo get() {
        if (instanceInfo != null) {
            return instanceInfo;
        }

        // Lease settings (renewal interval and expiration duration) are taken from the config
        LeaseInfo.Builder leaseBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        // Use the Archaius-based resolver when none has been set
        if (vipAddressResolver == null) {
            vipAddressResolver = new Archaius1VipAddressResolver();
        }

        InstanceInfo.Builder infoBuilder = InstanceInfo.Builder.newBuilder(vipAddressResolver);

        // Instance id: configured value first, then the data center's unique id, then the hostname
        String resolvedId = config.getInstanceId();
        if (resolvedId == null || resolvedId.isEmpty()) {
            DataCenterInfo dcInfo = config.getDataCenterInfo();
            resolvedId = dcInfo instanceof UniqueIdentifier
                    ? ((UniqueIdentifier) dcInfo).getId()
                    : config.getHostName(false);
        }

        // A refreshable config resolves its own default address; otherwise use the hostname
        String hostAddress = config instanceof RefreshableInstanceConfig
                ? ((RefreshableInstanceConfig) config).resolveDefaultAddress(false)
                : config.getHostName(false);

        // Last resort when no address could be resolved: the IP address
        if (hostAddress == null || hostAddress.isEmpty()) {
            hostAddress = config.getIpAddress();
        }

        infoBuilder.setNamespace(config.getNamespace())
                .setInstanceId(resolvedId)
                .setAppName(config.getAppname())
                .setAppGroupName(config.getAppGroupName())
                .setDataCenterInfo(config.getDataCenterInfo())
                .setIPAddr(config.getIpAddress())
                .setHostName(hostAddress)
                .setPort(config.getNonSecurePort())
                .enablePort(PortType.UNSECURE, config.isNonSecurePortEnabled())
                .setSecurePort(config.getSecurePort())
                .enablePort(PortType.SECURE, config.getSecurePortEnabled())
                .setVIPAddress(config.getVirtualHostName())
                .setSecureVIPAddress(config.getSecureVirtualHostName())
                .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
                .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
                .setASGName(config.getASGName())
                .setHealthCheckUrls(config.getHealthCheckUrlPath(),
                        config.getHealthCheckUrl(), config.getSecureHealthCheckUrl());

        if (config.isInstanceEnabledOnit()) {
            // No status is set on the builder in this branch; only the log line is emitted
            LOG.info("Setting initial instance status as: {}. This may be too early for the instance to advertise "
                     + "itself as available. You would instead want to control this via a healthcheck handler.",
                     InstanceStatus.UP);
        } else {
            // Start off with the STARTING state to avoid traffic
            InstanceStatus initialStatus = InstanceStatus.STARTING;
            LOG.info("Setting initial instance status as: {}", initialStatus);
            infoBuilder.setStatus(initialStatus);
        }

        // Copy every user-supplied metadata entry onto the builder
        config.getMetadataMap().forEach((key, value) -> infoBuilder.add(key, value));

        instanceInfo = infoBuilder.build();
        instanceInfo.setLeaseInfo(leaseBuilder.build());
        return instanceInfo;
    }

2. 初始化DefaultEurekaClientConfig客戶端配置例項,獲取通用配置例項以及DefaultEurekaTransportConfig傳輸配置

 /**
  * Creates the client config: stores the namespace with a guaranteed trailing ".", initialises
  * the config instance for the Eureka config file, and creates the transport config.
  *
  * @param namespace property namespace; a "." is appended when it does not already end with one
  */
 public DefaultEurekaClientConfig(String namespace) {
        String prefix = namespace;
        if (!prefix.endsWith(".")) {
            prefix = prefix + ".";
        }
        this.namespace = prefix;

        this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);

        // The transport config is given the namespace argument exactly as the caller passed it
        this.transportConfig = new DefaultEurekaTransportConfig(namespace, configInstance);
    }
/**
 * Creates the transport config and derives its namespace from the parent namespace.
 *
 * @param parentNamespace parent namespace, may be {@code null}; when {@code null} the namespace
 *                        is {@code SUB_NAMESPACE} alone, otherwise {@code SUB_NAMESPACE} is
 *                        appended to the parent, separated by exactly one "."
 * @param configInstance  property factory stored for later use
 */
public DefaultEurekaTransportConfig(String parentNamespace, DynamicPropertyFactory configInstance) {
        String resolved;
        if (parentNamespace == null) {
            resolved = SUB_NAMESPACE;
        } else if (parentNamespace.endsWith(".")) {
            resolved = parentNamespace + SUB_NAMESPACE;
        } else {
            resolved = parentNamespace + "." + SUB_NAMESPACE;
        }

        this.namespace = resolved;
        this.configInstance = configInstance;
    }

初始化DiscoveryClient核心類,傳入的引數包括前面初始化的應用資訊管理類ApplicationInfoManager、客戶端配置EurekaClientConfig、可選引數args,以及用來延遲建立備份登錄檔BackupRegistry的Provider

 /**
  * Delegates to the four-argument constructor, supplying a provider that lazily creates the
  * {@link BackupRegistry}. The provider instantiates the class named by
  * {@code config.getBackupRegistryImpl()} and falls back to {@link NotImplementedRegistryImpl}
  * when no class is configured or instantiation fails.
  *
  * @param applicationInfoManager passed through to the delegate constructor
  * @param config                 client configuration; also read by the backup registry provider
  * @param args                   passed through to the delegate constructor
  */
 public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
        this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
            private volatile BackupRegistry registry;

            @Override
            public synchronized BackupRegistry get() {
                // Already resolved by an earlier call
                if (registry != null) {
                    return registry;
                }

                String implClassName = config.getBackupRegistryImpl();
                if (implClassName != null) {
                    try {
                        registry = (BackupRegistry) Class.forName(implClassName).newInstance();
                        logger.info("Enabled backup registry of type {}", registry.getClass());
                    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
                        // Logged only; the default implementation below takes over
                        logger.error("Error instantiating BackupRegistry.", e);
                    }
                }

                if (registry == null) {
                    logger.warn("Using default backup registry implementation which does not do anything.");
                    registry = new NotImplementedRegistryImpl();
                }

                return registry;
            }
        });
    }

根據引數AbstractDiscoveryClientOptionalArgs初始化一些健康檢查等配置資訊,應用路徑標識appPathIdentifier,url隨機處理器,本地分割槽應用localRegionApps.set(new Applications());Applications物件中的各種成員變數map,雜湊碼,版本增量等,記錄獲取註冊資訊的原子變數fetchRegistryGeneration,遠端分割槽的資訊remoteRegionsToFetch,根據配置建立有關註冊的監控registryStalenessMonitor和心跳的監控heartbeatStalenessMonitor,否則配置為既不註冊資料也不查詢資料的客戶端。建立週期執行緒池scheduler用來執行心跳和快取的操作,建立心跳和快取重新整理執行緒池heartbeatExecutor,cacheRefreshExecutor。

3. 初始化EurekaTransport,執行scheduleServerEndpointTask(eurekaTransport, args);建立TransportClientFactory

// If the transport factory was not supplied with args, assume they are using jersey 1 for passivity.
// Without a provided Jersey client the factory is built from the client config; otherwise the
// provided client is handed to the factory.
        if (providedJerseyClient == null) {
            eurekaTransport.transportClientFactory = transportClientFactories.newTransportClientFactory(
                    clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier);
        } else {
            eurekaTransport.transportClientFactory =
                    transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);
        }

類Jersey1TransportClientFactories
 /**
  * Builds a {@link TransportClientFactory} backed by a Jersey HTTP client factory that is
  * wrapped in a metrics-collecting factory.
  *
  * @param clientConfig      client configuration passed to the Jersey factory
  * @param additionalFilters extra client filters passed to the Jersey factory
  * @param myInstanceInfo    this instance's info; its IP address is used for the client identity
  * @param sslContext        optional SSL context passed to the Jersey factory
  * @param hostnameVerifier  optional hostname verifier passed to the Jersey factory
  * @return a factory whose clients come from the metrics wrapper and whose shutdown closes
  *         the metrics factory first, then the Jersey factory
  */
 public TransportClientFactory newTransportClientFactory(EurekaClientConfig clientConfig,
            Collection<ClientFilter> additionalFilters, InstanceInfo myInstanceInfo, Optional<SSLContext> sslContext,
            Optional<HostnameVerifier> hostnameVerifier) {
        EurekaClientIdentity identity = new EurekaClientIdentity(myInstanceInfo.getIPAddr());

        final TransportClientFactory rawFactory = JerseyEurekaHttpClientFactory.create(
                clientConfig, additionalFilters, myInstanceInfo, identity, sslContext, hostnameVerifier);

        final TransportClientFactory monitoredFactory =
                MetricsCollectingEurekaHttpClient.createFactory(rawFactory);

        return new TransportClientFactory() {
            @Override
            public EurekaHttpClient newClient(EurekaEndpoint serviceUrl) {
                return monitoredFactory.newClient(serviceUrl);
            }

            @Override
            public void shutdown() {
                // Shut down the wrapper before the factory it wraps
                monitoredFactory.shutdown();
                rawFactory.shutdown();
            }
        };
    }

Jersey1TransportClientFactories#newTransportClientFactory=>JerseyEurekaHttpClientFactory#create讀取關於http等相關配置並建立對應的http客戶端工廠

/**
 * Creates a {@link JerseyEurekaHttpClientFactory} from the client configuration.
 * <p>
 * The experimental builder is used when the experimental property
 * {@code JerseyEurekaHttpClientFactory.useNewBuilder} equals {@code "true"}. The client name
 * and SSL/proxy setup depend on, in order: the system property
 * {@code com.netflix.eureka.shouldSSLConnectionsUseSystemSocketFactory}, then whether both a
 * proxy host and a proxy port are configured.
 *
 * @param clientConfig      client configuration
 * @param additionalFilters extra client filters
 * @param myInstanceInfo    this instance's info
 * @param clientIdentity    identity passed to the builder
 * @param sslContext        optional SSL context, applied when present
 * @param hostnameVerifier  optional hostname verifier, applied when present
 * @return the built factory
 * @throws NumberFormatException if a proxy is configured and its port is not a valid integer
 */
public static JerseyEurekaHttpClientFactory create(EurekaClientConfig clientConfig,
                                                       Collection<ClientFilter> additionalFilters,
                                                       InstanceInfo myInstanceInfo,
                                                       AbstractEurekaIdentity clientIdentity,
                                                       Optional<SSLContext> sslContext,
                                                       Optional<HostnameVerifier> hostnameVerifier) {
        boolean useExperimental = "true".equals(clientConfig.getExperimental("JerseyEurekaHttpClientFactory.useNewBuilder"));

        JerseyEurekaHttpClientFactoryBuilder clientBuilder = (useExperimental ? experimentalBuilder() : newBuilder())
                .withAdditionalFilters(additionalFilters)
                .withMyInstanceInfo(myInstanceInfo)
                .withUserAgent("Java-EurekaClient")
                .withClientConfig(clientConfig)
                .withClientIdentity(clientIdentity);

        if (sslContext.isPresent()) {
            clientBuilder.withSSLContext(sslContext.get());
        }
        if (hostnameVerifier.isPresent()) {
            clientBuilder.withHostnameVerifier(hostnameVerifier.get());
        }

        boolean useSystemSsl =
                "true".equals(System.getProperty("com.netflix.eureka.shouldSSLConnectionsUseSystemSocketFactory"));
        if (useSystemSsl) {
            clientBuilder.withClientName("DiscoveryClient-HTTPClient-System").withSystemSSLConfiguration();
        } else {
            // A proxy is used only when both host and port are configured
            boolean proxyConfigured = clientConfig.getProxyHost() != null && clientConfig.getProxyPort() != null;
            if (proxyConfigured) {
                clientBuilder.withClientName("Proxy-DiscoveryClient-HTTPClient")
                        .withProxy(
                                clientConfig.getProxyHost(), Integer.parseInt(clientConfig.getProxyPort()),
                                clientConfig.getProxyUserName(), clientConfig.getProxyPassword()
                        );
            } else {
                clientBuilder.withClientName("DiscoveryClient-HTTPClient");
            }
        }

        return clientBuilder.build();
    }

在JerseyEurekaHttpClientFactoryBuilder#buildLegacy

/**
 * Builds the factory on top of an {@link EurekaJerseyClient} configured from this builder's
 * settings (timeouts, connection limits, codecs, proxy, SSL), then installs the client filters.
 *
 * @param additionalHeaders headers handed to the resulting factory
 * @param systemSSL         when {@code true} the system SSL configuration is used; otherwise a
 *                          custom SSL context is applied if one is set
 * @return the factory wrapping the built Jersey client
 */
private JerseyEurekaHttpClientFactory buildLegacy(Map<String, String> additionalHeaders, boolean systemSSL) {
            EurekaJerseyClientBuilder jerseyBuilder = new EurekaJerseyClientBuilder()
                    .withClientName(clientName)
                    .withUserAgent("Java-EurekaClient")
                    .withConnectionTimeout(connectionTimeout)
                    .withReadTimeout(readTimeout)
                    .withMaxConnectionsPerHost(maxConnectionsPerHost)
                    .withMaxTotalConnections(maxTotalConnections)
                    .withConnectionIdleTimeout((int) connectionIdleTimeout)
                    .withEncoderWrapper(encoderWrapper)
                    .withDecoderWrapper(decoderWrapper)
                    .withProxy(proxyHost, String.valueOf(proxyPort), proxyUserName, proxyPassword);

            // System SSL takes precedence over a custom SSL context
            if (!systemSSL) {
                if (sslContext != null) {
                    jerseyBuilder.withCustomSSL(sslContext);
                }
            } else {
                jerseyBuilder.withSystemSSLConfiguration();
            }

            if (hostnameVerifier != null) {
                jerseyBuilder.withHostnameVerifier(hostnameVerifier);
            }

            EurekaJerseyClient client = jerseyBuilder.build();
            addFilters(client.getClient());

            return new JerseyEurekaHttpClientFactory(client, additionalHeaders);
        }

在EurekaJerseyClientBuilder#build

/**
 * Creates the {@link EurekaJerseyClient} from this builder's timeouts and a freshly built
 * Apache HTTP client configuration.
 *
 * @return the new Jersey client
 * @throws RuntimeException wrapping any failure raised while constructing the client
 */
public EurekaJerseyClient build() {
            // Created outside the try block, so a failure here is not wrapped
            MyDefaultApacheHttpClient4Config httpClientConfig = new MyDefaultApacheHttpClient4Config();

            EurekaJerseyClient client;
            try {
                client = new EurekaJerseyClientImpl(
                        connectionTimeout, readTimeout, connectionIdleTimeout, httpClientConfig);
            } catch (Throwable cause) {
                throw new RuntimeException("Cannot create Jersey client ", cause);
            }
            return client;
        }

建立請求協議http和https處理,利用jersey RESTful 框架來實現請求處理,設定編解碼規範,配置其他請求處理屬性。

 /**
  * Assembles the Apache HTTP client configuration: picks a connection manager according to
  * the SSL settings, optionally adds proxy configuration, registers the Eureka Jersey
  * provider, applies connection limits, sets the user agent, and disables redirect following.
  */
 MyDefaultApacheHttpClient4Config() {
                // Choose the connection manager: system SSL, custom SSL, or the default
                boolean customSsl = sslContext != null || hostnameVerifier != null || trustStoreFileName != null;
                MonitoredConnectionManager connectionManager;
                if (systemSSL) {
                    connectionManager = createSystemSslCM();
                } else if (customSsl) {
                    connectionManager = createCustomSslCM();
                } else {
                    connectionManager = createDefaultSslCM();
                }

                if (proxyHost != null) {
                    addProxyConfiguration(connectionManager);
                }

                getSingletons().add(new DiscoveryJerseyProvider(encoderWrapper, decoderWrapper));

                // Common properties to all clients
                connectionManager.setDefaultMaxPerRoute(maxConnectionsPerHost);
                connectionManager.setMaxTotal(maxTotalConnections);
                getProperties().put(ApacheHttpClient4Config.PROPERTY_CONNECTION_MANAGER, connectionManager);

                // User agent is "<userAgent or clientName>/v<build version>"
                String agentBase = userAgent;
                if (agentBase == null) {
                    agentBase = clientName;
                }
                getProperties().put(CoreProtocolPNames.USER_AGENT, agentBase + "/v" + buildVersion());

                // To pin a client to specific server in case redirect happens, we handle redirects directly
                // (see DiscoveryClient.makeRemoteCall methods).
                getProperties().put(PROPERTY_FOLLOW_REDIRECTS, Boolean.FALSE);
                getProperties().put(ClientPNames.HANDLE_REDIRECTS, Boolean.FALSE);
            }

初始化EurekaJerseyClientImpl用來發送HTTP請求並處理回應

/**
 * Creates the Apache-backed Jersey client, applies the connection and read timeouts to its
 * HTTP parameters, and creates the idle-connection cleaner.
 *
 * @param connectionTimeout     value passed to {@code HttpConnectionParams.setConnectionTimeout}
 * @param readTimeout           value passed to {@code HttpConnectionParams.setSoTimeout}
 * @param connectionIdleTimeout idle timeout handed to the connection cleaner
 * @param clientConfig          Jersey client configuration used to create the client
 * @throws RuntimeException wrapping any failure raised during setup
 */
public EurekaJerseyClientImpl(int connectionTimeout, int readTimeout, final int connectionIdleTimeout,
                                  ClientConfig clientConfig) {
        try {
            this.jerseyClientConfig = clientConfig;
            this.apacheHttpClient = ApacheHttpClient4.create(this.jerseyClientConfig);

            // Timeouts are applied to the parameters of the underlying HttpClient
            HttpParams httpParams = this.apacheHttpClient.getClientHandler().getHttpClient().getParams();
            HttpConnectionParams.setConnectionTimeout(httpParams, connectionTimeout);
            HttpConnectionParams.setSoTimeout(httpParams, readTimeout);

            apacheHttpClientConnectionCleaner =
                    new ApacheHttpClientConnectionCleaner(this.apacheHttpClient, connectionIdleTimeout);
        } catch (Throwable cause) {
            throw new RuntimeException("Cannot create Jersey client", cause);
        }
    }

儲存http處理端,啟動週期執行緒池關閉空閒連線,收集http請求的監控資料

 /**
  * Stores the HTTP client, schedules a recurring task that calls {@code cleanIdle} with the
  * given idle timeout, and creates and registers the cleaner's timer and failure counter.
  *
  * @param apacheHttpClient      the client stored in the {@code apacheHttpClient} field
  * @param connectionIdleTimeout idle timeout passed to {@code cleanIdle} on every run
  */
 public ApacheHttpClientConnectionCleaner(ApacheHttpClient4 apacheHttpClient, final long connectionIdleTimeout) {
        this.apacheHttpClient = apacheHttpClient;

        // Same value is used for the initial delay and for the delay between runs
        this.eurekaConnCleaner.scheduleWithFixedDelay(
                () -> cleanIdle(connectionIdleTimeout),
                HTTP_CONNECTION_CLEANER_INTERVAL_MS,
                HTTP_CONNECTION_CLEANER_INTERVAL_MS,
                TimeUnit.MILLISECONDS
        );

        executionTimeStats = new BasicTimer(MonitorConfig.builder("Eureka-Connection-Cleaner-Time").build());
        cleanupFailed = new BasicCounter(MonitorConfig.builder("Eureka-Connection-Cleaner-Failure").build());

        // Registration failure is logged and does not abort construction
        try {
            Monitors.registerObject(this);
        } catch (Exception e) {
            logger.error("Unable to register with servo.", e);
        }
    }

給框架Jersey設定過濾器,壓縮資料功能以及設定header,最後儲存在JerseyEurekaHttpClientFactory返回,關於網路請求框架的工廠構造完畢。

/**
 * Installs the client filters in order: the gzip content-encoding filter, the Eureka identity
 * header filter, and then every non-null filter from {@code additionalFilters}.
 *
 * @param discoveryApacheClient the client the filters are added to
 */
private void addFilters(ApacheHttpClient4 discoveryApacheClient) {
            // Add gzip content encoding support
            discoveryApacheClient.addFilter(new GZIPContentEncodingFilter(false));

            // Identity headers are always added; an identity is built from the instance IP
            // (or null when no instance info is set) if none was supplied
            String ip = null;
            if (myInstanceInfo != null) {
                ip = myInstanceInfo.getIPAddr();
            }
            AbstractEurekaIdentity identity = clientIdentity;
            if (identity == null) {
                identity = new EurekaClientIdentity(ip);
            }
            discoveryApacheClient.addFilter(new EurekaIdentityHeaderFilter(identity));

            if (additionalFilters == null) {
                return;
            }
            for (ClientFilter extraFilter : additionalFilters) {
                if (extraFilter == null) {
                    continue;
                }
                discoveryApacheClient.addFilter(extraFilter);
            }
        }