1. 程式人生 > >限流演算法之漏桶演算法、令牌桶演算法

限流演算法之漏桶演算法、令牌桶演算法

昨天CodeReview的時候看到同時使用RateLimiter這個類用作QPS訪問限制.學習一下這個類.

RateLimiter是Guava的concurrent包下的一個用於限制訪問頻率的類.

1.限流

每個API介面都是有訪問上限的,當訪問頻率或者併發量超過其承受範圍時候,我們就必須考慮限流來保證介面的可用性或者降級可用性.即介面也需要安裝上保險絲,以防止非預期的請求對系統壓力過大而引起的系統癱瘓.

通常的策略就是拒絕多餘的訪問,或者讓多餘的訪問排隊等待服務,或者引流.

如果要準確的控制QPS,簡單的做法是維護一個單位時間內的Counter,如判斷單位時間已經過去,則將Counter重置零.此做法被認為沒有很好的處理單位時間的邊界,比如在前一秒的最後一毫秒裡和下一秒的第一毫秒都觸發了最大的請求數,將目光移動一下,就看到在兩毫秒內發生了兩倍的QPS.

2.限流演算法

常用的更平滑的限流演算法有兩種:漏桶演算法和令牌桶演算法.

2.1 漏桶演算法

漏桶(Leaky Bucket)演算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水(介面有響應速率),當水流入速度過大會直接溢位(訪問頻率超過介面響應速率),然後就拒絕請求,可以看出漏桶演算法能強行限制資料的傳輸速率.示意圖如下:

可見這裡有兩個變數,一個是桶的大小,支援流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate),虛擬碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  1. double rate; // leak rate in calls/s

  2. double burst; // bucket size in calls

  3. long refreshTime; // time for last water refresh

  4. double water; // water count at refreshTime

  5. refreshWater() {

  6. long now = getTimeOfDay();

  7. //水隨著時間流逝,不斷流走,最多就流乾到0.

  8. water = max(0, water- (now - refreshTime)*rate);

  9. refreshTime = now;

  10. }

  11. bool permissionGranted() {

  12. refreshWater();

  13. if (water < burst) { // 水桶還沒滿,繼續加1

  14. water ++;

  15. return true;

  16. } else {

  17. return false;

  18. }

  19. }

因為漏桶的漏出速率是固定的引數,所以,即使網路中不存在資源衝突(沒有發生擁塞),漏桶演算法也不能使流突發(burst)到埠速率.因此,漏桶演算法對於存在突發特性的流量來說缺乏效率.

2.2 令牌桶演算法

令牌桶演算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的演算法,更加容易理解.隨著時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶裡加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.

令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種演算法則實時的計算應該增加的令牌的數量.

3.RateLimiter簡介

Google開源工具包Guava提供了限流工具類RateLimiter,該類基於令牌桶演算法(Token Bucket)來完成限流,非常易於使用.RateLimiter經常用於限制對一些物理資源或者邏輯資源的訪問速率.它支援兩種獲取permits介面,一種是如果拿不到立刻返回false,一種會阻塞等待一段時間看能不能拿到.

RateLimiter和Java中的訊號量(java.util.concurrent.Semaphore)類似,Semaphore通常用於限制併發量.

原始碼註釋中的一個例子,比如我們有很多工需要執行,但是我們不希望每秒超過兩個任務執行,那麼我們就可以使用RateLimiter:

1
2
3
4
5
6
7
  1. final RateLimiter rateLimiter = RateLimiter.create(2.0);

  2. void submitTasks(List<Runnable> tasks, Executor executor) {

  3. for (Runnable task : tasks) {

  4. rateLimiter.acquire(); // may wait

  5. executor.execute(task);

  6. }

  7. }

另外一個例子,假如我們會產生一個數據流,然後我們想以每秒5kb的速度傳送出去.我們可以每獲取一個令牌(permit)就傳送一個byte的資料,這樣我們就可以通過一個每秒5000個令牌的RateLimiter來實現:

1
2
3
4
5
  1. final RateLimiter rateLimiter = RateLimiter.create(5000.0);

  2. void submitPacket(byte[] packet) {

  3. rateLimiter.acquire(packet.length);

  4. networkService.send(packet);

  5. }

另外,我們也可以使用非阻塞的形式達到降級執行的目的,即使用非阻塞的tryAcquire()方法:

