深入理解Eureka獲取註冊資訊(七)
Eureka-Client獲取資訊
啟動獲取
在客戶端應用啟動時,初始化DiscoverClient的時候,會主動去獲取一次註冊資訊
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { // ...省略N多程式碼 // 如果fetch-registry = true , 則去Eureka Server拉取註冊資訊
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfigif (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 如果所有的Eureka Server都不可用,那麼從備用的服務裡面去取資料 fetchRegistryFromBackup(); } // ...省略N多程式碼 // 設定定時器 initScheduledTasks(); }config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { // ...省略N多程式碼 // 如果fetch-registry = true , 則去Eureka Server拉取註冊資訊 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 如果所有的Eureka Server都不可用,那麼從備用的服務裡面去取資料fetchRegistryFromBackup(); } // ...省略N多程式碼 // 設定定時器 initScheduledTasks(); }
shouldFetchRegistry : 預設truefetchRegistry : 獲取註冊資訊,此處傳入的是false, 表面上看是不需要全量獲取,但是應用第一次啟動的時候,本地快取為空,所以還是會全量獲取的。
PS: 啟動時獲取註冊資訊為全量。
定時器獲取
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// ...省略N多程式碼
}
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// ...省略N多程式碼
}
registryFetchIntervalSeconds : 預設值為30秒 ,每30秒重新整理一次、
定時器初始化,,直接看CacheRefreshThread()
class CacheRefreshThread implements Runnable {
public void run() {
// 重新整理註冊資訊
refreshRegistry();
}
}
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// 判斷是否需要全量獲取 , remoteRegionsModified 這個值來決定
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
// 獲取註冊資訊
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
// 日誌輸出 , 省略。。
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
CacheRefreshThread implements Runnable {
public void run() {
// 重新整理註冊資訊
refreshRegistry();
}
}
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// 判斷是否需要全量獲取 , remoteRegionsModified 這個值來決定
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
// 獲取註冊資訊
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
// 日誌輸出 , 省略。。
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
由上可以看到,系統在啟動的時候,初始化了一個定時器,每30秒一次,用來重新整理本地快取資訊。
獲取註冊資訊
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 獲取本地的快取資訊 , 也就是客戶端註冊資訊
Applications applications = getApplications();
// 判斷是否需要全量獲取
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
// 增量獲取
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// 釋出快取重新整理事件。
onCacheRefreshed();
// 更新本地應用的狀態
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 獲取本地的快取資訊 , 也就是客戶端註冊資訊
Applications applications = getApplications();
// 判斷是否需要全量獲取
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
// 增量獲取
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// 釋出快取重新整理事件。
onCacheRefreshed();
// 更新本地應用的狀態
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
clientConfig.shouldDisableDelta() : 是否禁用增量獲取, 預設為false , 如果禁用了的話,那就只能是全量獲取了,總要獲取一下不是。clientConfig.getRegistryRefreshSingleVipAddress() : 當這個屬性不為空的時候,則全量獲取。具體作用不是很清楚(苦笑)forceFullRegistryFetch : 傳入的引數,表示是否需要全量獲取applications : 本地註冊資訊的快取,如果本地快取為空,或者裡面的版本號為-1,那麼就需要全量獲取,表示首次載入時。onCacheRefreshed() : 釋出快取重新整理的事件,使用者可以自定義是否監聽這個事件,比如需要將註冊資訊的變化落庫。
全量獲取
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// 傳送HTTP請求,去服務端獲取註冊資訊
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 設定到本地快取裡面去
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// 傳送HTTP請求,去服務端獲取註冊資訊
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 設定到本地快取裡面去
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
在傳送HTTP請求去服務端獲取註冊資訊之前,做了一個判斷, 判斷registryRefreshSingleVipAddress是否為空, 這個欄位
表示的意思是 “此客戶端只對一個單一的VIP登錄檔的資訊感興趣”,預設為null , 也就是說如果客戶端只對其中一個VIP 感興趣那麼就只獲取這一個, 否則全部獲取
this.filterAndShuffle(apps) : 是否需要過濾客戶端資訊的狀態,如果設定了eureka.shouldFilterOnlyUpInstances = true 這個屬性的話,
客戶端獲取到註冊資訊之後,會剔除非UP狀態的客戶端資訊。
localRegionApps.set(this.filterAndShuffle(apps)) : 將註冊資訊設定到本地記憶體裡面去,使用AtomicReference型別做儲存、
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
增量獲取
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
// 增量獲取資訊
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
// 增量獲取為空,則全量返回
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
//這裡設定原子鎖的原因是怕某次排程網路請求時間過長,導致同一時間有多執行緒拉取到增量資訊併發修改
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 將獲取到的增量資訊和本地快取資訊合併。
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
// 釋放鎖
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// ( HashCode 不一致|| 列印增量和全量的差異 )= true 重新去全量獲取
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
// 增量獲取資訊
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
// 增量獲取為空,則全量返回
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
//這裡設定原子鎖的原因是怕某次排程網路請求時間過長,導致同一時間有多執行緒拉取到增量資訊併發修改
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 將獲取到的增量資訊和本地快取資訊合併。
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
// 釋放鎖
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// ( HashCode 不一致|| 列印增量和全量的差異 )= true 重新去全量獲取
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
步驟說明:
1.發起http請求,將服務端的客戶端變化的資訊拉取過來,如: register, cancle, modify 有過這些操作的資料
2.上鎖,防止某次排程網路請求時間過長,導致同一時間有多執行緒拉取到增量資訊併發修改
3.將請求過來的增量資料和本地的資料做合併
4.計算hashCode
5.如果hashCode不一致,或者clientConfig.shouldLogDeltaDiff() = true 的話,則又會去服務端發起一次全量獲取
合併資料
private void updateDelta(Applications delta) {
int deltaCount = 0;
// 迴圈拉取過來的應用列表
for (Application app : delta.getRegisteredApplications()) {
// 迴圈這個應用裡面的例項(有多個例項代表是叢集的。)
for (InstanceInfo instance : app.getInstances()) {
// 獲取本地的註冊應用列表
Applications applications = getApplications();
String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
if (null == remoteApps) {
remoteApps = new Applications();
remoteRegionVsApps.put(instanceRegion, remoteApps);
}
applications = remoteApps;
}
++deltaCount;
if (ActionType.ADDED.equals(instance.getActionType())) {// 新增事件
//根據AppName 獲取本地的資料,看這個應用是否存在
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
// 不存在,則加到本地的應用裡面去
applications.addApplication(app);
}
logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
// 為本地這個應用新增這個例項
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 修改事件
//根據AppName 獲取本地的資料,看這個應用是否存在
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
// 不存在,則加到本地的應用裡面去