1. 程式人生 > >Guava RateLimiter限流原始碼解析和例項應用

Guava RateLimiter限流原始碼解析和例項應用

在開發高併發系統時有三把利器用來保護系統:快取、降級和限流

  • 快取 快取的目的是提升系統訪問速度和增大系統處理容量
  • 降級 降級是當服務出現問題或者影響到核心流程時,需要暫時遮蔽掉,待高峰或者問題解決後再開啟
  • 限流 限流的目的是通過對併發訪問/請求進行限速,或者對一個時間視窗內的請求進行限速來保護系統,一旦達到限制速率則可以拒絕服務、排隊或等待、降級等處理

常用的限流演算法

漏桶演算法

漏桶演算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水,當水流入速度過大會直接溢位,可以看出漏桶演算法能強行限制資料的傳輸速率。

 

令牌桶演算法

對於很多應用場景來說,除了要求能夠限制資料的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶演算法可能就不合適了,令牌桶演算法更為適合。如圖所示,令牌桶演算法的原理是系統會以一個恆定的速度往桶裡放入令牌,而如果請求需要被處理,則需要先從桶裡獲取一個令牌,當桶裡沒有令牌可取時,則拒絕服務。

 

RateLimiter使用以及原始碼解析

Google開源工具包Guava提供了限流工具類RateLimiter,該類基於令牌桶演算法實現流量限制,使用十分方便,而且十分高效。

Guava有兩種限流模式,一種為穩定模式(SmoothBursty:令牌生成速度恆定),一種為漸進模式(SmoothWarmingUp:令牌生成速度緩慢提升直到維持在一個穩定值) 兩種模式實現思路類似,主要區別在等待時間的計算上,本篇重點介紹SmoothBursty

public static RateLimiter create(double permitsPerSecond) {
  /*
* 預設的RateLimiter配置可以儲存最多一秒鐘的未使用許可證 */ return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond); }

RateLimiter是一個抽象類,SmoothBursty是其子類SmoothRateLimiter的子類,其兩個構造引數含義如下

  • SleepingStopwatch:guava中的一個時鐘類例項,會通過這個來計算時間及令牌
  • maxBurstSeconds:官方解釋,在ReteLimiter未使用時,最多儲存幾秒的令牌,預設是1
@VisibleForTesting
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
  //根據每秒向桶中放入令牌的數量來設定當前儲存令牌數
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}
public final void setRate(double permitsPerSecond) {
  //如果每秒向桶中放入令牌的數量(permitsPerSecond)大於0且為數字,通過檢查,否則丟擲引數異常
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  //對每個執行緒進行互斥,建立互斥物件的鎖定
  synchronized (mutex()) {
    //由各項引數更新當前儲存令牌數
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}
public static void checkArgument(boolean expression, @Nullable Object errorMessage) {
  if (!expression) {
    throw new IllegalArgumentException(String.valueOf(errorMessage));
  }
}
private volatile Object mutexDoNotUseDirectly; //執行緒安全的互斥物件
private Object mutex() {
  Object mutex = mutexDoNotUseDirectly;
  if (mutex == null) {
    synchronized (this) {
      mutex = mutexDoNotUseDirectly;
      if (mutex == null) {
        mutexDoNotUseDirectly = mutex = new Object();
      }
    }
  }
  return mutex;
}

在SmoothBursty中

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
  //若當前時間晚於nextFreeTicketMicros,則計算該段時間內可以生成多少令牌,將生成的令牌加入令牌桶中並更新資料 
  resync(nowMicros);
  //更新新增1個令牌的時間間隔(單位微妙)為1000000微妙(1秒)除以每秒放入令牌桶中的數量
  double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
  this.stableIntervalMicros = stableIntervalMicros;
  //將令牌桶中可以儲存令牌的時間引數加上更新當前可以儲存的令牌數
  doSetRate(permitsPerSecond, stableIntervalMicros);
}
private long nextFreeTicketMicros = 0L; //下一次請求可以獲取令牌的起始時間
double storedPermits; //當前儲存令牌數
double maxPermits; //最大儲存令牌數 = maxBurstSeconds * stableIntervalMicros
double stableIntervalMicros; //新增令牌時間間隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌數)
final double maxBurstSeconds; //在RateLimiter未使用時,最多儲存幾秒的令牌
private void resync(long nowMicros) {
  //如果當前時間大於下一次請求可以獲取令牌的起始時間
  if (nowMicros > nextFreeTicketMicros) {
    //比較最大儲存令牌數和當前儲存的令牌數加上現在要增加的令牌數的大小,小的那個賦給當年儲存令牌數,即增加令牌數與當前令牌數之和不能大於最大令牌數
    storedPermits = min(maxPermits,
        storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
    //將當前時間賦給下一次請求可以獲取的起始時間
    nextFreeTicketMicros = nowMicros;
  }
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  //將最大儲存令牌數存入臨時副本
  double oldMaxPermits = this.maxPermits;
  //更新最大儲存令牌數為存放令牌的秒數乘以每秒向桶中放入的令牌數
  maxPermits = maxBurstSeconds * permitsPerSecond;
  //如果最大儲存令牌數的臨時副本為正無窮大
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    //更新當前儲存令牌數為最大儲存令牌數
    storedPermits = maxPermits;
  } else { //如果最大儲存令牌數的臨時副本不為正無窮大
    //如果最大儲存令牌數的臨時副本為0,則更新當前儲存令牌數為0,否則
    //更新當前儲存令牌數為當前儲存令牌數乘以最大儲存令牌數除以最大儲存令牌數的臨時副本數
    storedPermits = (oldMaxPermits == 0.0)
        ? 0.0 // initial state
        : storedPermits * maxPermits / oldMaxPermits;
  }
}

