1. 程式人生 > >Spring Cloud Gateway 自定義限流

Spring Cloud Gateway 自定義限流

在使用Spring Cloud Gateway限流功能時官網提供的限流中的流速以及桶容量是針對所有策略的,意思是隻要配置上那麼所有的都是一樣的,不能根據不同的型別配置不同的引數,例如:A渠道、B渠道,若配置上replenishRate(流速)和burstCapacity(令牌桶容量),那麼不管是A渠道還是B渠道都是這個值,如果修改那麼對應的其他渠道也會修改,如何能做到分為不同渠道進行限流呢,A渠道replenishRate:10,burstCapacity:100,B渠道:replenishRate:20,burstCapacity:1000,下面開始分析:
限流方式採用的redis,底層使用的redis的lua指令碼實現的,具體可以自行查閱,不做講解,預設限流類:RedisRateLimiter,本文也是仿照這個進行重寫的。
本文是參考“

Spring Cloud Gateway 結合配置中心限流”進行簡化改造,通過配置的方式進行實現。

引入依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

自定義限流類

參照RedisRateLimiter進行自定義限流類SystemRedisRateLimiter用於渠道限流方式,實現程式碼如下:

/**
 * @author : Erick
 * @version : 1.0
 * @Description :
 * @time :2018-12-1
 */
public class SystemRedisRateLimiter extends AbstractRateLimiter<SystemRedisRateLimiter.Config> implements ApplicationContextAware {
	//這些變數全部從RedisRateLimiter複製的,都會用到。
    public static final String REPLENISH_RATE_KEY =
"replenishRate"; public static final String BURST_CAPACITY_KEY = "burstCapacity"; public static final String CONFIGURATION_PROPERTY_NAME = "sys-redis-rate-limiter"; public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript"; public static final String REMAINING_HEADER = "X-RateLimit-Remaining"; public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate"; public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity"; //處理速度 private static final String DEFAULT_REPLENISHRATE="default.replenishRate"; //容量 private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity"; private ReactiveRedisTemplate<String, String> redisTemplate; private RedisScript<List<Long>> script; private AtomicBoolean initialized = new AtomicBoolean(false); private String remainingHeader = REMAINING_HEADER; /** The name of the header that returns the replenish rate configuration. */ private String replenishRateHeader = REPLENISH_RATE_HEADER; /** The name of the header that returns the burst capacity configuration. */ private String burstCapacityHeader = BURST_CAPACITY_HEADER; private Config defaultConfig; public SystemRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script, Validator validator) { super(Config.class , CONFIGURATION_PROPERTY_NAME , validator); this.redisTemplate = redisTemplate; this.script = script; initialized.compareAndSet(false,true); } public SystemRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity){ super(Config.class , CONFIGURATION_PROPERTY_NAME , null); defaultConfig = new Config() .setReplenishRate(defaultReplenishRate) .setBurstCapacity(defaultBurstCapacity); } //具體限流實現,此處呼叫的是lua指令碼 @Override public Mono<RateLimiter.Response> isAllowed(String routeId, String id) { if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } if (ObjectUtils.isEmpty(rateLimiterConf) ){ throw new IllegalArgumentException("No Configuration found for route " + routeId); } //獲取的是自定義的map Map<String , Integer> rateLimitMap = rateLimiterConf.getRateLimitMap(); //快取的key String replenishRateKey = routeId + "." + id + "." + REPLENISH_RATE_KEY; //若map中不存在則採用預設值,存在則取值。 int replenishRate = ObjectUtils.isEmpty(rateLimitMap.get(replenishRateKey)) ? rateLimitMap.get(DEFAULT_REPLENISHRATE) : rateLimitMap.get(replenishRateKey); //容量key String burstCapacityKey = routeId + "." + id + "." + BURST_CAPACITY_KEY; //若map中不存在則採用預設值,存在則取值。 int burstCapacity = ObjectUtils.isEmpty(rateLimitMap.get(burstCapacityKey)) ? rateLimitMap.get(DEFAULT_BURSTCAPACITY) : rateLimitMap.get(burstCapacityKey); try { List<String> keys = getKeys(id); List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }) .map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); RateLimiter.Response response = new RateLimiter.Response(allowed, getHeaders(replenishRate , burstCapacity , tokensLeft)); return response; }); } catch (Exception e) { e.printStackTrace(); } return Mono.just(new RateLimiter.Response(true, getHeaders(replenishRate , burstCapacity , -1L))); } private RateLimiterConf rateLimiterConf; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.rateLimiterConf = applicationContext.getBean(RateLimiterConf.class); } public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity , Long tokensLeft) { HashMap<String, String> headers = new HashMap<>(); headers.put(this.remainingHeader, tokensLeft.toString()); headers.put(this.replenishRateHeader, String.valueOf(replenishRate)); headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity)); return headers; } static List<String> getKeys(String id) { // use `{}` around keys to use Redis Key hash tags // this allows for using redis cluster // Make a unique key per user. //此處可以自定義redis字首資訊 String prefix = "request_sys_rate_limiter.{" + id; // You need two Redis keys for Token Bucket. String tokenKey = prefix + "}.tokens"; String timestampKey = prefix + "}.timestamp"; return Arrays.asList(tokenKey, timestampKey); } @Validated public static class Config{ @Min(1) private int replenishRate; @Min(1) private int burstCapacity = 1; public int getReplenishRate() { return replenishRate; } public Config setReplenishRate(int replenishRate) { this.replenishRate = replenishRate; return this; } public int getBurstCapacity() { return burstCapacity; } public Config setBurstCapacity(int burstCapacity) { this.burstCapacity = burstCapacity; return this; } @Override public String toString() { return "Config{" + "replenishRate=" + replenishRate + ", burstCapacity=" + burstCapacity + '}'; } } }