1
2
3
4
5
  1. if(limiter.tryAcquire()) { //未請求到limiter則立即返回false

  2. doSomething();

  3. }else{

  4. doSomethingElse();

  5. }

4.RateLimiter主要介面

RateLimiter其實是一個abstract類,但是它提供了幾個static方法用於建立RateLimiter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  1. /**

  2. * 建立一個穩定輸出令牌的RateLimiter,保證了平均每秒不超過permitsPerSecond個請求

  3. * 當請求到來的速度超過了permitsPerSecond,保證每秒只處理permitsPerSecond個請求

  4. * 當這個RateLimiter使用不足(即請求到來速度小於permitsPerSecond),會囤積最多permitsPerSecond個請求

  5. */

  6. public static RateLimiter create(double permitsPerSecond);

  7. /**

  8. * 建立一個穩定輸出令牌的RateLimiter,保證了平均每秒不超過permitsPerSecond個請求

  9. * 還包含一個熱身期(warmup period),熱身期內,RateLimiter會平滑的將其釋放令牌的速率加大,直到起達到最大速率

  10. * 同樣,如果RateLimiter在熱身期沒有足夠的請求(unused),則起速率會逐漸降低到冷卻狀態

  11. *

  12. * 設計這個的意圖是為了滿足那種資源提供方需要熱身時間,而不是每次訪問都能提供穩定速率的服務的情況(比如帶快取服務,需要定期重新整理快取的)

  13. * 引數warmupPeriod和unit決定了其從冷卻狀態到達最大速率的時間

  14. */

  15. public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit);

提供了兩個獲取令牌的方法,不帶引數表示獲取一個令牌.如果沒有令牌則一直等待,返回等待的時間(單位為秒),沒有被限流則直接返回0.0:

1
2
3
  1. public double acquire();

  2. public double acquire(int permits);

嘗試獲取令牌,分為待超時時間和不帶超時時間兩種:

1
2
3
4
5
6
  1. public boolean tryAcquire();

  2. //嘗試獲取一個令牌,立即返回

  3. public boolean tryAcquire(int permits);

  4. public boolean tryAcquire(long timeout, TimeUnit unit);

  5. //嘗試獲取permits個令牌,帶超時時間

  6. public boolean tryAcquire(int permits, long timeout, TimeUnit unit);

5.RateLimiter設計

考慮一下RateLimiter是如何設計的,並且為什麼要這樣設計.

RateLimiter的主要功能就是提供一個穩定的速率,實現方式就是通過限制請求流入的速度,比如計算請求等待合適的時間閾值.

實現QPS速率的最簡單的方式就是記住上一次請求的最後授權時間,然後保證1/QPS秒內不允許請求進入.比如QPS=5,如果我們保證最後一個被授權請求之後的200ms的時間內沒有請求被授權,那麼我們就達到了預期的速率.如果一個請求現在過來但是最後一個被授權請求是在100ms之前,那麼我們就要求當前這個請求等待100ms.按照這個思路,請求15個新令牌(許可證)就需要3秒.

有一點很重要:上面這個設計思路的RateLimiter記憶非常的淺,它的腦容量非常的小,只記得上一次被授權的請求的時間.如果RateLimiter的一個被授權請求q之前很長一段時間沒有被使用會怎麼樣?這個RateLimiter會立馬忘記過去這一段時間的利用不足,而只記得剛剛的請求q.

過去一段時間的利用不足意味著有過剩的資源是可以利用的.這種情況下,RateLimiter應該加把勁(speed up for a while)將這些過剩的資源利用起來.比如在向網路中發生資料的場景(限流),過去一段時間的利用不足可能意味著網絡卡緩衝區是空的,這種場景下,我們是可以加速傳送來將這些過程的資源利用起來.

另一方面,過去一段時間的利用不足可能意味著處理請求的伺服器對即將到來的請求是準備不足的(less ready for future requests),比如因為很長一段時間沒有請求當前伺服器的cache是陳舊的,進而導致即將到來的請求會觸發一個昂貴的操作(比如重新重新整理全量的快取).

為了處理這種情況,RateLimiter中增加了一個維度的資訊,就是過去一段時間的利用不足(past underutilization),程式碼中使用storedPermits變量表示.當沒有利用不足這個變數為0,最大能達到maxStoredPermits(maxStoredPermits表示完全沒有利用).因此,請求的令牌可能從兩個地方來:

  1. 1.過去剩餘的令牌(stored permits, 可能沒有)

  2. 2.現有的令牌(fresh permits,當前這段時間還沒用完的令牌)

