1. 程式人生 > >引入 Gateway 閘道器,這些坑一定要學會避開!!!

引入 Gateway 閘道器,這些坑一定要學會避開!!!

Spring cloud gateway是替代zuul的閘道器產品,基於Spring 5、Spring boot 2.0以上、Reactor, 提供任意的路由匹配和斷言、過濾功能。上一篇文章談了一下Gateway閘道器使用不規範,同事加班淚兩行~,這篇文章將會側重於其他的幾個需要注意的地方。

閘道器實現

這裡介紹編碼方式實現

HystrixObservableCommand.Setter getSetter() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("group-accept");
        HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter.withGroupKey(groupKey);
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("command-accept");

        setter.andCommandKey(commandKey);

        HystrixCommandProperties.Setter proertiesSetter = HystrixCommandProperties.Setter();
        proertiesSetter
                /* *
                 * 執行緒策略配置
                 */
                //設定執行緒模式 預設 1000ms
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                //執行是否啟用超時時間 預設 true
                .withExecutionTimeoutEnabled(true)
                //使用執行緒隔離時,是否對命令執行超時的執行緒呼叫中斷 預設false
                .withExecutionIsolationThreadInterruptOnFutureCancel(false)
                //執行超時的時候是否要它中斷 預設 true
                .withExecutionIsolationThreadInterruptOnTimeout(true)
                //執行的超時時間 預設 1000ms
                .withExecutionTimeoutInMilliseconds(2000)
                /* *
                 * 熔斷策略
                 */
                //是否開啟溶斷 預設 true
                .withCircuitBreakerEnabled(true)
                // 是否允許熔斷器忽略錯誤,預設false, 不開啟 ;
                // true,斷路器強制進入“關閉”狀態,它會接收所有請求。
                // 如果forceOpen屬性為true,該屬性不生效
                .withCircuitBreakerForceClosed(false)
                // 是否強制開啟熔斷器阻斷所有請求, 預設為false
                // 為true時,所有請求都將被拒絕,直接到fallback.
                // 如果該屬性設定為true,斷路器將強制進入“開啟”狀態,
                // 它會拒絕所有請求。該屬性優於forceClosed屬性
                .withCircuitBreakerForceOpen(false)
                // 用來設定當斷路器開啟之後的休眠時間窗。
                // 休眠時間窗結束之後,會將斷路器設定為“半開”狀態,嘗試熔斷的請求命令,
                // 如果依然請求錯誤就將斷路器繼續設定為“開啟”狀態,如果成功,就設定為“關閉”狀態
                // 熔斷器預設工作時間,預設:5000豪秒.
                // 熔斷器中斷請求10秒後會進入半開啟狀態,放部分流量過去重試.
                .withCircuitBreakerSleepWindowInMilliseconds(5000)
                // 熔斷器在整個統計時間內是否開啟的閥值.
                // 在metricsRollingStatisticalWindowInMilliseconds(預設10s)內預設至少請求10次,
                // 熔斷器才發揮起作用,9次熔斷器都不起作用。
                .withCircuitBreakerRequestVolumeThreshold(100)
                // 該屬性用來設定斷路器開啟的錯誤百分比條件。預設值為50.
                // 表示在滾動時間窗中,在請求值超過requestVolumeThreshold閾值的前提下,
                // 如果錯誤請求數百分比超過50,就把斷路器設定為“開啟”狀態,否則就設定為“關閉”狀態
                .withCircuitBreakerErrorThresholdPercentage(50);

        setter.andCommandPropertiesDefaults(proertiesSetter);

        return setter;
    }
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
 RouteLocatorBuilder.Builder routes = builder.routes();
 RouteLocatorBuilder.Builder serviceProvider = routes
  .route("accept",
   r -> r.method(HttpMethod.GET)
    .and()
    .path("/gateway-accept/**")
    .and()
    .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
    .filters(f -> {
     f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}");
     f.requestRateLimiter(
      config -> config.setKeyResolver(new GenericAccessResolver())
       .setRateLimiter(redisRateLimiter()));
     f.hystrix(config -> config.setName("accept")
      .setFallbackUri("forward:/gateway-fallback")
      .setSetter(getSetter()));
     return f;
    })
    .uri("http://localhost:8888")
     );

 return serviceProvider.build();
}

 

在上面的程式碼中,主要做了3件事情:限流、熔斷策略及降級方法配置

限流

  • 配置redis
spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password:
    timeout: 1500
    lettuce:
      pool:
        max-active: 300 #連線池最大連線數(使用負值表示沒有限制)
        max-idle: 10    #連線池中的最大空閒連線
        min-idle: 5     #連線池中的最小空閒連線
        max-wait: -1    #連線池最大阻塞等待時間(使用負值表示沒有限制)

 

  • 自定義解析
/**
 * @description: 按照訪問地址進行限流(也可以安裝其他條件進行限流),具體可以看exchange.getRequest()的方法和屬性
 **/
public class GenericAccessResolver implements KeyResolver {
    @Override
    public Mono<String> resolve(ServerWebExchange exchange) {
        return Mono.just(exchange.getRequest().getPath().value());
    }
}

 

  • 自定義限流配置
RedisRateLimiter redisRateLimiter() {
 //1000,1500對應replenishRate、burstCapacity
 return new RedisRateLimiter(1000, 1500);
}

 

  • 閘道器使用自定義限流器(閘道器使用程式碼實現)
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
    RouteLocatorBuilder.Builder routes = builder.routes();
    RouteLocatorBuilder.Builder serviceProvider = routes
        .route("accept",
               r -> r.method(HttpMethod.GET)
               .and()
               .path("/gateway-accept/**")
               .and()
               .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
               //.and()
               //.readBody(String.class, readBody -> true)
               .filters(f -> {
                   f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}");
                   f.requestRateLimiter(config -> config.setKeyResolver(new GenericAccessResolver()).setRateLimiter(redisRateLimiter()));                                   
                   return f;
               })
               .uri("http://localhost:8888")
              );

    return serviceProvider.build();
}
    • 測試

      • jmeter配置

      • 結果

  • 其他

    如果有多個路由,使用不同的限流策略,可以自定義KeyResolver和RedisRateLimiter, 在路由定義時加入

//基於ip限流
public class OtherAccessResolver implements KeyResolver {
    @Override
    public Mono<String> resolve(ServerWebExchange exchange) {
        return Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
    }
}
RedisRateLimiter otherRedisRateLimiter() {
 //1000,1500對應replenishRate、burstCapacity
 return new RedisRateLimiter(100, 500);
}
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
 RouteLocatorBuilder.Builder routes = builder.routes();
 RouteLocatorBuilder.Builder serviceProvider = routes
  .route("accept",
   r -> r.method(HttpMethod.GET)
    .and()
    .path("/gateway-accept/**")
    .and()
    .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
    .filters(f -> {
     f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}");
     f.requestRateLimiter(
      config -> config.setKeyResolver(new GenericAccessResolver())
       .setRateLimiter(redisRateLimiter()));
     f.hystrix(config -> config.setName("accept")
      .setFallbackUri("forward:/gateway-fallback")
      .setSetter(getSetter()));
     return f;
    })
    .uri("http://localhost:8888"))
        .route("sign",
   r -> r.method(HttpMethod.POST)
    .and()
    .path("/gateway-sign/**")
    .and()
    .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
    .filters(f -> {
     f.rewritePath("/gateway-sign/(?<path>.*)", "/${path}");
     f.requestRateLimiter(
      config -> config.setKeyResolver(new OtherAccessResolver())
       .setRateLimiter(otherRedisRateLimiter()));
     f.hystrix(config -> config.setName("sign")
      .setFallbackUri("forward:/gateway-fallback")
      .setSetter(getSetter()));
     return f;
    })
    .uri("http://localhost:7777")
     );

 return serviceProvider.build();
}

 

熔斷策略

熔斷策略主要是執行緒配置和熔斷配置,上面已經說明很清楚了。在上篇文章中,為了解決閘道器呼叫後臺服務Connection prematurely closed BEFORE response的問題,要設定後臺服務執行緒的空閒時間和閘道器執行緒池執行緒的空閒時間,並讓閘道器執行緒池執行緒的空閒時間小於後臺服務的空閒時間

配置方法

spring:
  cloud:
    gateway:
      httpclient:
        pool:
            max-connections: 500
            max-idle-time: 10000

 

編碼實現

