1. 程式人生 > >SpringCloud原始碼:Ribbon負載均衡分析

SpringCloud原始碼:Ribbon負載均衡分析

    本文主要分析 SpringCloud 中 Ribbon 負載均衡流程和原理。

    SpringCloud版本為:Edgware.RELEASE。

一.時序圖

    和以前一樣,先把圖貼出來,直觀一點:

 

二.原始碼分析

    我們先從 contoller 裡面看如何使用 Ribbon 來負載均衡的:

  @GetMapping("/user/{id}")
  public User findById(@PathVariable Long id) {
    //return this.restTemplate.getForObject("http://192.168.2.110:8001/" + id, User.class);
    return this.restTemplate.getForObject("http://microservice-provider-user/" + id, User.class);
  }

    可以看到,在整合 Ribbon 之前,請求Rest是通過IP埠直接請求。整合 Ribbon 之後,請求的地址改成了 http://applicationName ,官方取名為虛擬主機名(virtual host name),當 Ribbon 和 Eureka 配合使用時,會自動將虛擬主機名轉換為微服務的實際IP地址,我們後面會分析這個過程。

    首先從 RestTemplate#getForObject 開始:

    public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws 
    RestClientException {
        // 設定RequestCallback的返回型別responseType
		RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
        // 例項化responseExtractor
		HttpMessageConverterExtractor<T> responseExtractor =
				new HttpMessageConverterExtractor<T>(responseType, getMessageConverters(), logger);
		return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
	}

    接著執行到 RestTemplate 的 execute,主要是拼裝URI,如果存在baseUrl,則插入baseUrl。拼裝好後,進入實際"執行"請求的地方:

    public <T> T execute(String url, HttpMethod method, RequestCallback requestCallback,
			ResponseExtractor<T> responseExtractor, Object... uriVariables) throws 
    RestClientException {
        // 組裝 URI
		URI expanded = getUriTemplateHandler().expand(url, uriVariables);
        // 實際"執行"的地方
		return doExecute(expanded, method, requestCallback, responseExtractor);
	}

    RestTemplate#doExecute,實際“執行”請求的地方,執行超過後,返回 response:

    protected <T> T doExecute(URI url, HttpMethod method, RequestCallback requestCallback,
			ResponseExtractor<T> responseExtractor) throws RestClientException {
		ClientHttpResponse response = null;
		try {
            // 例項化請求,url為請求地址,method為GET
			ClientHttpRequest request = createRequest(url, method);
			if (requestCallback != null) {// AcceptHeaderRequestCallback
				requestCallback.doWithRequest(request);
			}
            // 實際處理請求的地方
			response = request.execute();
            // 處理response,記錄日誌和呼叫對應的錯誤處理器
			handleResponse(url, method, response);
			if (responseExtractor != null) {// 使用前面的HttpMessageConverterExtractor從Response裡面抽取資料
				return responseExtractor.extractData(response);
			}
			else {
				return null;
			}
		}
		......
	}

    到了請求被執行的地方,AbstractClientHttpRequest#execute,跳轉到 executeInternal:

    public final ClientHttpResponse execute() throws IOException {
        // 斷言請求還沒被執行過
		assertNotExecuted();
        // 跳轉到 executeInternal 處理請求
		ClientHttpResponse result = executeInternal(this.headers);
        // 標記請求為已經執行過
		this.executed = true;
		return result;
	}

    AbstractBufferingClientHttpRequest#executeInternal,AbstractBufferingClientHttpRequest是AbstractClientHttpRequest的子抽象類,作用是快取output,使用了一個位元組陣列輸出流:

    protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
        // 首次進來,bytes內容為空
		byte[] bytes = this.bufferedOutput.toByteArray();
		if (headers.getContentLength() < 0) {
            // 設定 Content-Length 為 1
			headers.setContentLength(bytes.length);
		}
        // 模板方法,跳轉到了實現類中的方法,InterceptingClientHttpRequest#executeInternal
		ClientHttpResponse result = executeInternal(headers, bytes);
        // 拿到結果後,清空快取
		this.bufferedOutput = null;
		return result;
	}

    executeInternal是一個抽象方法,跳轉到了其實現類InterceptingClientHttpRequest#executeInternal:

    protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) 
    throws IOException {
		InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
        // InterceptingRequestExecution是一個內部類
		return requestExecution.execute(this, bufferedOutput);
	}
    // 內部類,負責執行請求
    private class InterceptingRequestExecution implements ClientHttpRequestExecution {
		private final Iterator<ClientHttpRequestInterceptor> iterator;// 所有HttpRequest的攔截器
		public InterceptingRequestExecution() {
			this.iterator = interceptors.iterator();
		}
		@Override
		public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
			if (this.iterator.hasNext()) {// 如果還有下一個攔截器,則執行其攔截方法
                // 這裡的攔截器是 MetricsClientHttpRequestInterceptor,對應"metrics"資訊,記錄執行時間和結果
				ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                // 執行攔截方法
				return nextInterceptor.intercept(request, body, this);
			}
			......
		}
	}

    跳轉到了攔截器 MetricsClientHttpRequestInterceptor 的攔截方法:

    public ClientHttpResponse intercept(HttpRequest request, byte[] body,
			ClientHttpRequestExecution execution) throws IOException {
		long startTime = System.nanoTime();// 標記開始執行時間
		ClientHttpResponse response = null;
		try {
            // 傳入請求和Body,處理執行,又跳轉回 InterceptingRequestExecution
			response = execution.execute(request, body);
			return response;
		}
		finally {// 在執行完方法,返回response之前,記錄一下執行的資訊
			SmallTagMap.Builder builder = SmallTagMap.builder();
			for (MetricsTagProvider tagProvider : tagProviders) {
				for (Map.Entry<String, String> tag : tagProvider
						.clientHttpRequestTags(request, response).entrySet()) {
					builder.add(Tags.newTag(tag.getKey(), tag.getValue()));
				}
			}
			MonitorConfig.Builder monitorConfigBuilder = MonitorConfig
					.builder(metricName);
			monitorConfigBuilder.withTags(builder);
            // 記錄執行時間
			servoMonitorCache.getTimer(monitorConfigBuilder.build())
					.record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
		}
	}

    又跳轉回了 InterceptingRequestExecution,下個攔截器是 - LoadBalancerInterceptor,最後的Boss,呼叫LoadBalancerClient完成請求的負載。

    LoadBalancerInterceptor#intercept,主角登場了,終於等到你,還好沒放棄:

    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
        // 獲取原始URI
		final URI originalUri = request.getURI();
        // 獲取請求中的服務名字,也就是所謂的"虛擬主機名"
		String serviceName = originalUri.getHost();
        // 轉由 LoadBalancerClient 處理請求
		return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
	}

    下面空一行先停下來休息一下,然後看看,負載均衡是怎樣實現的。

 

    LoadBalancerInterceptor這裡預設的實現是 RibbonLoadBalancerClient,Ribbon是Netflix釋出的負載均衡器。

    RibbonLoadBalancerClient#execute,負載均衡演算法選出實際處理請求的Server:

    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        // serviceId即前面的虛擬主機名 "microservice-provider-user",獲取loadBalancer
        //這裡獲取到的是 DynamicServerListLoadBalancer
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        // 基於loadBalancer,選擇實際處理請求的服務提供者
		Server server = getServer(loadBalancer);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
				serviceId), serverIntrospector(serviceId).getMetadata(server));
		return execute(serviceId, ribbonServer, request);
	}

    RibbonLoadBalancerClient#getServer,轉交 loadBalancer 選擇Server:

    protected Server getServer(ILoadBalancer loadBalancer) {
		if (loadBalancer == null) {
			return null;
		}
        // 由 loadBalancer 完成選Server的重任,這裡的 key 是預設值 "default"
		return loadBalancer.chooseServer("default"); // TODO: better handling of key
	}

    chooseServer也是一個抽象的模板方法,最後的實現是 ZoneAwareLoadBalancer#chooseServer:

    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            // 到了 BaseLoadBalancer的chooseServer
            return super.chooseServer(key);
        }
        ......
    }

    BaseLoadBalancer#chooseServer,轉交規則來選擇Server:

    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        // counter是一個計數器,起始值是"0",下面自增一次,變為 "1"
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                // 預設的挑選規則是 "ZoneAvoidanceRule"
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

    PredicateBasedRule是ZoneAvoidanceRule的父類。PredicateBasedRule#choose,可以看到,基礎負載規則採用的是"RoundRobin"即輪詢的方式:

    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }

    下面分析"輪詢"的過程,AbstractServerPredicate#chooseRoundRobinAfterFiltering,傳入Server列表的長度,自增取模實現:

    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        // 首先拿到所有"合格"的Server
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        // 在 incrementAndGetModulo 中獲取,"自增取模"
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }

    AbstractServerPredicate#incrementAndGetModulo,維護了一個nextIndex,記錄下次請求的下標:

    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();// 第一次 current是"0"
            int next = (current + 1) % modulo;// current+1對size取模,作為下次的"current"
            // "0" == current,則以原子方式將該值設定為 next
            if (nextIndex.compareAndSet(current, next))
                return current;
        }
    }

    最後,我們通過控制檯來驗證一下請求是不是"輪詢"分配到服務提供者的,本地啟動了8000和8001兩個Provider:

2018-12-09 18:55:30.794  c.i.c.s.user.controller.MovieController  : microservice-provider-user:192.168.2.117:8001
2018-12-09 18:55:33.196  c.i.c.s.user.controller.MovieController  : microservice-provider-user:192.168.2.117:8000
2018-12-09 18:55:34.713  c.i.c.s.user.controller.MovieController  : microservice-provider-user:192.168.2.117:8001
2018-12-09 18:55:34.975  c.i.c.s.user.controller.MovieController  : microservice-provider-user:192.168.2.117:8000
2018-12-09 18:55:35.175  c.i.c.s.user.controller.MovieController  : microservice-provider-user:192.168.2.117:8001
2018-12-09 18:55:35.351  c.i.c.s.user.controller.MovieController  : microservice-provider-user:192.168.2.117:8000
2018-12-09 18:55:35.534  c.i.c.s.user.controller.MovieController  : microservice-provider-user:192.168.2.117:8001

    可以看到,請求確實被輪詢給兩個Provider處理的。

 

    至此,我們完成了 SpringCloud 中 Ribbon 負載均衡的過程,知道了預設採用的是"輪詢"的方式,實現是通過維護一個index,自增後取模來作為下標挑選實際響應請求的Server。除了輪詢的方式,還有隨機等演算法。感興趣可以按照類似思路分析測試一下。