SpringCloud 原始碼系列(2)—— 註冊中心 Eureka(中)
五、服務註冊
1、例項資訊註冊器初始化
服務註冊的程式碼位置不容易發現,我們看 DiscoveryClient 初始化排程任務的這個方法,這段程式碼會去初始化一個例項資訊複製器 InstanceInfoReplicator,這個複製器就包含了例項的註冊(明明是註冊卻叫 Replicator 感覺怪怪的)。
① DiscoveryClient 初始化排程器的流程
- 先基於 DiscoveryClient、InstanceInfo 構造 InstanceInfoReplicator,然後還有兩個引數為例項資訊複製間隔時間(預設30秒)、併發的數量(預設為2)。
- 建立了一個例項狀態變更監聽器,並註冊到 ApplicationInfoManager。當例項狀態變更時,就會觸發這個監聽器,並呼叫 InstanceInfoReplicator 的 onDemandUpdate 方法。
- 啟動 InstanceInfoReplicator,預設延遲40秒,也就是說服務啟動可能40秒之後才會註冊到註冊中心。
1 private void initScheduledTasks() { 2 // 省略定時重新整理登錄檔的任務... 3 4 if (clientConfig.shouldRegisterWithEureka()) { 5 // 省略定時心跳的任務... 6 7 // 例項資訊複製器,用於定時更新自己狀態,並向註冊中心註冊 8 instanceInfoReplicator = new InstanceInfoReplicator( 9 this, 10 instanceInfo, 11 clientConfig.getInstanceInfoReplicationIntervalSeconds(), 12 2); // burstSize 13 14 // 例項狀態變更的監聽器 15 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { 16 @Override 17 public String getId() { 18 return "statusChangeListener"; 19 } 20 21 @Override 22 public void notify(StatusChangeEvent statusChangeEvent) { 23 if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) { 24 logger.error("Saw local status change event {}", statusChangeEvent); 25 } else { 26 logger.info("Saw local status change event {}", statusChangeEvent); 27 } 28 instanceInfoReplicator.onDemandUpdate(); 29 } 30 }; 31 32 // 向 ApplicationInfoManager 註冊狀態變更監聽器 33 if (clientConfig.shouldOnDemandUpdateStatusChange()) { 34 applicationInfoManager.registerStatusChangeListener(statusChangeListener); 35 } 36 37 // 啟動例項資訊複製器,預設延遲時間40秒 38 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); 39 } else { 40 logger.info("Not registering with Eureka server per configuration"); 41 } 42 }
② InstanceInfoReplicator 的構造方法
- 建立了一個單執行緒的排程器
- 設定 started 為 false
- 建立了以分鐘為單位的限流器,每分鐘預設最多隻能排程4次
1 InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) { 2 this.discoveryClient = discoveryClient; 3 this.instanceInfo = instanceInfo; 4 // 單執行緒的排程器 5 this.scheduler = Executors.newScheduledThreadPool(1, 6 new ThreadFactoryBuilder() 7 .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d") 8 .setDaemon(true) 9 .build()); 10 11 this.scheduledPeriodicRef = new AtomicReference<Future>(); 12 // started 設定為 false 13 this.started = new AtomicBoolean(false); 14 // 以分鐘為單位的限流器 15 this.rateLimiter = new RateLimiter(TimeUnit.MINUTES); 16 // 間隔時間,預設為30秒 17 this.replicationIntervalSeconds = replicationIntervalSeconds; 18 this.burstSize = burstSize; 19 // 允許每分鐘更新的頻率 60 * 2 / 30 = 4 20 this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds; 21 logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute); 22 }
③ 啟動 InstanceInfoReplicator
- 將 started 設定為 true,代表已經啟動了
- 呼叫 instanceInfo.setIsDirty() 方法,將例項設定為 dirty=true,並更新了最後一次設定 dirty 的時間戳
- InstanceInfoReplicator 實現了 Runnable,它本身被當成任務來排程,然後延遲40秒開始排程當前任務,並將 Future 放到本地變數中
1 public void start(int initialDelayMs) { 2 // 啟動時 started 設定為 true 3 if (started.compareAndSet(false, true)) { 4 // 設定為 dirty,便於下一次心跳時同步到 eureka server 5 instanceInfo.setIsDirty(); 6 // 延遲40秒後開始排程當前任務 7 Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); 8 // 將 Future 放到本地變數中 9 scheduledPeriodicRef.set(next); 10 } 11 } 12 13 /////// 14 15 public synchronized void setIsDirty() { 16 isInstanceInfoDirty = true; 17 lastDirtyTimestamp = System.currentTimeMillis(); 18 }
2、客戶端例項註冊
① 實現註冊的run方法
接著看 InstanceInfoReplicator 的 run 方法,這個方法就是完成註冊的核心位置。
- 首先會更新例項的資訊,如果有變更就會設定 dirty=true
- 如過是 dirty 的,就會呼叫 DiscoveryClient 的 register 方法註冊例項
- 例項註冊後,就把 dirty 設定為 false
- 最後在 finally 中繼續下一次的排程,預設是每隔30秒排程一次,注意他這裡是把排程結果 Future 放到本地變數中
1 public void run() { 2 try { 3 // 更新本地例項資訊,如果例項資訊有變更,則 dirty=true 4 discoveryClient.refreshInstanceInfo(); 5 6 // 設定為 dirty 時的時間戳 7 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); 8 if (dirtyTimestamp != null) { 9 // 註冊例項 10 discoveryClient.register(); 11 // 設定 dirty=false 12 instanceInfo.unsetIsDirty(dirtyTimestamp); 13 } 14 } catch (Throwable t) { 15 logger.warn("There was a problem with the instance info replicator", t); 16 } finally { 17 // 30秒之後再排程 18 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); 19 scheduledPeriodicRef.set(next); 20 } 21 }
② 例項資訊重新整理
再來細看下 refreshInstanceInfo 重新整理例項資訊的方法:
- 首先重新整理了資料中心的資訊
- 然後重新整理續約資訊,主要就是將 EurekaClientConfig 的續約配置與本地的續約配置做對比,如果變更了就重新建立續約資訊,並設定例項為dirty。這種情況一般就是執行期間動態更新例項的配置,然後重新註冊例項資訊。
- 接著使用健康檢查器檢查例項健康狀況,從 getHealthCheckHandler 這段程式碼進去不難發現,我們可以自定義健康檢查器,例如當本地的一些資源未建立成功、某些核心執行緒池down了就認為例項不可用,這個時候就可以自定義健康檢查器。如果沒有自定義健康檢查器,那就直接返回例項當前的狀態。我們可以實現 HealthCheckHandler 介面自定義健康檢查器。
- 最後就會呼叫 ApplicationInfoManager 的 setInstanceStatus 設定例項狀態,會判斷如果狀態發生變更,就會發出狀態變更的通知,這樣就會觸發前面定義的狀態變更監聽器,然後呼叫 InstanceInfoReplicator 的 onDemandUpdate 方法。
1 void refreshInstanceInfo() { 2 // 如果有必要,就更新資料中心的資訊 3 applicationInfoManager.refreshDataCenterInfoIfRequired(); 4 // 如果有必要,就更新續約資訊,比如動態更新了配置檔案,這時就更新續約資訊 LeaseInfo,並將例項設定為 dirty 5 applicationInfoManager.refreshLeaseInfoIfRequired(); 6 7 InstanceStatus status; 8 try { 9 // 用監控檢查器檢查例項的狀態 10 status = getHealthCheckHandler().getStatus(instanceInfo.getStatus()); 11 } catch (Exception e) { 12 logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e); 13 status = InstanceStatus.DOWN; 14 } 15 16 if (null != status) { 17 // 設定例項狀態,例項狀態變了會觸發狀態變更的監聽器 18 applicationInfoManager.setInstanceStatus(status); 19 } 20 } 21 22 ///////////////////////////////// 23 24 public void refreshLeaseInfoIfRequired() { 25 // 當前例項續約資訊 26 LeaseInfo leaseInfo = instanceInfo.getLeaseInfo(); 27 if (leaseInfo == null) { 28 return; 29 } 30 // 從配置中獲取續約資訊 31 int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds(); 32 int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds(); 33 // 如果續約資訊變了,就重新建立續約資訊,並設定例項為 dirty 34 if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { 35 LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder() 36 .setRenewalIntervalInSecs(currentLeaseRenewal) 37 .setDurationInSecs(currentLeaseDuration) 38 .build(); 39 instanceInfo.setLeaseInfo(newLeaseInfo); 40 instanceInfo.setIsDirty(); 41 } 42 } 43 44 ///////////////////////////////// 45 46 public HealthCheckHandler getHealthCheckHandler() { 47 HealthCheckHandler healthCheckHandler = this.healthCheckHandlerRef.get(); 48 if (healthCheckHandler == null) { 49 // 可以自定義 HealthCheckHandler 實現健康檢查 50 if (null != healthCheckHandlerProvider) { 51 healthCheckHandler = healthCheckHandlerProvider.get(); 52 } else if (null != healthCheckCallbackProvider) { 53 // 可以自定義 HealthCheckCallback 實現健康檢查,HealthCheckCallback 已過期,建議使用 HealthCheckHandler 54 healthCheckHandler = new HealthCheckCallbackToHandlerBridge(healthCheckCallbackProvider.get()); 55 } 56 57 if (null == healthCheckHandler) { 58 // 沒有自定義的就是用預設的橋接類 59 healthCheckHandler = new HealthCheckCallbackToHandlerBridge(null); 60 } 61 this.healthCheckHandlerRef.compareAndSet(null, healthCheckHandler); 62 } 63 64 return this.healthCheckHandlerRef.get(); 65 } 66 67 ////////////////////////////////////// 68 69 public synchronized void setInstanceStatus(InstanceStatus status) { 70 InstanceStatus next = instanceStatusMapper.map(status); 71 if (next == null) { 72 return; 73 } 74 75 // 如果狀態變更了,才會返回之前的狀態,然後觸發狀態變更監聽器 76 InstanceStatus prev = instanceInfo.setStatus(next); 77 if (prev != null) { 78 for (StatusChangeListener listener : listeners.values()) { 79 try { 80 listener.notify(new StatusChangeEvent(prev, next)); 81 } catch (Exception e) { 82 logger.warn("failed to notify listener: {}", listener.getId(), e); 83 } 84 } 85 } 86 }View Code
③ 向 eureka server 註冊
在 run 方法裡呼叫了 discoveryClient.register() 方法實現了客戶端例項向註冊中心的註冊,進入到 register 方法可以看到,他就是使用前面構造的 EurekaTransport 來發起遠端呼叫。
一層層進去,很容易發現就是呼叫了 eureka-server 的 POST /apps/{appName} 介面,後面我們就從 eureka-core 中找這個介面就可以找到註冊中心實現服務註冊的入口了。
1 boolean register() throws Throwable { 2 logger.info(PREFIX + "{}: registering service...", appPathIdentifier); 3 EurekaHttpResponse<Void> httpResponse; 4 try { 5 // registrationClient => JerseyReplicationClient 6 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); 7 } catch (Exception e) { 8 logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); 9 throw e; 10 } 11 if (logger.isInfoEnabled()) { 12 logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); 13 } 14 return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); 15 } 16 17 ///////////////////////////////// 18 19 public EurekaHttpResponse<Void> register(InstanceInfo info) { 20 // 呼叫的是 POST apps/{appName} 介面 21 String urlPath = "apps/" + info.getAppName(); 22 ClientResponse response = null; 23 try { 24 Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); 25 addExtraHeaders(resourceBuilder); 26 response = resourceBuilder 27 .header("Accept-Encoding", "gzip") 28 .type(MediaType.APPLICATION_JSON_TYPE) 29 .accept(MediaType.APPLICATION_JSON) 30 // post 方法 31 .post(ClientResponse.class, info); 32 return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); 33 } finally { 34 if (logger.isDebugEnabled()) { 35 logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), 36 response == null ? "N/A" : response.getStatus()); 37 } 38 if (response != null) { 39 response.close(); 40 } 41 } 42 }View Code
④ 註冊中心設定例項狀態為已啟動
再回想下注冊中心的初始化流程,在最後呼叫 openForTraffic 方法時,最後也會呼叫 ApplicationInfoManager 的 setInstanceStatus 方法,將例項狀態設定為已啟動,這個時候就會觸發客戶端註冊到註冊中心的動作。
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
⑤ 完成監聽例項變更的方法
狀態變更器會呼叫 onDemandUpdate 方法來完成例項狀態變更後的邏輯。
- 它這裡一個是用到了限流器來限制每分鐘這個方法只能被呼叫4次,即避免了頻繁的註冊行為
- 然後在排程時,它會從本地變數中取出上一次排程的 Future,如果任務還沒執行完,它會直接取消掉
- 最後就是呼叫 run 方法,完成服務的註冊
1 public boolean onDemandUpdate() { 2 // 限流控制 3 if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { 4 if (!scheduler.isShutdown()) { 5 scheduler.submit(new Runnable() { 6 @Override 7 public void run() { 8 logger.debug("Executing on-demand update of local InstanceInfo"); 9 10 // 如果上一次的任務還沒有執行完,直接取消掉,然後執行註冊的任務 11 Future latestPeriodic = scheduledPeriodicRef.get(); 12 if (latestPeriodic != null && !latestPeriodic.isDone()) { 13 logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); 14 latestPeriodic.cancel(false); 15 } 16 17 InstanceInfoReplicator.this.run(); 18 } 19 }); 20 return true; 21 } else { 22 logger.warn("Ignoring onDemand update due to stopped scheduler"); 23 return false; 24 } 25 } else { 26 logger.warn("Ignoring onDemand update due to rate limiter"); 27 return false; 28 } 29 }View Code
⑥ 限流器
最後簡單看下限流器 RateLimiter 的設計:
- 從它的註釋中可以看出,eureka 的 RateLimiter 是基於令牌桶演算法實現的限流器
- acquire 方法有兩個引數:
- burstSize:允許以突發方式進入系統的最大請求數
- averageRate:設定的時間視窗內允許進入的請求數
1 /** 2 * Rate limiter implementation is based on token bucket algorithm. There are two parameters: 3 * <ul> 4 * <li> 5 * burst size - maximum number of requests allowed into the system as a burst 6 * </li> 7 * <li> 8 * average rate - expected number of requests per second (RateLimiters using MINUTES is also supported) 9 * </li> 10 * </ul> 11 * 12 * @author Tomasz Bak 13 */ 14 public class RateLimiter { 15 16 private final long rateToMsConversion; 17 18 private final AtomicInteger consumedTokens = new AtomicInteger(); 19 private final AtomicLong lastRefillTime = new AtomicLong(0); 20 21 @Deprecated 22 public RateLimiter() { 23 this(TimeUnit.SECONDS); 24 } 25 26 public RateLimiter(TimeUnit averageRateUnit) { 27 switch (averageRateUnit) { 28 case SECONDS: 29 rateToMsConversion = 1000; 30 break; 31 case MINUTES: 32 rateToMsConversion = 60 * 1000; 33 break; 34 default: 35 throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported"); 36 } 37 } 38 39 public boolean acquire(int burstSize, long averageRate) { 40 return acquire(burstSize, averageRate, System.currentTimeMillis()); 41 } 42 43 public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) { 44 if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go 45 return true; 46 } 47 48 refillToken(burstSize, averageRate, currentTimeMillis); 49 return consumeToken(burstSize); 50 } 51 52 private void refillToken(int burstSize, long averageRate, long currentTimeMillis) { 53 // 上一次填充 token 的時間 54 long refillTime = lastRefillTime.get(); 55 // 時間差 56 long timeDelta = currentTimeMillis - refillTime; 57 // 固定生成令牌的速率,即每分鐘4次 58 // 例如剛好間隔15秒進來一個請求,就是 15000 * 4 / 60000 = 1,newTokens 代表間隔了多少次,如果等於0,說明間隔不足15秒 59 long newTokens = timeDelta * averageRate / rateToMsConversion; 60 if (newTokens > 0) { 61 long newRefillTime = refillTime == 0 62 ? currentTimeMillis 63 // 注意這裡不是直接設定的當前時間戳,而是根據 newTokens 重新計算的,因為有可能同一週期內同時有多個請求進來,這樣可以保持一個固定的週期 64 : refillTime + newTokens * rateToMsConversion / averageRate; 65 if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) { 66 while (true) { 67 // 調整令牌的數量 68 int currentLevel = consumedTokens.get(); 69 int adjustedLevel = Math.min(currentLevel, burstSize); 70 // currentLevel 可能為2,重置為了 0 或 1 71 int newLevel = (int) Math.max(0, adjustedLevel - newTokens); 72 if (consumedTokens.compareAndSet(currentLevel, newLevel)) { 73 return; 74 } 75 } 76 } 77 } 78 } 79 80 private boolean consumeToken(int burstSize) { 81 while (true) { 82 int currentLevel = consumedTokens.get(); 83 // 突發數量為2,也就是允許15秒內最多有兩次請求進來 84 if (currentLevel >= burstSize) { 85 return false; 86 } 87 if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) { 88 return true; 89 } 90 } 91 } 92 93 public void reset() { 94 consumedTokens.set(0); 95 lastRefillTime.set(0); 96 } 97 }View Code
3、Eureka Server 接收註冊請求
① 找到例項註冊的API入口
從前面的分析中,我們知道服務端註冊的API是 POST /apps/{appName},由於 eureka 是基於 jersey 來通訊的,想找到API入口還是有點費勁的,至少沒有 springmvc 那麼容易。
先看 ApplicationsResource 這個類,可以找到 getApplicationResource 這個方法的路徑是符合 /apps/{appName} 這個規則的。然後可以看到它裡面建立了 ApplicationResource,再進入到這個類裡面,就可以找到 @Post 標註的 addInstance 方法,這就是註冊的入口了。可以看到它是呼叫了登錄檔的 register 方法來註冊例項的。
1 @Path("/{version}/apps") 2 @Produces({"application/xml", "application/json"}) 3 public class ApplicationsResource { 4 private final EurekaServerConfig serverConfig; 5 private final PeerAwareInstanceRegistry registry; 6 private final ResponseCache responseCache; 7 8 // 符合規則 /apps/{appName} 9 @Path("{appId}") 10 public ApplicationResource getApplicationResource( 11 @PathParam("version") String version, 12 @PathParam("appId") String appId) { 13 CurrentRequestVersion.set(Version.toEnum(version)); 14 try { 15 // 真正的入口 16 return new ApplicationResource(appId, serverConfig, registry); 17 } finally { 18 CurrentRequestVersion.remove(); 19 } 20 } 21 } 22 23 ///////////////////////////////// 24 25 @Produces({"application/xml", "application/json"}) 26 public class ApplicationResource { 27 28 private final PeerAwareInstanceRegistry registry; 29 30 @POST 31 @Consumes({"application/json", "application/xml"}) 32 public Response addInstance(InstanceInfo info, 33 @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { 34 logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); 35 36 registry.register(info, "true".equals(isReplication)); 37 return Response.status(204).build(); // 204 to be backwards compatible 38 } 39 }
addInstance 介面有兩個引數:
- InstanceInfo:服務例項,主要有兩塊資料:
- 基本資訊:主機名、IP地址、埠號、URL地址
- 租約資訊:保持心跳的間隔時間、最近心跳的時間、服務註冊的時間、服務啟動的時間
- isReplication:這個引數是從請求頭中取的,表示是否是在同步 server 節點的例項。在叢集模式下,因為客戶端例項註冊到註冊中心後,會同步到其它 server節點,所以如果是eureka-server之間同步資訊,這個引數就為 true,避免迴圈同步。
② 例項註冊
進入到登錄檔的 register 方法,可以看到主要就是呼叫父類的 register 方法註冊例項,然後同步到 eureka server 叢集中的其它 server 節點。叢集同步放到後面來看,現在只需要知道註冊例項時會同步到其它server節點即可。
1 @Override 2 public void register(final InstanceInfo info, final boolean isReplication) { 3 int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; 4 // 如果例項中沒有周期的配置,就設定為預設的 90 秒 5 if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { 6 leaseDuration = info.getLeaseInfo().getDurationInSecs(); 7 } 8 // 註冊例項 9 super.register(info, leaseDuration, isReplication); 10 // 複製到叢集其它 server 節點 11 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); 12 }
接著看父類的註冊方法,它的主要流程如下:
- 首先可以看到eureka server儲存登錄檔(registry)的資料結構是 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>,key 就是服務名稱,value 就是對應的例項,因為一個服務可能會部署多個例項。
- 根據服務名稱從登錄檔拿到例項表,然後根據例項ID拿到例項的租約資訊 Lease<InstanceInfo>
- 如果租約資訊存在,說明已經註冊過相同的例項了,然後就對比已存在例項和新註冊例項的最後更新時間,如果新註冊的是舊的,就替換為已存在的例項來完成註冊
- 如果租約資訊不存在,說明是一個新註冊的例項,這時會更新兩個閾值:
- 期望續約的客戶端數量 +1
- 每分鐘續約次數的閾值,如果低於這個值,說明有很多客戶端沒有傳送心跳,這時eureka就認為可能網路出問題了,就會有另一些機制,這個後面再說
- 然後就根據註冊的例項資訊和續約週期建立新的租約,並放入登錄檔中去
- 接著根據當前時間戳、服務名稱、例項ID封裝一個 Pair,然後放入到最近註冊的佇列中 recentRegisteredQueue,先記住這個佇列就行了
- 根據例項的 overriddenStatus 判斷,不為空的話,可能就只是要更新例項的狀態,這個時候就會只變更例項的狀態,而不會改變 dirty
- 然後是設定了例項的啟動時間戳,設定了例項的 ActionType 為 ADDED
- 將租約加入到最近變更的佇列 recentlyChangedQueue,先記住這個佇列
- 最後一步失效快取,一步步進去可以發現,主要就是將讀寫快取 readWriteCacheMap 中與這個例項相關的快取失效掉,這個快取後面分析抓取登錄檔的時候再來細看
1 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { 2 read.lock(); 3 try { 4 // registry => ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 5 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); 6 REGISTER.increment(isReplication); 7 if (gMap == null) { 8 // 初次註冊時,建立一個 ConcurrentHashMap,key 為 appName 9 final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); 10 gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); 11 if (gMap == null) { 12 gMap = gNewMap; 13 } 14 } 15 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); 16 // Retain the last dirty timestamp without overwriting it, if there is already a lease 17 if (existingLease != null && (existingLease.getHolder() != null)) { 18 // 已存在的例項的最後更新時間 19 Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); 20 // 新註冊的例項的最後更新時間 21 Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); 22 logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 23 24 // 如果存在的例項比新註冊儘量的例項後更新,就直接把新註冊的例項設定為已存在的例項 25 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { 26 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + 27 " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 28 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); 29 registrant = existingLease.getHolder(); 30 } 31 } else { 32 // 新註冊時,續約資訊不存在 33 synchronized (lock) { 34 if (this.expectedNumberOfClientsSendingRenews > 0) { 35 // Since the client wants to register it, increase the number of clients sending renews 36 // 期望續約的客戶端數量 + 1 37 this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; 38 // 更新每分鐘續約請求次數的閥值,這個閥值在後面很多地方都會用到 39 updateRenewsPerMinThreshold(); 40 } 41 } 42 logger.debug("No previous lease information found; it is new registration"); 43 } 44 // 建立新的續約 45 Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); 46 if (existingLease != null) { 47 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); 48 } 49 gMap.put(registrant.getId(), lease); 50 // 放入最近註冊的佇列 51 recentRegisteredQueue.add(new Pair<Long, String>( 52 System.currentTimeMillis(), 53 registrant.getAppName() + "(" + registrant.getId() + ")")); 54 // 覆蓋狀態 55 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { 56 logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " 57 + "overrides", registrant.getOverriddenStatus(), registrant.getId()); 58 if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { 59 logger.info("Not found overridden id {} and hence adding it", registrant.getId()); 60 overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); 61 } 62 } 63 InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); 64 if (overriddenStatusFromMap != null) { 65 logger.info("Storing overridden status {} from map", overriddenStatusFromMap); 66 registrant.setOverriddenStatus(overriddenStatusFromMap); 67 } 68 69 // Set the status based on the overridden status rules 70 InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); 71 // 僅僅是變更例項狀態,不會設定為 dirty 72 registrant.setStatusWithoutDirty(overriddenInstanceStatus); 73 74 // If the lease is registered with UP status, set lease service up timestamp 75 if (InstanceStatus.UP.equals(registrant.getStatus())) { 76 // UP 時設定 Lease 的時間戳 77 lease.serviceUp(); 78 } 79 // 設定動作是 ADDED,這個在後面會做 switch 判斷 80 registrant.setActionType(ActionType.ADDED); 81 // 新增到最近變更的佇列 82 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); 83 // 設定最後更新時間 84 registrant.setLastUpdatedTimestamp(); 85 // 失效快取 86 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); 87 logger.info("Registered instance {}/{} with status {} (replication={})", 88 registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); 89 } finally { 90 read.unlock(); 91 } 92 }View Code
更新每分鐘續約次數的閾值:
1 protected void updateRenewsPerMinThreshold() { 2 // 每分鐘續約閾值 = 期望續約的客戶端數量 * (60 / 續約間隔時間) * 續約百分比 3 // 例如,一共註冊了 10 個例項,那麼期望續約的客戶端數量為 10,間隔時間預設為 30秒,就是每個客戶端應該每30秒傳送一次心跳,續約百分比預設為 0.85 4 // 每分鐘續約次數閾值 = 10 * (60.0 / 30) * 0.85 = 17,也就是說每分鐘至少要接收到 17 此續約請求 5 this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews 6 * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) 7 * serverConfig.getRenewalPercentThreshold()); 8 }
這就是登錄檔 registry 快取服務例項資訊的結構,可以看出 eureka 是基於記憶體來組織登錄檔的,使用的是 ConcurrentHashMap 來保證多執行緒併發安全。
4、Eureka Server 控制檯
前面已經將服務例項註冊上去了,現在來看下 eureka server 的控制檯頁面是怎麼獲取這些資料的。
前面已經分析過 eureka-server 的 web.xml 中配置了歡迎頁為 status.jsp ,這就是控制檯的頁面。
從 status.jsp 可以看出,其實就是從 EurekaServerContext 上下文獲取登錄檔,然後讀取登錄檔註冊的服務例項,然後遍歷展示到表格中。
1 <%@ page language="java" import="java.util.*,java.util.Map.Entry,com.netflix.discovery.shared.Pair, 2 com.netflix.discovery.shared.*,com.netflix.eureka.util.*,com.netflix.appinfo.InstanceInfo.*, 3 com.netflix.appinfo.DataCenterInfo.*,com.netflix.appinfo.AmazonInfo.MetaDataKey,com.netflix.eureka.resources.*, 4 com.netflix.eureka.*,com.netflix.appinfo.*,com.netflix.eureka.util.StatusUtil" pageEncoding="UTF-8" %> 5 <% 6 String path = request.getContextPath(); 7 String basePath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/"; 8 %> 9 10 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> 11 12 <html> 13 <head> 14 <base href="<%=basePath%>"> 15 16 <title>Eureka</title> 17 <link rel="stylesheet" type="text/css" href="./css/main.css"> 18 <script type="text/javascript" src="./js/jquery-1.11.1.js" ></script> 19 <script type="text/javascript" src="./js/jquery.dataTables.js" ></script> 20 <script type="text/javascript" > 21 $(document).ready(function() { 22 $('table.stripeable tr:odd').addClass('odd'); 23 $('table.stripeable tr:even').addClass('even'); 24 $('#instances thead th').each(function () { 25 var title = $('#instances thead th').eq($(this).index()).text(); 26 $(this).html(title + '</br><input type="text" placeholder="Search ' + title + '" />'); 27 }); 28 // DataTable 29 var table = $('#instances').DataTable({"paging": false, "bInfo": false, "sDom": 'ltipr', "bSort": false}); 30 // Apply the search 31 table.columns().eq(0).each(function (colIdx) { 32 $('input', table.column(colIdx).header()).on('keyup change', function () { 33 table.column(colIdx).search(this.value).draw(); 34 }); 35 }); 36 }); 37 </script> 38 </head> 39 40 <body id="one"> 41 <jsp:include page="header.jsp" /> 42 <jsp:include page="navbar.jsp" /> 43 <div id="content"> 44 <div class="sectionTitle">Instances currently registered with Eureka</div> 45 <table id='instances' class="stripeable"> 46 <thead><tr><th>Application</th><th>AMIs</th><th>Availability Zones</th><th>Status</th></tr></thead> 47 <tfoot><tr><th>Application</th><th>AMIs</th><th>Availability Zones</th><th>Status</th></tr></tfoot> 48 <tbody> 49 <% 50 // 獲取 eureka server 上下文 EurekaServerContext 51 EurekaServerContext serverContext = (EurekaServerContext) pageContext.getServletContext() 52 .getAttribute(EurekaServerContext.class.getName()); 53 // 從上下文中取出登錄檔, 54 for(Application app : serverContext.getRegistry().getSortedApplications()) { 55 out.print("<tr><td><b>" + app.getName() + "</b></td>"); 56 Map<String, Integer> amiCounts = new HashMap<String, Integer>(); 57 Map<InstanceStatus,List<Pair<String, String>>> instancesByStatus = 58 new HashMap<InstanceStatus, List<Pair<String,String>>>(); 59 Map<String,Integer> zoneCounts = new HashMap<String, Integer>(); 60 61 for(InstanceInfo info : app.getInstances()){ 62 String id = info.getId(); 63 String url = info.getStatusPageUrl(); 64 InstanceStatus status = info.getStatus(); 65 String ami = "n/a"; 66 String zone = ""; 67 if(info.getDataCenterInfo().getName() == Name.Amazon){ 68 AmazonInfo dcInfo = (AmazonInfo)info.getDataCenterInfo(); 69 ami = dcInfo.get(MetaDataKey.amiId); 70 zone = dcInfo.get(MetaDataKey.availabilityZone); 71 } 72 73 Integer count = amiCounts.get(ami); 74 if(count != null){ 75 amiCounts.put(ami, Integer.valueOf(count.intValue()+1)); 76 }else { 77 amiCounts.put(ami, Integer.valueOf(1)); 78 } 79 80 count = zoneCounts.get(zone); 81 if(count != null){ 82 zoneCounts.put(zone, Integer.valueOf(count.intValue()+1)); 83 }else { 84 zoneCounts.put(zone, Integer.valueOf(1)); 85 } 86 List<Pair<String, String>> list = instancesByStatus.get(status); 87 88 if(list == null){ 89 list = new ArrayList<Pair<String,String>>(); 90 instancesByStatus.put(status, list); 91 } 92 list.add(new Pair<String, String>(id, url)); 93 } 94 StringBuilder buf = new StringBuilder(); 95 for (Iterator<Entry<String, Integer>> iter = 96 amiCounts.entrySet().iterator(); iter.hasNext();) { 97 Entry<String, Integer> entry = iter.next(); 98 buf.append("<b>").append(entry.getKey()).append("</b> (").append(entry.getValue()).append("), "); 99 } 100 out.println("<td>" + buf.toString() + "</td>"); 101 buf = new StringBuilder(); 102 for (Iterator<Entry<String, Integer>> iter = 103 zoneCounts.entrySet().iterator(); iter.hasNext();) { 104 Entry<String, Integer> entry = iter.next(); 105 buf.append("<b>").append(entry.getKey()).append("</b> (").append(entry.getValue()).append("), "); 106 } 107 out.println("<td>" + buf.toString() + "</td>"); 108 buf = new StringBuilder(); 109 for (Iterator<Entry<InstanceStatus, List<Pair<String,String>>>> iter = 110 instancesByStatus.entrySet().iterator(); iter.hasNext();) { 111 Entry<InstanceStatus, List<Pair<String,String>>> entry = iter.next(); 112 List<Pair<String, String>> value = entry.getValue(); 113 InstanceStatus status = entry.getKey(); 114 if(status != InstanceStatus.UP){ 115 buf.append("<font color=red size=+1><b>"); 116 } 117 buf.append("<b>").append(status.name()).append("</b> (").append(value.size()).append(") - "); 118 if(status != InstanceStatus.UP){ 119 buf.append("</font></b>"); 120 } 121 122 for(Pair<String,String> p : value) { 123 String id = p.first(); 124 String url = p.second(); 125 if(url != null && url.startsWith("http")){ 126 buf.append("<a href=\"").append(url).append("\">"); 127 }else { 128 url = null; 129 } 130 buf.append(id); 131 if(url != null){ 132 buf.append("</a>"); 133 } 134 buf.append(", "); 135 } 136 } 137 out.println("<td>" + buf.toString() + "</td></tr>"); 138 } 139 %> 140 </tbody> 141 </table> 142 </div> 143 <div> 144 <div class="sectionTitle">General Info</div> 145 <table id='generalInfo' class="stripeable"> 146 <tr><th>Name</th><th>Value</th></tr> 147 <% 148 StatusInfo statusInfo = (new StatusUtil(serverContext)).getStatusInfo(); 149 Map<String,String> genMap = statusInfo.getGeneralStats(); 150 for (Map.Entry<String,String> entry : genMap.entrySet()) { 151 out.print("<tr>"); 152 out.print("<td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td>"); 153 out.print("</tr>"); 154 } 155 Map<String,String> appMap = statusInfo.getApplicationStats(); 156 for (Map.Entry<String,String> entry : appMap.entrySet()) { 157 out.print("<tr>"); 158 out.print("<td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td>"); 159 out.print("</tr>"); 160 } 161 %> 162 </table> 163 </div> 164 <div> 165 <div class="sectionTitle">Instance Info</div> 166 <table id='instanceInfo' class="stripeable"> 167 <tr><th>Name</th><th>Value</th></tr> 168 <% 169 InstanceInfo instanceInfo = statusInfo.getInstanceInfo(); 170 Map<String,String> instanceMap = new HashMap<String,String>(); 171 instanceMap.put("ipAddr", instanceInfo.getIPAddr()); 172 instanceMap.put("status", instanceInfo.getStatus().toString()); 173 if(instanceInfo.getDataCenterInfo().getName() == DataCenterInfo.Name.Amazon) { 174 AmazonInfo info = (AmazonInfo) instanceInfo.getDataCenterInfo(); 175 instanceMap.put("availability-zone", info.get(AmazonInfo.MetaDataKey.availabilityZone)); 176 instanceMap.put("public-ipv4", info.get(AmazonInfo.MetaDataKey.publicIpv4)); 177 instanceMap.put("instance-id", info.get(AmazonInfo.MetaDataKey.instanceId)); 178 instanceMap.put("public-hostname", info.get(AmazonInfo.MetaDataKey.publicHostname)); 179 instanceMap.put("ami-id", info.get(AmazonInfo.MetaDataKey.amiId)); 180 instanceMap.put("instance-type", info.get(AmazonInfo.MetaDataKey.instanceType)); 181 } 182 for (Map.Entry<String,String> entry : instanceMap.entrySet()) { 183 out.print("<tr>"); 184 out.print("<td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td>"); 185 out.print("</tr>"); 186 } 187 %> 188 </table> 189 </div> 190 191 </body> 192 </html>View Code
5、服務註冊的整體流程圖
下面通過一張圖來看看服務例項註冊的整個流程。
六、抓取登錄檔
1、Eureka Client 啟動時全量抓取登錄檔
客戶端啟動初始化 DiscoveryClient 時,其中有段程式碼如下:這一步呼叫 fetchRegistry 就是在啟動時全量抓取登錄檔快取到本地中。
1 if (clientConfig.shouldFetchRegistry()) { 2 try { 3 // 拉取登錄檔:全量抓取和增量抓取 4 boolean primaryFetchRegistryResult = fetchRegistry(false); 5 if (!primaryFetchRegistryResult) { 6 logger.info("Initial registry fetch from primary servers failed"); 7 } 8 boolean backupFetchRegistryResult = true; 9 if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { 10 backupFetchRegistryResult = false; 11 logger.info("Initial registry fetch from backup servers failed"); 12 } 13 if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { 14 throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed."); 15 } 16 } catch (Throwable th) { 17 logger.error("Fetch registry error at startup: {}", th.getMessage()); 18 throw new IllegalStateException(th); 19 } 20 }
進入 fetchRegistry 方法,可以看到,首先獲取本地的 Applications,如果為空就會呼叫 getAndStoreFullRegistry 方法全量抓取登錄檔並快取到本地。
1 private boolean fetchRegistry(boolean forceFullRegistryFetch) { 2 Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); 3 4 try { 5 // 獲取本地的應用例項 6 Applications applications = getApplications(); 7 8 if (clientConfig.shouldDisableDelta() 9 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) 10 || forceFullRegistryFetch 11 || (applications == null) 12 || (applications.getRegisteredApplications().size() == 0) 13 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta 14 { 15 // 全量抓取登錄檔 16 getAndStoreFullRegistry(); 17 } else { 18 // 增量更新登錄檔 19 getAndUpdateDelta(applications); 20 } 21 applications.setAppsHashCode(applications.getReconcileHashCode()); 22 logTotalInstances(); 23 } catch (Throwable e) { 24 logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}", 25 appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e)); 26 return false; 27 } finally { 28 if (tracer != null) { 29 tracer.stop(); 30 } 31 } 32 33 // 發出快取重新整理的通知 34 onCacheRefreshed(); 35 36 // Update remote status based on refreshed data held in the cache 37 updateInstanceRemoteStatus(); 38 39 // registry was fetched successfully, so return true 40 return true; 41 }View Code
進入 getAndStoreFullRegistry 方法可以發現,就是呼叫 GET /apps 介面抓取全量登錄檔,因此等會服務端就從這個入口進去看抓取全量登錄檔的邏輯。登錄檔抓取回來之後,就放到本地變數 localRegionApps 中。
1 private boolean fetchRegistry(boolean forceFullRegistryFetch) { 2 Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); 3 4 try { 5 // 獲取本地的應用例項 6 Applications applications = getApplications(); 7 8 if (clientConfig.shouldDisableDelta() 9 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) 10 || forceFullRegistryFetch 11 || (applications == null) 12 || (applications.getRegisteredApplications().size() == 0) 13 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta 14 { 15 // 全量抓取登錄檔 16 getAndStoreFullRegistry(); 17 } else { 18 // 增量更新登錄檔 19 getAndUpdateDelta(applications); 20 } 21 applications.setAppsHashCode(applications.getReconcileHashCode()); 22 logTotalInstances(); 23 } catch (Throwable e) { 24 logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}", 25 appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e)); 26 return false; 27 } finally { 28 if (tracer != null) { 29 tracer.stop(); 30 } 31 } 32 33 // 發出快取重新整理的通知 34 onCacheRefreshed(); 35 36 // Update remote status based on refreshed data held in the cache 37 updateInstanceRemoteStatus(); 38 39 // registry was fetched successfully, so return true 40 return true; 41 }View Code
2、Eureka Server 登錄檔多級快取機制
① 全量抓取登錄檔的介面
全量抓取登錄檔的介面是 GET /apps,跟找註冊介面是類似的,最終可以找到 ApplicationsResource 的 getContainers 方法就是全量抓取登錄檔的入口。
- 可以看出,我們可以通過請求頭來指定返回 xml 格式還是 json 格式,可以指定是否要壓縮返回等。
- 然後建立了全量快取的 Key
- 接著根據快取的 key 從 responseCache 中全量抓取登錄檔
1 @GET 2 public Response getContainers(@PathParam("version") String version, 3 @HeaderParam(HEADER_ACCEPT) String acceptHeader, 4 @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, 5 @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, 6 @Context UriInfo uriInfo, 7 @Nullable @QueryParam("regions") String regionsStr) { 8 // 省略部分程式碼... 9 10 // JSON 型別 11 KeyType keyType = Key.KeyType.JSON; 12 String returnMediaType = MediaType.APPLICATION_JSON; 13 if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { 14 keyType = Key.KeyType.XML; 15 returnMediaType = MediaType.APPLICATION_XML; 16 } 17 18 // 全量登錄檔的快取key 19 Key cacheKey = new Key(Key.EntityType.Application, 20 ResponseCacheImpl.ALL_APPS, 21 keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions 22 ); 23 24 Response response; 25 if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { 26 // 壓縮返回 27 response = Response.ok(responseCache.getGZIP(cacheKey)) 28 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) 29 .header(HEADER_CONTENT_TYPE, returnMediaType) 30 .build(); 31 } else { 32 // 根據快取 key 從 responseCache 獲取全量登錄檔 33 response = Response.ok(responseCache.get(cacheKey)) 34 .build(); 35 } 36 CurrentRequestVersion.remove(); 37 return response; 38 }
② ResponseCache 多級快取讀取
ResponseCache 就是 eureka server 讀取登錄檔的核心元件,它的內部採用了多級快取的機制來快速響應客戶端抓取登錄檔的請求,下面就來看看 ResponseCache。
快取讀取的流程:
- 如果設定了使用只讀快取(預設true),就先從只讀快取 readOnlyCacheMap 中讀取;readOnlyCacheMap 使用 ConcurrentHashMap 實現,ConcurrentHashMap 支援併發訪問,讀取速度很快。
- 如果讀寫快取中沒有,就從讀寫快取 readWriteCacheMap 中讀取,讀取出來後並寫入到只讀快取中;readWriteCacheMap 使用 google guava 的 LoadingCache 實現,LoadingCache 支援在沒有元素的時候使用 CacheLoader 載入元素。
- 如果沒有開啟使用只讀快取,就直接從讀寫快取中獲取。
1 public String get(final Key key) { 2 return get(key, shouldUseReadOnlyResponseCache); 3 } 4 5 //////////////////////////////////////////////////// 6 7 String get(final Key key, boolean useReadOnlyCache) { 8 // => getValue 9 Value payload = getValue(key, useReadOnlyCache); 10 if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) { 11 return null; 12 } else { 13 return payload.getPayload(); 14 } 15 } 16 17 //////////////////////////////////////////////////// 18 19 Value getValue(final Key key, boolean useReadOnlyCache) { 20 Value payload = null; 21 try { 22 if (useReadOnlyCache) { 23 // 開啟使用只讀快取,則先從只讀快取讀取 24 // readOnlyCacheMap => ConcurrentHashMap<Key, Value> 25 final Value currentPayload = readOnlyCacheMap.get(key); 26 if (currentPayload != null) { 27 payload = currentPayload; 28 } else { 29 // 只讀快取中沒有,則從讀寫快取中讀取,然後放入只讀快取中 30 // readWriteCacheMap => LoadingCache<Key, Value> 31 payload = readWriteCacheMap.get(key); 32 readOnlyCacheMap.put(key, payload); 33 } 34 } else { 35 // 未開啟只讀快取,就從讀寫快取中讀取 36 payload = readWriteCacheMap.get(key); 37 } 38 } catch (Throwable t) { 39 logger.error("Cannot get value for key : {}", key, t); 40 } 41 return payload; 42 }
③ ResponseCache 初始化
分析 eureka server EurekaBootStrap 啟動初始化時,最後有一步去初始化 eureka server 上下文,它裡面就會去初始化登錄檔,初始化登錄檔的時候就會初始化 ResponseCache,這裡就來分析下這個初始化幹了什麼。
- 主要就是使用 google guava cache 構造了一個讀寫快取 readWriteCacheMap,初始容量為 1000。注意這個讀寫快取的特性:每隔 180 秒定時過期,然後元素不存在的時候就會使用 CacheLoader 從登錄檔中讀取。
- 接著如果配置了使用只讀快取,還會開啟一個定時任務,每隔30秒將讀寫快取 readWriteCacheMap 的資料同步到只讀快取 readOnlyCacheMap。
1 ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { 2 this.serverConfig = serverConfig; 3 this.serverCodecs = serverCodecs; 4 // 是否使用只讀快取,預設為 true 5 this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); 6 // 儲存登錄檔 7 this.registry = registry; 8 // 快取更新間隔時間,預設30秒 9 long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); 10 // 使用 google guava cache 構造一個讀寫快取 11 this.readWriteCacheMap = 12 // 初始容量為1000 13 CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) 14 // 快取的資料在寫入多久後過期,預設180秒,也就是說 readWriteCacheMap 會定時過期 15 .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) 16 .removalListener(new RemovalListener<Key, Value>() { 17 @Override 18 public void onRemoval(RemovalNotification<Key, Value> notification) { 19 Key removedKey = notification.getKey(); 20 if (removedKey.hasRegions()) { 21 Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); 22 regionSpecificKeys.remove(cloneWithNoRegions, removedKey); 23 } 24 } 25 }) 26 // 當key對應的元素不存在時,使用定義 CacheLoader 載入元素 27 .build(new CacheLoader<Key, Value>() { 28 @Override 29 public Value load(Key key) throws Exception { 30 if (key.hasRegions()) { 31 Key cloneWithNoRegions = key.cloneWithoutRegions(); 32 regionSpecificKeys.put(cloneWithNoRegions, key); 33 } 34 // 獲取元素 35 Value value = generatePayload(key); 36 return value; 37 } 38 }); 39 40 if (shouldUseReadOnlyResponseCache) { 41 // 如果配置了使用只讀快取,就開啟一個定時任務,定期將 readWriteCacheMap 的資料同步到 readOnlyCacheMap 中 42 // 預設間隔時間是 30 秒 43 timer.schedule(getCacheUpdateTask(), 44 new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) 45 + responseCacheUpdateIntervalMs), 46 responseCacheUpdateIntervalMs); 47 } 48 49 try { 50 Monitors.registerObject(this); 51 } catch (Throwable e) { 52 logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e); 53 } 54 }
generatePayload 方法:
1 private Value generatePayload(Key key) { 2 Stopwatch tracer = null; 3 try { 4 String payload; 5 switch (key.getEntityType()) { 6 case Application: 7 boolean isRemoteRegionRequested = key.hasRegions(); 8 9 // 獲取所有應用 10 if (ALL_APPS.equals(key.getName())) { 11 if (isRemoteRegionRequested) { 12 tracer = serializeAllAppsWithRemoteRegionTimer.start(); 13 payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions())); 14 } else { 15 tracer = serializeAllAppsTimer.start(); 16 // 從登錄檔讀取所有服務例項 17 payload = getPayLoad(key, registry.getApplications()); 18 } 19 } 20 // 增量獲取應用 21 else if (ALL_APPS_DELTA.equals(key.getName())) { 22 if (isRemoteRegionRequested) { 23 tracer = serializeDeltaAppsWithRemoteRegionTimer.start(); 24 versionDeltaWithRegions.incrementAndGet(); 25 versionDeltaWithRegionsLegacy.incrementAndGet(); 26 payload = getPayLoad(key, 27 registry.getApplicationDeltasFromMultipleRegions(key.getRegions())); 28 } else { 29 tracer = serializeDeltaAppsTimer.start(); 30 versionDelta.incrementAndGet(); 31 versionDeltaLegacy.incrementAndGet(); 32 payload = getPayLoad(key, registry.getApplicationDeltas()); 33 } 34 } else { 35 tracer = serializeOneApptimer.start(); 36 payload = getPayLoad(key, registry.getApplication(key.getName())); 37 } 38 break; 39 case VIP: 40 case SVIP: 41 tracer = serializeViptimer.start(); 42 payload = getPayLoad(key, getApplicationsForVip(key, registry)); 43 break; 44 default: 45 logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType()); 46 payload = ""; 47 break; 48 } 49 return new Value(payload); 50 } finally { 51 if (tracer != null) { 52 tracer.stop(); 53 } 54 } 55 }View Code
3、Eureka Server 登錄檔多級快取過期機制
這節來總結下 eureka server 登錄檔多級快取的過期時機,其實前面都已經分析過了。
① 主動過期
分析服務註冊時已經說過,服務註冊完成後,呼叫了 invalidateCache 來失效快取,進去可以看到就是將讀寫快取 readWriteCacheMap 中的服務、所有服務、增量服務的快取失效掉。
那這裡就要注意了,如果服務註冊、下線、故障之類的,這裡只是失效了讀寫快取,然後可能要間隔30秒才能同步到只讀快取 readOnlyCacheMap,那麼其它客戶端可能要隔30秒後才能感知到。
1 private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { 2 // invalidate cache 3 responseCache.invalidate(appName, vipAddress, secureVipAddress); 4 }
快取失效:
1 @Override 2 public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { 3 for (Key.KeyType type : Key.KeyType.values()) { 4 for (Version v : Version.values()) { 5 invalidate( 6