翻閱Spring Cloud Gateway英文資料,知道路由提供一個metadata方法,可以設定路由的元資料(https://docs.spring.io/spring-cloud-gateway/docs/2.2.6.RELEASE/reference/html/#route-metadata-configuration),這些元資料在RouteMetadataUtils中定義:

package org.springframework.cloud.gateway.support;

public final class RouteMetadataUtils {
    public static final String RESPONSE_TIMEOUT_ATTR = "response-timeout";
    public static final String CONNECT_TIMEOUT_ATTR = "connect-timeout";

    private RouteMetadataUtils() {
        throw new AssertionError("Must not instantiate utility class.");
    }
}

 

其中沒有我要的執行緒數量(max-connection)和空閒時間(max-idle-time)的設定,沒有關係,自己加上去:

@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        RouteLocatorBuilder.Builder routes = builder.routes();
        RouteLocatorBuilder.Builder serviceProvider = routes
           .route("accept",
               r -> r.method(HttpMethod.GET)
                     .and()
                     .path("/gateway-accept/**")
                     .and()
                     .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
                     .filters(f -> {
                          f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}");
                          f.requestRateLimiter(
                             config -> config.setKeyResolver(new GenericAccessResolver())
                                           .setRateLimiter(redisRateLimiter()));
                          f.hystrix(config -> config.setName("accept")
                                      .setFallbackUri("forward:/gateway-fallback")
                                      .setSetter(getSetter()));
                                return f;
                          })
                     .uri("http://localhost:8888")
                     .metadata("max-idle-time", 10000)  //閘道器呼叫後臺執行緒空閒時間設定
                     .metadata("max-connections", 200)  //閘道器呼叫後臺服務執行緒數量設定
          );

     return serviceProvider.build();
}

 

測試果然和yml配置一樣有效果。

降級方法

降級方法本身沒有什麼特別,有一個問題需要注意,呼叫降級方法也是使用執行緒池的,預設在HystrixThreadPoolProperties中定義:

public abstract class HystrixThreadPoolProperties {

    /* defaults */
    static int default_coreSize = 10;            // core size of thread pool
    static int default_maximumSize = 10;         // maximum size of thread pool
    static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive
    static int default_maxQueueSize = -1;        // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
                                                 // -1 turns it off and makes us use SynchronousQueue

 

錯誤

如果上面的限流設定比較大,比如1000,最大突發2000,閘道器呼叫後臺服務發生熔斷降級, 熔斷後降級的方法呼叫太頻繁,10個執行緒不夠用,會導致以下500錯誤:

2021-02-01 14:29:45.076 ERROR 64868 --- [ioEventLoop-5-1] a.w.r.e.AbstractErrorWebExceptionHandler : [a0ed6911-18982]  500 Server Error for HTTP GET "/gateway-accept/test"

com.netflix.hystrix.exception.HystrixRuntimeException: command-accept fallback execution rejected.
 at com.netflix.hystrix.AbstractCommand.handleFallbackRejectionByEmittingError(AbstractCommand.java:1043) ~[hystrix-core-1.5.18.jar:1.5.18]
 Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
 |_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
 |_ checkpoint ⇢ HTTP GET "/gateway-accept/test" [ExceptionHandlingWebHandler]
com.netflix.hystrix.exception.HystrixRuntimeException: command-accept fallback execution rejected.
 at com.netflix.hystrix.AbstractCommand.handleFallbackRejectionByEmittingError(AbstractCommand.java:1043) ~[hystrix-core-1.5.18.jar:1.5.18]
 Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
 |_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
 |_ checkpoint ⇢ HTTP GET "/gateway-accept/test" [ExceptionHandlingWebHandler]

配置方法

所以要在yml中設定合適的呼叫降級方法的執行緒池, 合理的配置能夠杜絕閘道器500錯誤的發生。

hystrix:
  threadpool:
    group-accept:  #程式碼裡面設定的HystrixCommandGroupKey.Factory.asKey("group-accept")
      coreSize: 50 #併發執行的最大執行緒數,預設10
      maxQueueSize: 1500 #BlockingQueue的最大佇列數
      #即使maxQueueSize沒有達到,達到queueSizeRejectionThreshold該值後,請求也會被拒絕
      queueSizeRejectionThreshold: 1400                                   

閘道器異常截獲

上面的異常後,沒有捕獲異常直接返回前端500錯誤,一般情況下需要返回一個統一介面,比如:

@Data
@ToString
@EqualsAndHashCode
@Accessors(chain = true)
public class Result<T> implements Serializable {
    private Integer code;
    private String message;
    private T data;
    private String sign;

    public static final String SUCCESS = "成功";
    public static final String FAILURE = "失敗";


    public Result(int code, String message) {
        this.code = code;
        this.message = message;
    }