我們將通過一個例子來解釋它是如何工作的:

對一個每秒產生一個令牌的RateLimiter,每有一個沒有使用令牌的一秒,我們就將storedPermits加1,如果RateLimiter在10秒都沒有使用,則storedPermits變成10.0.這個時候,一個請求到來並請求三個令牌(acquire(3)),我們將從storedPermits中的令牌為其服務,storedPermits變為7.0.這個請求之後立馬又有一個請求到來並請求10個令牌,我們將從storedPermits剩餘的7個令牌給這個請求,剩下還需要三個令牌,我們將從RateLimiter新產生的令牌中獲取.我們已經知道,RateLimiter每秒新產生1個令牌,就是說上面這個請求還需要的3個請求就要求其等待3秒.

想象一個RateLimiter每秒產生一個令牌,現在完全沒有使用(處於初始狀態),限制一個昂貴的請求acquire(100)過來.如果我們選擇讓這個請求等待100秒再允許其執行,這顯然很荒謬.我們為什麼什麼也不做而只是傻傻的等待100秒,一個更好的做法是允許這個請求立即執行(和acquire(1)沒有區別),然後將隨後到來的請求推遲到正確的時間點.這種策略,我們允許這個昂貴的任務立即執行,並將隨後到來的請求推遲100秒.這種策略就是讓任務的執行和等待同時進行.

一個重要的結論:RateLimiter不會記最後一個請求,而是即下一個請求允許執行的時間.這也可以很直白的告訴我們到達下一個排程時間點的時間間隔.然後定一個一段時間未使用的Ratelimiter也很簡單:下一個排程時間點已經過去,這個時間點和現在時間的差就是Ratelimiter多久沒有被使用,我們會將這一段時間翻譯成storedPermits.所有,如果每秒鐘產生一個令牌(rate==1),並且正好每秒來一個請求,那麼storedPermits就不會增長.

6.RateLimiter主要原始碼

RateLimiter定義了兩個create函式用於構建不同形式的RateLimiter:

  1. 1.public static RateLimiter create(double permitsPerSecond)

  2. 用於建立SmoothBursty型別的RateLimiter

  3. 2.public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)

  4. 用於建立

原始碼下面以acquire為例子,分析一下RateLimiter如何實現限流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  1. public double acquire() {

  2. return acquire(1);

  3. }

  4. public double acquire(int permits) {

  5. long microsToWait = reserve(permits);

  6. stopwatch.sleepMicrosUninterruptibly(microsToWait);

  7. return 1.0 * microsToWait / SECONDS.toMicros(1L);

  8. }

  9. final long reserve(int permits) {

  10. checkPermits(permits);

  11. synchronized (mutex()) { //應對併發情況需要同步

  12. return reserveAndGetWaitLength(permits, stopwatch.readMicros());

  13. }

  14. }

  15. final long reserveAndGetWaitLength(int permits, long nowMicros) {

  16. long momentAvailable = reserveEarliestAvailable(permits, nowMicros);

  17. return max(momentAvailable - nowMicros, 0);

  18. }

下面方法來自RateLimiter的具體實現類SmoothRateLimiter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  1. final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {

  2. resync(nowMicros); //補充令牌

  3. long returnValue = nextFreeTicketMicros;

  4. //這次請求消耗的令牌數目

  5. double storedPermitsToSpend = min(requiredPermits, this.storedPermits);

  6. double freshPermits = requiredPermits - storedPermitsToSpend;

  7. long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)

  8. + (long) (freshPermits * stableIntervalMicros);

  9. this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;

  10. this.storedPermits -= storedPermitsToSpend;

  11. return returnValue;

  12. }

  13. private void resync(long nowMicros) {

  14. // if nextFreeTicket is in the past, resync to now

  15. if (nowMicros > nextFreeTicketMicros) {

  16. storedPermits = min(maxPermits,

  17. storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);

  18. nextFreeTicketMicros = nowMicros;

  19. }

  20. }

另外,對於storedPermits的使用,RateLimiter存在兩種策略,二者區別主要體現在使用storedPermits時候需要等待的時間。這個邏輯由storedPermitsToWaitTime函式實現:

1
2
3
4
5
6
7
8
9
  1. /**

  2. * Translates a specified portion of our currently stored permits which we want to

  3. * spend/acquire, into a throttling time. Conceptually, this evaluates the integral

  4. * of the underlying function we use, for the range of

  5. * [(storedPermits - permitsToTake), storedPermits].

  6. *

  7. * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}

  8. */

  9. abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

存在兩種策略就是為了應對我們上面講到的,存在資源使用不足大致分為兩種情況: (1).資源確實使用不足,這些剩餘的資源我們私海可以使用的; (2).提供資源的服務過去還沒準備好,比如服務剛啟動等;

為此,RateLimiter實際上由兩種實現策略,其實現分別見SmoothBursty和SmoothWarmingUp。二者主要的區別就是storedPermitsToWaitTime實現以及maxPermits數量的計算。

6.1 SmoothBursty

SmoothBursty使用storedPermits不需要額外等待時間。並且預設maxBurstSeconds未1,因此maxPermits為permitsPerSecond,即最多可以儲存1秒的剩餘令牌,比如QPS=5,則maxPermits=5.

下面這個RateLimiter的入口就是用來建立SmoothBursty型別的RateLimiter,

1
public static RateLimiter create(double permitsPerSecond)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  1. /**

  2. * This implements a "bursty" RateLimiter, where storedPermits are translated to

  3. * zero throttling. The maximum number of permits that can be saved (when the RateLimiter is

  4. * unused) is defined in terms of time, in this sense: if a RateLimiter is 2qps, and this

  5. * time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits.

  6. */

  7. static final class SmoothBursty extends SmoothRateLimiter {

  8. /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */

  9. final double maxBurstSeconds;

  10. SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {

  11. super(stopwatch);

  12. this.maxBurstSeconds = maxBurstSeconds;

  13. }

  14. void doSetRate(double permitsPerSecond, double stableIntervalMicros) {

  15. double oldMaxPermits = this.maxPermits;

  16. maxPermits = maxBurstSeconds * permitsPerSecond;

  17. System.out.println("maxPermits=" + maxPermits);

  18. if (oldMaxPermits == Double.POSITIVE_INFINITY) {

  19. // if we don't special-case this, we would get storedPermits == NaN, below

  20. storedPermits = maxPermits;

  21. } else {

  22. storedPermits = (oldMaxPermits == 0.0)

  23. ? 0.0 // initial state

  24. : storedPermits * maxPermits / oldMaxPermits;

  25. }

  26. }

  27. long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {

  28. return 0L;

  29. }

  30. }

一個簡單的使用示意圖及解釋,下面私海一個QPS=4的SmoothBursty:

  1. (1).t=0,這時候storedPermits=0,請求1個令牌,等待時間=0;

  2. (2).t=1,這時候storedPermits=3,請求3個令牌,等待時間=0;

  3. (3).t=2,這時候storedPermits=4,請求10個令牌,等待時間=0,超前使用了2個令牌;

  4. (4).t=3,這時候storedPermits=0,請求1個令牌,等待時間=0.5;

程式碼的輸出:

1
2
3
4
5
6
7
8
9
10
11
  1. maxPermits=4.0, storedPermits=7.2E-4, stableIntervalMicros=250000.0, nextFreeTicketMicros=1472

  2. acquire(1), sleepSecond=0.0

  3. maxPermits=4.0, storedPermits=3.012212, stableIntervalMicros=250000.0, nextFreeTicketMicros=1004345

  4. acquire(3), sleepSecond=0.0

  5. maxPermits=4.0, storedPermits=4.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2004668

  6. acquire(10), sleepSecond=0.0

  7. maxPermits=4.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=3504668

  8. acquire(1), sleepSecond=0.499591

