1. 程式人生 > >SpringCloud | FeignClient和Ribbon重試機制區別與聯絡

SpringCloud | FeignClient和Ribbon重試機制區別與聯絡

在spring cloud體系專案中,引入的重試機制保證了高可用的同時,也會帶來一些其它的問題,如冪等操作或一些沒必要的重試。 今天就來分別分析一下 FeignClient 和 Ribbon 重試機制的實現原理和區別,主要分為三點: >1)FeignClient重試機制分析 >2)Ribbon重試機制分析 >3)FeignClient和Ribbon重試機制的區別於聯絡

1)FeignClient 重試機制分析:

FeignClient 重試機制的實現原理相對簡單。首先看一下feignClient處理請求的攔截類:SynchronousMethodHandler,看一下該類中的代理方法invoke

 @Override
  public Object invoke(Object[] argv) throws Throwable {
  //生成處理請求模板
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    //獲取重試配置類
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        return executeAndDecode(template);
      } catch (RetryableException e) {
      //在異常裡執行是否重試方法
        retryer.continueOrPropagate(e);
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }

上面的預設重試配置Retryer,在其構造方法中,預設的請求次數為5次,如下:

 public Default() {
      this(100, SECONDS.toMillis(1), 5);
    }

判斷是否重試的演算法如下:

public void continueOrPropagate(RetryableException e) {
//重試次數大於最大請求次數,丟擲異常
      if (attempt++ >= maxAttempts) {
        throw e;
      }

      long interval;
      if (e.retryAfter() != null) {
        interval = e.retryAfter().getTime() - currentTimeMillis();
        if (interval > maxPeriod) {
          interval = maxPeriod;
        }
        if (interval < 0) {
          return;
        }
      } else {
        interval = nextMaxInterval();
      }
      try {
        Thread.sleep(interval);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
      sleptForMillis += interval;
    }
如果要關閉或者要重寫 feignClient重試機制 的話,可以自定義`feignRetryer`,在方法中不做重試,直接丟擲異常。配置如下:
/**
 * @author zhangshukang
 */
@Configuration
public class FeignConfig {
	@Bean
	Retryer feignRetryer() {
		return new Retryer() {
			@Override
			//在這裡重寫 continueOrPropagate演算法,可自定義處理方式。這裡直接丟擲異常,相當於不重試。
			public void continueOrPropagate(RetryableException e) {
				throw e;
			}
			@Override
			public Retryer clone() {
				return this;
			}
		};
	}
}

2)Ribbon重試機制分析:

首先看一下我們ribbon常用的配置,已經配置用到的地方:

ribbon:
  ReadTimeout: 0
  ConnectTimeout: 10
  MaxAutoRetries: 1
  MaxAutoRetriesNextServer: 2
  OkToRetryOnAllOperations: false

在這裡插入圖片描述

這裡從字面意思可以看出:
retrySameServer:重試相同例項,對應MaxAutoRetries
retryNextServer:重試下一例項,對應MaxAutoRetriesNextServer
retryEnabled:重試所有操作,對應OkToRetryOnAllOperations

這裡宣告一點,關於feignClient如何整合ribbon負載均衡的,之前的部落格已經有完整的分析:
《SpringCloud | Feign如何整合Ribbon進行負載均衡的?》,所以下面就跳過整合部分,直接分析負載均衡模組。

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
		//獲取重試機制配置:RequestSpecificRetryHandler,繼續跟進該方法...
        RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
        //這裡很關鍵,很明顯採用了命令模式,ribbon負載均衡的配置在這裡傳給LoadBalancerCommand類
        LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
                .withLoadBalancerContext(this)
                .withRetryHandler(handler)
                .withLoadBalancerURI(request.getUri())
                .build();

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }
	@Override
	public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
			RibbonRequest request, IClientConfig requestConfig) {
			//這裡如果配置了OkToRetryOnAllOperations為true,則所有的請求都進行重試。預設為false
		if (this.clientConfig.get(CommonClientConfigKey.OkToRetryOnAllOperations,
				false)) {
			return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
					requestConfig);
		}
		//如果沒配置的話,如果不是get請求,就關閉重試
		if (!request.toRequest().method().equals("GET")) {
			return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
					requestConfig);
		}
		else {
		//如果是get請求,則開啟重試。
			return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
					requestConfig);
		}
	}