我們再來看一下RateLimiter的tryAcquire方法

public boolean tryAcquire(long timeout, TimeUnit unit) {
  //嘗試在timeout時間內獲取令牌,如果可以則掛起(睡眠)等待相應時間並返回true,否則立即返回false 
  return tryAcquire(1, timeout, unit);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  //取等待時間的微妙數與0比較取大值賦給超時時間
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  //如果檢查時間>0,通過檢查,此處為1
  checkPermits(permits);
  long microsToWait;
  //建立互斥物件加鎖互斥
  synchronized (mutex()) {
    //獲取當前時間
    long nowMicros = stopwatch.readMicros();
    //如果下一次請求可以獲取令牌的起始時間減去等待時間大於當前時間
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false; //返回false
    } else { //如果下一次請求可以獲取令牌的起始時間減去等待時間小於等於當前時間
      //獲取下一次請求可以獲取令牌的起始時間減去當前時間的值與0之間的大值並重新整理各引數(下一次請求可以獲取令牌的起始時間、當前儲存令牌數)
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  //執行緒休眠microsToWait時間
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  //返回true
  return true;
}
private static int checkPermits(int permits) {
  checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
  return permits;
}
final Stopwatch stopwatch = Stopwatch.createStarted();
@Override
long readMicros() {
  return stopwatch.elapsed(MICROSECONDS);
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
  //返回下一次請求可以獲取令牌的起始時間減去等待時間是否小於等於當前時間
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
  //獲取下一次請求可以獲取令牌的起始時間並更新各引數(下一次請求可以獲取令牌的起始時間、當前儲存令牌數long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  //返回下一次請求可以獲取令牌的起始時間減去當前時間的值與0之間的大值
  return max(momentAvailable - nowMicros, 0);
}
@Override
void sleepMicrosUninterruptibly(long micros) {
  if (micros > 0) {
    Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
  }
}

在SmoothBursty中

@Override
final long queryEarliestAvailable(long nowMicros) {
  //返回下一次請求可以獲取令牌的起始時間
  return nextFreeTicketMicros;
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  //若當前時間晚於nextFreeTicketMicros,則計算該段時間內可以生成多少令牌,將生成的令牌加入令牌桶中並更新資料
  resync(nowMicros);
  //獲取下一次請求可以獲取令牌的起始時間
  long returnValue = nextFreeTicketMicros;
  //在允許的請求數(這裡為1)和當前儲存令牌數間取小值賦給允許消費的儲存令牌數(storedPermitsToSpend)
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  //將允許的請求數減去允許消費的儲存令牌數賦給允許重新整理數(freshPermits)
  double freshPermits = requiredPermits - storedPermitsToSpend;
  //將允許重新整理數乘以新增令牌時間間隔賦給等待微妙數(waitMicros)
  long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
      + (long) (freshPermits * stableIntervalMicros);
  //更新下一次請求可以獲取令牌的起始時間為下一次請求可以獲取令牌的起始時間加上等待微妙數
  this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
  //更新當前儲存令牌數為當前儲存令牌數減去允許消費的儲存令牌數
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0L;
}

在Uninterruptibles中

public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
  //定義是否已中斷為false
  boolean interrupted = false;
  try {
    //將下一次請求可以獲取令牌的起始時間減去當前時間的值轉化為納秒定義為remainingNanos
    long remainingNanos = unit.toNanos(sleepFor);
    //將系統的納秒值加上該轉化值為end
    long end = System.nanoTime() + remainingNanos;
    while (true) {
      try {
        //執行緒休眠remainingNanos時間
        NANOSECONDS.sleep(remainingNanos);
        return;
      } catch (InterruptedException e) {
        //如果發生中斷異常,將是否已中斷更新為true
        interrupted = true;
        //更新remainingNanos為end減去系統的納秒值,並進入下一輪迴圈
        remainingNanos = end - System.nanoTime();
      }
    }
  } finally {
    //如果發生中斷異常
    if (interrupted) {
      //當前執行緒中斷
      Thread.currentThread().interrupt();
    }
  }
}