6.2 SmoothWarmingUp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
  1. static final class SmoothWarmingUp extends SmoothRateLimiter {

  2. private final long warmupPeriodMicros;

  3. /**

  4. * The slope of the line from the stable interval (when permits == 0), to the cold interval

  5. * (when permits == maxPermits)

  6. */

  7. private double slope;

  8. private double halfPermits;

  9. SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit) {

  10. super(stopwatch);

  11. this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);

  12. }

  13. @Override

  14. void doSetRate(double permitsPerSecond, double stableIntervalMicros) {

  15. double oldMaxPermits = maxPermits;

  16. maxPermits = warmupPeriodMicros / stableIntervalMicros;

  17. halfPermits = maxPermits / 2.0;

  18. // Stable interval is x, cold is 3x, so on average it's 2x. Double the time -> halve the rate

  19. double coldIntervalMicros = stableIntervalMicros * 3.0;

  20. slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;

  21. if (oldMaxPermits == Double.POSITIVE_INFINITY) {

  22. // if we don't special-case this, we would get storedPermits == NaN, below

  23. storedPermits = 0.0;

  24. } else {

  25. storedPermits = (oldMaxPermits == 0.0)

  26. ? maxPermits // initial state is cold

  27. : storedPermits * maxPermits / oldMaxPermits;

  28. }

  29. }

  30. @Override

  31. long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {

  32. double availablePermitsAboveHalf = storedPermits - halfPermits;

  33. long micros = 0;

  34. // measuring the integral on the right part of the function (the climbing line)

  35. if (availablePermitsAboveHalf > 0.0) {

  36. double permitsAboveHalfToTake = min(availablePermitsAboveHalf, permitsToTake);

  37. micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)

  38. + permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);

  39. permitsToTake -= permitsAboveHalfToTake;

  40. }

  41. // measuring the integral on the left part of the function (the horizontal line)

  42. micros += (stableIntervalMicros * permitsToTake);

  43. return micros;

  44. }

  45. private double permitsToTime(double permits) {

  46. return stableIntervalMicros + permits * slope;

  47. }

  48. }

maxPermits等於熱身(warmup)期間能產生的令牌數,比如QPS=4,warmup為2秒,則maxPermits=8.halfPermits為maxPermits的一半.

參考註釋中的神圖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  1. * ^ throttling

  2. * |

  3. * 3*stable + /

  4. * interval | /.

  5. * (cold) | / .

  6. * | / . <-- "warmup period" is the area of the trapezoid between

  7. * 2*stable + / . halfPermits and maxPermits

  8. * interval | / .

  9. * | / .

  10. * | / .

  11. * stable +----------/ WARM . }

  12. * interval | . UP . } <-- this rectangle (from 0 to maxPermits, and

  13. * | . PERIOD. } height == stableInterval) defines the cooldown period,

  14. * | . . } and we want cooldownPeriod == warmupPeriod

  15. * |---------------------------------> storedPermits

  16. * (halfPermits) (maxPermits)

  17. *

下面是我們QPS=4,warmup為2秒時候對應的圖。 

maxPermits=8,halfPermits=4,和SmoothBursty相同的請求序列:

  1. (1).t=0,這時候storedPermits=8,請求1個令牌,使用1個storedPermits消耗時間=1×(0.75+0.625)/2=0.6875秒;

  2. (2).t=1,這時候storedPermits=8,請求3個令牌,使用3個storedPermits消耗時間=3×(0.75+0.375)/2=1.6875秒(注意已經超過1秒了,意味著下次產生新Permit時間為2.6875);

  3. (3).t=2,這時候storedPermits=5,請求10個令牌,使用5個storedPermits消耗時間=1×(0.375+0.25)/2+4*0.25=1.3125秒,再加上額外請求的5個新產生的Permit需要消耗=5*0.25=1.25秒,即總共需要耗時2.5625秒,則下一次產生新的Permit時間為2.6875+2.5625=5.25,注意當前請求私海2.6875才返回的,之前一直阻塞;

  4. (4).t=3,因為前一個請求阻塞到2.6875,實際這個請求3.6875才到達RateLimiter,請求1個令牌,storedPermits=0,下一次產生新Permit時間為5.25,因此總共需要等待5.25-3.6875=1.5625秒;

實際執行結果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  1. warmupPeriodMicros=2000000

  2. stableIntervalMicros=250000.0, maxPermits=8.0, halfPermits=4.0, coldIntervalMicros=750000.0, slope=125000.0, storedPermits=8.0

  3. maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1524

  4. acquire(1), sleepSecond=0.0

  5. maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1001946

  6. acquire(3), sleepSecond=0.0

  7. maxPermits=8.0, storedPermits=5.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2689446

  8. acquire(10), sleepSecond=0.687186

  9. maxPermits=8.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=5251946

  10. acquire(1), sleepSecond=1.559174

7.其他限流器