    public Result(int code, String message, T data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }
    public Result(int code, String message, T data, String sign) {
        this.code = code;
        this.message = message;
        this.data = data;
        this.sign = sign;
    }

    public static Result<Object> success() {
        return new Result<Object>(200, SUCCESS);
    }
    public static Result<Object> success(Object data) {
        return new Result<Object>(200, SUCCESS, data);
    }
    public static Result<Object> success(Object data, String sign) {
        return new Result<Object>(200, SUCCESS, data, sign);
    }
    public static Result<Object> failure() {
        return new Result<Object>(400, FAILURE);
    }
    public static Result<Object> failure(Object data) {
        return new Result<Object>(400, FAILURE, data);
    }
    public static Result<Object> failure(Object data, String sign) {
        return new Result<Object>(400, FAILURE, data, sign);
    }


}

 

建立GlobalExceptionConfiguration 實現ErrorWebExceptionHandler(這一段是來者網友提供的)

@Slf4j
@Order(-1)
@Component
@RequiredArgsConstructor
public class GlobalExceptionConfiguration implements ErrorWebExceptionHandler {
    private final ObjectMapper objectMapper;

    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();

        if (response.isCommitted()) {
            return Mono.error(ex);
        }

        response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);
        if (ex instanceof ResponseStatusException) {
            response.setStatusCode(((ResponseStatusException) ex).getStatus());
        }

        return response
                .writeWith(Mono.fromSupplier(() -> {
                    DataBufferFactory bufferFactory = response.bufferFactory();
                    try {
                        return bufferFactory.wrap(objectMapper.writeValueAsBytes(Result.failure(ex.getMessage())));
                    } catch (JsonProcessingException e) {
                        log.warn("Error writing response", ex);
                        return bufferFactory.wrap(new byte[0]);
                    }
                }));
    }
}

 

這樣,就會把閘道器異常統一包裝在介面中返回:如:

後臺日誌已經沒有之前的錯誤日誌了。

編碼實現,沒找到

由於Spring Cloud Gateway 中的 Hystrix採用的是HystrixObservableCommand.Setter, 沒有采用 HystrixCommand.Setter, 在 HystrixCommand.Setter中是可以編碼實現執行緒池配置的, 但是在HystrixObservableCommand.Setter沒有提供:

final public static class Setter {
        protected final HystrixCommandGroupKey groupKey;
        protected HystrixCommandKey commandKey;
        protected HystrixThreadPoolKey threadPoolKey;  //有屬性但是沒有set方法
        protected HystrixCommandProperties.Setter commandPropertiesDefaults;
        protected HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults; //有屬性沒有set方法

        protected Setter(HystrixCommandGroupKey groupKey) {
            this.groupKey = groupKey;
            // default to using SEMAPHORE for ObservableCommand
            commandPropertiesDefaults = setDefaults(HystrixCommandProperties.Setter());
        }

        public static Setter withGroupKey(HystrixCommandGroupKey groupKey) {
            return new Setter(groupKey);
        }

        public Setter andCommandKey(HystrixCommandKey commandKey) {
            this.commandKey = commandKey;
            return this;
        }

        public Setter andCommandPropertiesDefaults(HystrixCommandProperties.Setter commandPropertiesDefaults) {
            this.commandPropertiesDefaults = setDefaults(commandPropertiesDefaults);
            return this;
        }

        private HystrixCommandProperties.Setter setDefaults(HystrixCommandProperties.Setter commandPropertiesDefaults) {
            if (commandPropertiesDefaults.getExecutionIsolationStrategy() == null) {
                // default to using SEMAPHORE for ObservableCommand if the user didn't set it
                commandPropertiesDefaults.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE);
            }
            return commandPropertiesDefaults;
        }
    }

 

由於本人水平有限,沒有找到Setter中設定HystrixThreadPoolKey和HystrixThreadPoolProperties.Setter的方法,所以只能在yml中配置。有知道的同學告訴我一聲,不勝感激。

總結

所以在Spring Cloud Gateway閘道器的配置中,需要綜合考慮限流大小、閘道器呼叫後臺連線池設定大小、後臺服務的連線池以及空閒時間,包括閘道器呼叫降級方法的執行緒池配置,都需要在壓測中調整到一個合理的配置,才能發揮最大的功效。

本人水平有限,跟深入的研究還在繼續,如果文章有表達錯誤或者不周,請大家指正,謝