原始碼分析就是這些了,現在我們來看一下Guava RateLimiter的應用,在APO中攔截Controller,並進行限流

在pom中新增

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>18.0</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

標籤

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LxRateLimit {
    /**
     *
     * @return
     */
    String value() default "";

    /**
     * 每秒向桶中放入令牌的數量   預設最大即不做限流
     * @return
     */
    double perSecond() default Double.MAX_VALUE;

    /**
     * 獲取令牌的等待時間  預設0
     * @return
     */
    int timeOut() default 0;

    /**
     * 超時時間單位
     * @return
     */
    TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS;
}

AOP類

@Slf4j
@Aspect
@Component
public class LxRateLimitAspect {
    private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);

    /**
     * 帶有指定註解切入
     */
    @ResponseBody
    @Around(value = "@annotation(com.guanjian.annotation.LxRateLimit)")
    public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable {
        log.info("攔截到了{}方法...", pjp.getSignature().getName());
        Signature signature = pjp.getSignature();
        MethodSignature methodSignature = (MethodSignature)signature;
        //獲取目標方法
        Method targetMethod = methodSignature.getMethod();
        if (targetMethod.isAnnotationPresent(LxRateLimit.class)) {
            //獲取目標方法的@LxRateLimit註解
            LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class);
            rateLimiter.setRate(lxRateLimit.perSecond());
            if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit()))
                return "伺服器繁忙,請稍後再試!";
        }
        return pjp.proceed();
    }
}

Controller

@RestController
public class AnnotationTestController {
    @GetMapping("/testannotation")
    @LxRateLimit(perSecond = 2000.0, timeOut = 500) //此處限速為2000qps
    public String testAnnotation() {
        return "get token success";
    }
}

我們先在Controller中將@LxRateLimit(perSecond = 2000.0, timeOut = 500)註釋掉

執行Jmeter進行壓測

我們啟用500執行緒壓測

壓測結果

吞吐量為7867.8qps,此時是不限速的

現在我們恢復Controller中的@LxRateLimit(perSecond = 2000.0, timeOut = 500)

吞吐量為2067.7qps

系統日誌可以看到大量的攔截

2019-05-26 21:24:33.370  INFO 11092 --- [o-8080-exec-176] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.370  INFO 11092 --- [io-8080-exec-27] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.374  INFO 11092 --- [o-8080-exec-128] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.374  INFO 11092 --- [o-8080-exec-191] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.374  INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.377  INFO 11092 --- [io-8080-exec-36] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.379  INFO 11092 --- [o-8080-exec-123] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.379  INFO 11092 --- [io-8080-exec-61] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.380  INFO 11092 --- [io-8080-exec-19] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.382  INFO 11092 --- [io-8080-exec-77] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnotation方法...
2019-05-26 21:24:33.384  INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect       : 攔截到了testAnnota