在繼承AbstractRateLimiter泛型使用的是自定義類中的config:SystemRedisRateLimiter.Config

配置類

配置類主要用於初始化map引數。

/**
 * @author : Erick
 * @version : 1.0
 * @Description :
 * @time :2018-12-1
 */
@Component
//使用配置檔案的方式進行初始化
@ConfigurationProperties(prefix = "ratelimiter-conf")
public class RateLimiterConf {
    //處理速度
    private static final String DEFAULT_REPLENISHRATE="default.replenishRate";
    //容量
    private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity";

    private Map<String , Integer> rateLimitMap = new ConcurrentHashMap<String , Integer>(){
        {
            put(DEFAULT_REPLENISHRATE , 10);
            put(DEFAULT_BURSTCAPACITY , 100);
        }
    };

    public Map<String, Integer> getRateLimitMap() {
        return rateLimitMap;
    }

    public void setRateLimitMap(Map<String, Integer> rateLimitMap) {
        this.rateLimitMap = rateLimitMap;
    }
}

配置檔案主要採用配置檔案的方式進行初始化,若配置則進行新增,若沒有配置則採用預設值。

配置檔案

主要用於配置各個渠道的限流閥值,例如文章開頭舉例的A渠道和B渠道,配置如下:

//與配置類RateLimiterConf保持一致
ratelimiter-conf:
  #配置限流引數與RateLimiterConf類對映
  rateLimitMap:
    #格式為:routeid(gateway配置routes時指定的).系統名稱.replenishRate(流速)/burstCapacity令牌桶大小
    service.A.replenishRate: 10
    service.A.burstCapacity: 100
    service.B.replenishRate: 20
    service.B.burstCapacity: 1000

到此配置限流相關的程式碼已經完成,需要在啟動類和bootstrap.yml中進行配置才能夠真正的使用。

啟動類中聲名

在啟動類中宣告使用的策略,指定自定義限流類。

@Bean
 KeyResolver sysKeyResolver(){
 	  //從請求地址中擷取sys值,進行限流。
     return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("sys"));
 }

@Bean
@Primary
//使用自己定義的限流類
SystemRedisRateLimiter systemRedisRateLimiter(
        ReactiveRedisTemplate<String, String> redisTemplate,
        @Qualifier(SystemRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> script,
        Validator validator){
    return new SystemRedisRateLimiter(redisTemplate , script , validator);
}

採用的是從請求地址中獲取sys引數對應的值,當然也可以設定為其他值,再聲名自定義的限流類。

配置限流

spring:
  cloud:
    gateway:
      routes:
        - id: service //id名稱需要與配置限流的保持一致
          uri: lb://serviceA
          predicates:
            - Path=/service/**/ //最後的/可以去掉,在本文中特意新增的如果去掉會把filters當成註釋內容。
          filters:
            - name: RequestRateLimiter
              args:
                //需要與上邊的方法名保持一致
                rate-limiter: "#{@systemRedisRateLimiter}"
                //需要與策略類的方法名保持一致。
                key-resolver: "#{@sysKeyResolver}"

至此自定義限流程式碼全部完成,如有什麼不妥支援請指正,同時也希望對其他人有幫助。例項程式碼已上傳到碼雲可以點選檢視。


在這裡插入圖片描述