【一起學原始碼-微服務】Feign 原始碼三:Feign結合Ribbon實現負載均衡的原理分析
前言
前情回顧
上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。
動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target
(裡面是serviceName等資訊)和dispatcher
(map資料結構,key是請求的方法名,方法引數等,value是SynchronousMethodHandler
)。
如下圖所示:
本講目錄
這一講主要是Feign與Ribbon結合實現負載均衡的原理分析。
說明
原創不易,如若轉載 請標明來源!
部落格地址:一枝花算不算浪漫
原始碼分析
Feign結合Ribbon實現負載均衡原理
通過前面的分析,我們可以直接來看下SynchronousMethodHandler
中的程式碼:
final class SynchronousMethodHandler implements MethodHandler { @Override public Object invoke(Object[] argv) throws Throwable { // 生成請求類似於:GET /sayHello/wangmeng HTTP/1.1 RequestTemplate template = buildTemplateFromArgs.create(argv); Retryer retryer = this.retryer.clone(); while (true) { try { return executeAndDecode(template); } catch (RetryableException e) { retryer.continueOrPropagate(e); if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue; } } } Object executeAndDecode(RequestTemplate template) throws Throwable { // 構建request物件:GET http://serviceA/sayHello/wangmeng HTTP/1.1 Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { // 這個client就是之前構建的LoadBalancerFeignClient,options是超時時間 response = client.execute(request, options); // ensure the request is set. TODO: remove in Feign 10 response.toBuilder().request(request).build(); } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start)); } throw errorExecuting(request, e); } long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); // 下面邏輯都是構建返回值response boolean shouldClose = true; try { if (logLevel != Logger.Level.NONE) { response = logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime); // ensure the request is set. TODO: remove in Feign 10 response.toBuilder().request(request).build(); } if (Response.class == metadata.returnType()) { if (response.body() == null) { return response; } if (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) { shouldClose = false; return response; } // Ensure the response body is disconnected byte[] bodyData = Util.toByteArray(response.body().asInputStream()); return response.toBuilder().body(bodyData).build(); } if (response.status() >= 200 && response.status() < 300) { if (void.class == metadata.returnType()) { return null; } else { return decode(response); } } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) { return decode(response); } else { throw errorDecoder.decode(metadata.configKey(), response); } } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime); } throw errorReading(request, response, e); } finally { if (shouldClose) { ensureClosed(response.body()); } } } }
這裡主要是構建request資料,然後通過request和options去通過LoadBalancerFeignClient.execute()
方法去獲得返回值。我們可以接著看client端的呼叫:
public class LoadBalancerFeignClient implements Client { @Override public Response execute(Request request, Request.Options options) throws IOException { try { // asUri: http://serviceA/sayHello/wangmeng URI asUri = URI.create(request.url()); // clientName:serviceA String clientName = asUri.getHost(); // uriWithoutHost: http://sayHello/wangmeng URI uriWithoutHost = cleanUrl(request.url(), clientName); // 這裡ribbonRequest:GET http:///sayHello/wangmeng HTTP/1.1 FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this.delegate, request, uriWithoutHost); // 這裡面config只有兩個超時時間,一個是connectTimeout:5000,一個是readTimeout:5000 IClientConfig requestConfig = getClientConfig(options, clientName); // 真正執行負載均衡的地方 return lbClient(clientName).executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null) { throw io; } throw new RuntimeException(e); } } }
接著我們看下lbClient()
和executeWithLoadBalancer()
public class LoadBalancerFeignClient implements Client {
private FeignLoadBalancer lbClient(String clientName) {
return this.lbClientFactory.create(clientName);
}
}
public class CachingSpringLoadBalancerFactory {
public FeignLoadBalancer create(String clientName) {
if (this.cache.containsKey(clientName)) {
return this.cache.get(clientName);
}
IClientConfig config = this.factory.getClientConfig(clientName);
// 獲取Ribbon ILoadBalancer資訊
ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
FeignLoadBalancer client = enableRetry ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
this.cache.put(clientName, client);
return client;
}
}
這裡是獲取了ILoadBalancer
資料,裡面包含了Ribbon獲取的serviceA所有服務節點資訊。
這裡已經獲取到ILoadBalancer
,裡面包含serviceA伺服器所有節點請求host資訊。接下來就是從中負載均衡選擇一個節點資訊host出來。
public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
}
public class LoadBalancerCommand<T> {
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
}
// 省略程式碼...
// selectServer是真正執行負載均衡的邏輯
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
// loadBalancerURI是http:///sayHello/wangmeng, loadBalancerKey為null
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
}
public class LoadBalancerContext implements IClientConfigAware {
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}
// 獲取到ILoadBalancer,這裡面有IRule的資訊及服務節點所有資訊
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
// Partial URI or no URI Case
// well we have to just get the right instances from lb - or we fall back
if (lb != null){
// 這裡就執行真正的chooseServer的邏輯了。預設的rule為ZoneAvoidanceZule
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;
}
// 省略程式碼
}
}
}
上面程式碼已經很清晰了,這裡就是真正的通過ribbon的 rule.chooseServer()
負載均衡地選擇了一個服務節點呼叫,debug如下:
到了這裡feign與ribbon的分析也就結束了,返回請求url資訊,然後得到response結果:
總結
上面已經分析了Feign與Ribbon的整合,最終還是落到Ribbon中的ILoadBalancer中,使用最後使用IRule去選擇對應的server資料。
下一講 會畫一個很大的圖,包含Feign、Ribbon、Eureka關聯的圖,裡面會畫出每個元件的細節及依賴關係。也算是學習至今的一個總複習了。
申明
本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫
相關推薦
【一起學原始碼-微服務】Feign 原始碼三:Feign結合Ribbon實現負載均衡的原理分析
前言 前情回顧 上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。 動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裡面是serviceName等資訊)和d
【一起學原始碼-微服務】Hystrix 原始碼一:Hystrix基礎原理與Demo搭建
說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一個系列文章講解了Feign的原始碼,主要是Feign動態代理實現的原理,及配合Ribbon實現負載均衡的機制。 這裡我們講解一個新的元件Hystrix,也是和Fe
【一起學原始碼-微服務】Hystrix 原始碼二:Hystrix核心流程:Hystix非降級邏輯流程梳理
說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的Hystr
【一起學原始碼-微服務】Hystrix 原始碼三:Hystrix核心流程:Hystix降級、熔斷等原理剖析
說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講解了Hystrix在配合feign的過程中,一個正常的請求邏輯該怎樣處理,這裡涉及到執行緒池的建立、HystrixCommand的執行等邏輯。 如圖所示:
【一起學原始碼-微服務】Feign 原始碼一:原始碼初探,通過Demo Debug Feign原始碼
前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進
【一起學原始碼-微服務】Feign 原始碼二:Feign動態代理構造過程
前言 前情回顧 上一講主要看了@EnableFeignClients中的registerBeanDefinitions()方法,這裡面主要是 將EnableFeignClients註解對應的配置屬性注入,將FeignClient註解對應的屬性注入。 最後是生成FeignClient對應的bean,注入到Spr
【一起學原始碼-微服務】Nexflix Eureka 原始碼二:EurekaServer啟動之配置檔案載入以及面向介面的配置項讀取
前言 上篇文章已經介紹了 為何要讀netflix eureka原始碼了,這裡就不再概述,下面開始正式原始碼解讀的內容。 如若轉載 請標明來源:一枝花算不算浪漫 程式碼總覽 還記得上文中,我們通過web.xml找到了eureka server入口的類EurekaBootStrap,這裡我們就先來簡單地看下: /
【一起學原始碼-微服務】Nexflix Eureka 原始碼三:EurekaServer啟動之EurekaServer上下文EurekaClient建立
前言 上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些程式碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於介面對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Cli
【一起學原始碼-微服務】Nexflix Eureka 原始碼六:在眼花繚亂的程式碼中,EurekaClient是如何註冊的?
前言 上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。 這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於Eureka
【一起學原始碼-微服務】Nexflix Eureka 原始碼七:通過單元測試來Debug Eureka註冊過程
前言 上一講eureka client是如何註冊的,一直跟到原始碼傳送http請求為止,當時看eureka client註冊時如此費盡,光是找一個regiter的地方就找了半天,那麼client端傳送了http請求給server端,server端是如何處理的呢? 帶著這麼一個疑問 就開始今天原始碼的解讀了。
【一起學原始碼-微服務】Nexflix Eureka 原始碼八:EurekaClient登錄檔抓取 精妙設計分析!
前言 前情回顧 上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceI
【一起學原始碼-微服務】Nexflix Eureka 原始碼九:服務續約原始碼分析
前言 前情回顧 上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。 本講目錄 今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著
【一起學原始碼-微服務】Nexflix Eureka 原始碼十:服務下線及例項摘除,一個client下線到底多久才會被其他例項感知?
前言 前情回顧 上一講我們講了 client端向server端傳送心跳檢查,也是預設每30鍾傳送一次,server端接收後會更新登錄檔的一個時間戳屬性,然後一次心跳(續約)也就完成了。 本講目錄 這一篇有兩個知識點及一個疑問,這個疑問是在工作中真真實實遇到過的。 例如我有服務A、服務B,A、B都註冊在同一個註
【一起學原始碼-微服務】Nexflix Eureka 原始碼十一:EurekaServer自我保護機制竟然有這麼多Bug?
前言 前情回顧 上一講主要講了服務下線,已經註冊中心自動感知宕機的服務。 其實上一講已經包含了很多EurekaServer自我保護的程式碼,其中還發現了1.7.x(1.9.x)包含的一些bug,但這些問題在master分支都已修復了。 服務下線會將服務例項從登錄檔中刪除,然後放入到recentQueue中,下
【一起學原始碼-微服務】Nexflix Eureka 原始碼十二:EurekaServer叢集模式原始碼分析
前言 前情回顧 上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。 哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。 今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收
【一起學原始碼-微服務】Nexflix Eureka 原始碼十三:Eureka原始碼解讀完結撒花篇~!
前言 想說的話 【一起學原始碼-微服務-Netflix Eureka】專欄到這裡就已經全部結束了。 實話實說,從最開始Eureka Server和Eureka Client初始化的流程還是一臉悶逼,到現在Eureka各種操作都瞭然於心了。 本專欄從12.17開始寫,一直到今天12.30(文章在平臺是延後釋出的
【一起學原始碼-微服務】Ribbon 原始碼一:Ribbon概念理解及Demo除錯
前言 前情回顧 前面文章已經梳理清楚了Eureka相關的概念及原始碼,接下來開始研究下Ribbon的實現原理。 我們都知道Ribbon在spring cloud中擔當負載均衡的角色, 當兩個Eureka Client互相呼叫的時候,Ribbon能夠做到呼叫時的負載,保證多節點的客戶端均勻接收請求。(這個有點類
【一起學原始碼-微服務】Ribbon 原始碼二:通過Debug找出Ribbon初始化流程及ILoadBalancer原理分析
前言 前情回顧 上一講講了Ribbon的基礎知識,通過一個簡單的demo看了下Ribbon的負載均衡,我們在RestTemplate上加了@LoadBalanced註解後,就能夠自動的負載均衡了。 本講目錄 這一講主要是繼續深入RibbonLoadBalancerClient和Ribbon+Eureka整合的
【一起學原始碼-微服務】Ribbon 原始碼三:Ribbon與Eureka整合原理分析
前言 前情回顧 上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration 到RibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。 本講目錄 這一講我們會進一步往下
【一起學原始碼-微服務】Ribbon 原始碼四:進一步探究Ribbon的IRule和IPing
前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進