Eureka 原始碼分析之 Eureka Client
文章首發於微信公眾號《程式設計師果果》
地址:https://mp.weixin.qq.com/s/47TUd96NMz67_PCDyvyInQ
簡介
Eureka是一種基於REST(Representational State Transfer)的服務,主要用於AWS雲,用於定位服務,以實現中間層伺服器的負載平衡和故障轉移。我們將此服務稱為Eureka Server。Eureka還附帶了一個基於Java的客戶端元件Eureka Client,它使與服務的互動變得更加容易。客戶端還有一個內建的負載均衡器,可以進行基本的迴圈負載均衡。在Netflix,一個更復雜的負載均衡器包含Eureka基於流量,資源使用,錯誤條件等多種因素提供加權負載平衡,以提供卓越的彈性。
從圖可以看出在這個體系中,有2個角色,即Eureka Server和Eureka Client。而Eureka Client又分為Applicaton Service和Application Client,即服務提供者何服務消費者。 每個區域有一個Eureka叢集,並且每個區域至少有一個eureka伺服器可以處理區域故障,以防伺服器癱瘓。
Eureka Client 在 Eureka Server 註冊,然後Eureka Client 每30秒向 Eureka Server 傳送一次心跳來更新一次租約。如果 Eureka Client 無法續訂租約幾次,則會在大約90秒內 Eureka Server 將其從伺服器登錄檔中刪除。註冊資訊和續訂將複製到群集中的所有 Eureka Server 節點。來自任何區域的客戶端都可以查詢登錄檔資訊(每30秒發生一次)根據這些登錄檔資訊,Application Client 可以遠端呼叫 Applicaton Service 來消費服務。
原始碼分析
基於Spring Cloud的 eureka 的 client 端在啟動類上加上 @EnableDiscoveryClient 註解,就可以 用 NetFlix 提供的 Eureka client。下面就以 @EnableDiscoveryClient 為入口,進行Eureka Client的原始碼分析。
@EnableDiscoveryClient,通過原始碼可以發現這是一個標記註解:
/** * Annotation to enable a DiscoveryClient implementation. * @author Spencer Gibb */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient { boolean autoRegister() default true; }
通過註釋可以知道 @EnableDiscoveryClient 註解是用來 啟用 DiscoveryClient 的實現,DiscoveryClient介面程式碼如下:
public interface DiscoveryClient {
String description();
List<ServiceInstance> getInstances(String serviceId);
List<String> getServices();
}
介面說明:
- description():實現描述。
- getInstances(String serviceId):獲取與特定serviceId關聯的所有ServiceInstance
- getServices():返回所有已知的服務ID
DiscoveryClient 介面的實現結構圖:
- EurekaDiscoveryClient:Eureka 的 DiscoveryClient 實現類。
- CompositeDiscoveryClient:用於排序可用客戶端的發現客戶端的順序。
- NoopDiscoveryClient:什麼都不做的服務發現實現類,已經被廢棄。
- SimpleDiscoveryClient:簡單的服務發現實現類 SimpleDiscoveryClient,具體的服務例項從 SimpleDiscoveryProperties 配置中獲取。
EurekaDiscoveryClient 是 Eureka 對 DiscoveryClient介面的實現,程式碼如下:
public class EurekaDiscoveryClient implements DiscoveryClient {
public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
private final EurekaInstanceConfig config;
private final EurekaClient eurekaClient;
public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
this.config = config;
this.eurekaClient = eurekaClient;
}
@Override
public String description() {
return DESCRIPTION;
}
@Override
public List<ServiceInstance> getInstances(String serviceId) {
List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
false);
List<ServiceInstance> instances = new ArrayList<>();
for (InstanceInfo info : infos) {
instances.add(new EurekaServiceInstance(info));
}
return instances;
}
@Override
public List<String> getServices() {
Applications applications = this.eurekaClient.getApplications();
if (applications == null) {
return Collections.emptyList();
}
List<Application> registered = applications.getRegisteredApplications();
List<String> names = new ArrayList<>();
for (Application app : registered) {
if (app.getInstances().isEmpty()) {
continue;
}
names.add(app.getName().toLowerCase());
}
return names;
}
}
從程式碼可以看出 EurekaDiscoveryClient 實現了 DiscoveryClient 定義的規範介面,真正實現發現服務的是 EurekaClient,下面是 EurekaClient 依賴結構圖:
EurekaClient 唯一實現類 DiscoveryClient,DiscoveryClient 的構造方法如下:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
//省略...
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//省略...
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
//省略...
}
可以看到這個構造方法裡面,主要做了下面幾件事:
- 建立了scheduler定時任務的執行緒池,heartbeatExecutor心跳檢查執行緒池(服務續約),cacheRefreshExecutor(服務獲取)
- 然後initScheduledTasks()開啟上面三個執行緒池,往上面3個執行緒池分別新增相應任務。然後建立了一個instanceInfoReplicator(Runnable任務),然後呼叫InstanceInfoReplicator.start方法,把這個任務放進上面scheduler定時任務執行緒池(服務註冊並更新)。
服務註冊(Registry)
上面說了,initScheduledTasks()方法中呼叫了InstanceInfoReplicator.start()方法,InstanceInfoReplicator 的 run()方法程式碼如下:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
發現 InstanceInfoReplicator的run方法,run方法中會呼叫DiscoveryClient的register方法。DiscoveryClient 的 register方法 程式碼如下:
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
最終又經過一系列呼叫,最終會呼叫到AbstractJerseyEurekaHttpClient的register方法,程式碼如下:
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
可以看到最終通過http rest請求eureka server端,把應用自身的InstanceInfo例項註冊給server端,我們再來完整梳理一下服務註冊流程:
Renew服務續約
服務續約和服務註冊非常類似,HeartbeatThread 程式碼如下:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
//更新最後一次心跳的時間
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
// 續約的主方法
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
傳送心跳 ,請求eureka server 端 ,如果介面返回值為404,就是說服務不存在,那麼重新走註冊流程。
如果介面返回值為404,就是說不存在,從來沒有註冊過,那麼重新走註冊流程。
服務續約流程如下圖:
服務下線cancel
在服務shutdown的時候,需要及時通知服務端把自己剔除,以避免客戶端呼叫已經下線的服務,shutdown()方法程式碼如下:
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// 關閉各種定時任務
// 關閉重新整理例項資訊/註冊的定時任務
// 關閉續約(心跳)的定時任務
// 關閉獲取註冊資訊的定時任務
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
// 更改例項狀態,使例項不再接收流量
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
//向EurekaServer端傳送下線請求
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
private void cancelScheduledTasks() {
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
先關閉各種定時任務,然後向eureka server 傳送服務下線通知。服務下線流程如下圖:
參考
https://github.com/Netflix/eureka/wiki
http://yeming.me/2016/12/01/eureka1/
http://blog.didispace.com/springcloud-sourcecode-eureka/
https://www.jianshu.com/p/71a8bdbf03f4
歡迎掃碼或微信搜尋公眾號《程式設計師果果》關注我,關注有驚喜~
相關推薦
Eureka原始碼分析之Eureka Client獲取例項資訊流程
下方是Eureka Client從Eureka Server獲取例項資訊的總體流程圖,後面會詳細介紹每個步驟。 Eureka Client在剛啟動的時候會從Eureka Server全量獲取一次註冊資訊,同時初始化Eureka Client本地例項資訊快取定時更新任務,預設30s一次
Eureka原始碼分析之eureka-client
很多人對Eureka進行了原始碼分析,但是主要著重於Eureka server端的原始碼分析,本篇博文著重分析eureka-client的分析,先上圖看類結構(首先宣告以下內容為本人淺顯見解,如有不妥請指正批評)eureka客戶端核心jar包為以上截圖,其中核心的類包
Eureka 原始碼分析之 Eureka Client
文章首發於微信公眾號《程式設計師果果》 地址:https://mp.weixin.qq.com/s/47TUd96NMz67_PCDyvyInQ 簡介 Eureka是一種基於REST(Representational State Transfer)的服務,主要用於AWS雲,用於定位服務,以實現中間層伺服器
Eureka 原始碼分析之 Eureka Server
文章首發於公眾號《程式設計師果果》 地址 : https://mp.weixin.qq.com/s/FfJrAGQuHyVrsedtbr0Ihw 簡介 上一篇文章《Eureka 原始碼分析之 Eureka Client》 通過原始碼知道 ,eureka Client 是通過 http rest來 與 eu
Spring Cloud原始碼分析之Eureka篇第六章:服務註冊
在文章《Spring Cloud原始碼分析之Eureka篇第四章:服務註冊是如何發起的 》的分析中,我們知道了作為Eureka Client的應用啟動時,在com.netflix.discovery.DiscoveryClient類的initScheduledT
Spring Cloud原始碼分析之Eureka篇第八章:服務註冊名稱的來歷
關於服務註冊名稱 服務註冊名稱,是指Eureka client註冊到Eureka server時,用於標記自己身份的標誌,舉例說明,以下是個簡單的Eureka client配置: server: port: 8082 spring: applicatio
Spring Cloud原始碼分析之Eureka篇第五章:更新服務列表
在上一章《Spring Cloud原始碼分析之Eureka篇第四章:服務註冊是如何發起的 》,我們知道了作為Eureka Client的應用啟動時,在com.netflix.discovery.DiscoveryClient類的initScheduledTask
Android7.0原始碼分析之Binder——Client分析
Binder Client分析,咋一看,就那麼四個關鍵方法:getService()、addService()、checkService()、listServices()。四個方法原理都差不多,以下僅
Eureka原始碼分析:Eureka不會進行二次Replication的原因
Eureka不會進行二次同步註冊資訊 Eureka會將本例項中的註冊資訊同步到它的peer節點上,這是我們都知道的特性。然而,當peer節點收到同步資料後,並不會將這些資訊再同步到它自己的peer節點上。如果有A, B, C三個例項,A配B, B配C, C配A
Netflix Eureka原始碼分析(7)——eureka client啟動環境初始化流程
eureka-examples,有一個類,ExampleEurekaClient,就是一個自帶的例子,如果是一個eureka服務,一定會有一個eureka client,服務例項啟動的時候,一定會啟動eureka client,eureka client去向eureka se
eureka原始碼解讀之服務端
剖析eureka服務端啟動流程 服務端啟動類-入口處 @EnableEurekaServer @SpringBootApplication public class EurekaServerApplication { public static void main(Strin
Netflix Eureka原始碼分析(3)——listener(EurekaBootStrap監聽類)分析
web.xml中的listener: <listener> <listener-class>com.netflix.eureka.EurekaBootStrap</listener-class> </listener>
Netflix Eureka原始碼分析(13)——eureka server的登錄檔多級快取過期機制:主動過期+定時過期+被動過期
(1)主動過期 readWriteCacheMap,讀寫快取 有新的服務例項發生註冊、下線、故障的時候,就會去重新整理readWriteCacheMap 比如說現在有一個服務A,ServiceA,有一個新的服務例項,Instance010來註冊了,註冊完了之後,其實必須
Netflix Eureka原始碼分析(18)——eureka server網路故障時的的自我保護機制原始碼剖析
假如說,20個服務例項,結果在1分鐘之內,只有8個服務例項保持了心跳 --> eureka server是應該將剩餘的12個沒有心跳的服務例項都摘除嗎? 這個時候很可能說的是,eureka server自己網路故障了,那些服務沒問題的。只不過eureka server
Netflix Eureka原始碼分析(19)——eureka server叢集機制原始碼剖析:登錄檔同步以及高可用
(1)eureka core的BootStrap裡面,有一塊程式碼,是PeerEurekaNodes的程式碼,其實是在處理eureka server叢集資訊的初始化,會執行PeerEurekaNodes.start()方法 public class EurekaBootSt
Eureka原始碼分析-環境構建篇
承接上一篇文章《什麼是微服務》,我們已經對微服務有一定了解,並且以一個實現了註冊中心、服務提供者及消費者的例子作為文章的結尾,而本篇文章,主要介紹Eureka原始碼的環境構建及示例除錯。
【SpringCloud Eureka原始碼】從Eureka Client發起註冊請求到Eureka Server處理的整個服務註冊過程(上)
目錄 Eureka Client啟動並呼叫Eureka Server的註冊介面 Spring Cloud Eureka的自動配置 @EnableDiscoveryClient EurekaDiscoveryClientConfiguration
深入理解Spring cloud原始碼篇之Eureka原始碼
1.eureka功能分析 首先,eureka在springcloud中充當服務註冊功能,相當於dubbo+zk裡面得zk,但是比zk要簡單得多,zk可以做得東西太多了,包括分散式鎖,分散式佇列都是基於zk裡面得四種節點加watch機制通過長連線來
DDPush開源推送框架原始碼分析之Client到DDPush(UDP模式)
在前一篇文章中我們主要分析了AppServer是如何連線到DDPush,並向DDPush推送訊息,還沒有看過的朋友請移步DDPush開源推送框架原始碼分析之APPServer到DDPush。 本篇文章主要講解Client(客戶端)如何連線到DDPush,並向DDPush傳送
SpringCloud Eureka 原始碼分析
目錄 SpringCloud-Eureka 整合專案 spring-cloud-netflix-eureka-server spring-cloud-netflix-eureka-client Eureka架構圖