上述程式碼是對請求型別進行區分,哪些重試,哪些不重試。
區別就在於第二個引數,來看一下第二個引數具體哪裡用到了,繼續跟進程式碼如下:

    public boolean isRetriableException(Throwable e, boolean sameServer) {
    //如果手動配置了所有請求都重試,或者get請求時,這裡開啟重試。
        if(this.okToRetryOnAllErrors) {
            return true;
        } else if(e instanceof ClientException) {
            ClientException ce = (ClientException)e;
            return ce.getErrorType() == ErrorType.SERVER_THROTTLED?!sameServer:false;
        } else {
            return this.okToRetryOnConnectErrors && this.isConnectionException(e);
        }
    }
剛剛上面提到了命令模式,屬於RxJava的內容,事件驅動機制,有興趣的可以自行研讀。這裡看一下上面命令模式執行類具體怎麼用的:
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    //這兩個變數,上面已經提到了,重試機制的關鍵
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    // 利用RxJava生成一個Observable用於後面的回撥
    Observable<T> o = 
            //選擇具體的server進行呼叫
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    //獲取這個server呼叫監控記錄,用於各種統計和LoadBalanceRule的篩選server處理
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    //獲取本次server呼叫的回撥入口,用於重試同一例項的重試回撥
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    loadBalancerContext.noteOpenConnection(stats);

                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

								......省略部分程式碼

                                }
                            });
                    //設定針對同一例項的重試回撥
                    if (maxRetrysSame > 0) 
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });
    //設定重試下一個例項的回撥    
    if (maxRetrysNext > 0 && server == null) 
        o = o.retry(retryPolicy(maxRetrysNext, false));
    //異常回調
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

上述程式碼典型的RxJava風格。

接下來是關鍵。o為Observable例項,類似於生產者,上面程式碼為Observable回撥邏輯。上面有兩行關鍵的程式碼:

o = o.retry(retryPolicy(maxRetrysSame, true));
o = o.retry(retryPolicy(maxRetrysNext, false));

首先看一下 retryPolicy 方法,這個就是 ribbon 重試演算法的邏輯了,來看一下的實現:

 private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
        return new Func2<Integer, Throwable, Boolean>() {
            @Override
            public Boolean call(Integer tryCount, Throwable e) {
                if (e instanceof AbortExecutionException) {
                    return false;
                }
                //判斷是否繼續重試
                if (tryCount > maxRetrys) {
                    return false;
                }
                
                if (e.getCause() != null && e instanceof RuntimeException) {
                    e = e.getCause();
                }
                //進入異常處理
                return retryHandler.isRetriableException(e, same);
            }
        };
    }

上述程式碼是Ribbon判斷是否重試的實現,根據我們配置的變數次數,進行判斷,有異常則進入異常處理。
整體的重試機制就是將 LoadBalancerCommand 類中 retryPolicy 的重試實現邏輯,傳入RxJava Observable物件的o.retry()方法,該方法接收的引數的就是一個Function:

public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
        return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
    }
最後回過頭看這兩行程式碼,邏輯大致清晰許多,來看一下執行順序:
o = o.retry(retryPolicy(maxRetrysSame, true));
o = o.retry(retryPolicy(maxRetrysNext, false));

執行順序:
1)首先會先執行下面一行程式碼,獲取負載均衡的重試配置,然後進行負載均衡,選取例項。
2)再執行上面一行程式碼,獲取執行單個服務的重試配置,最後再執行具體的業務邏輯。

3)FeignClient 和 Ribbon重試區別與聯絡:

疑問:一個http請求,如果feign和ribbon都配置了重試機制,異常情況下一共會請求多少次?
經過上面的分析,請求總次數 n 為feignClient和ribbon配置引數的笛卡爾積:
n(請求總次數)=feign(預設5次) * (MaxAutoRetries+1) * (MaxAutoRetriesNextServer+1)
注意:+1是代表ribbon本身預設的請求。

其實二者的重試機制相互獨立,並無聯絡。但是因為用了feign肯定會用到ribbon,所以feign的重試機制相對來說比較雞肋,自己feignClient的時候一般會關閉該功能。ribbon的重試機制預設配置為0,也就是預設是去除重試機制的,建議不要修改。如果配置不當,會因為冪等請求帶來資料問題。所以建議關閉二者的重試功能。
如果開啟的話,建議合理配置Hystrix的超時時間,在一些沒必要的重試請求執行時,根據Hystrix的超時時間,快速失敗,結束重試。

友鏈:探果網