1. 程式人生 > >限流演算法的理解和應用場景和實現[臨界點處理]

限流演算法的理解和應用場景和實現[臨界點處理]

在開發高併發系統時,有三把利器來保護系統:快取、降級和限流。一下有幾種限流的方法可以參考。

訊號量和令牌桶的區別:

    訊號量限制的是併發,資源. 令牌桶如果耗時比較高的話,併發可能會比較大. 限制的是 qps.

計數器法

計數器法是限流演算法裡最簡單也是最容易實現的一種演算法。比如我們規定,對於A介面來說,我們1分鐘的訪問次數不能超過100個。那麼我們可以這麼做:在一開 始的時候,我們可以設定一個計數器counter,每當一個請求過來的時候,counter就加1,如果counter的值大於100並且該請求與第一個 請求的間隔時間還在1分鐘之內,那麼說明請求數過多;如果該請求與第一個請求的間隔時間大於1分鐘,且counter的值還在限流範圍內,那麼就重置 counter,具體演算法的示意圖如下:

2016-09-01_20:31:28.jpg

這個演算法雖然簡單,但是有一個十分致命的問題,那就是臨界問題,我們看下圖:

2016-09-01_20:35:21.jpg

從上圖中我們可以看到,假設有一個惡意使用者,他在0:59時,瞬間傳送了100個請求,並且1:00又瞬間傳送了100個請求,那麼其實這個使用者在 1秒裡面,瞬間傳送了200個請求。我們剛才規定的是1分鐘最多100個請求,也就是每秒鐘最多1.7個請求,使用者通過在時間視窗的重置節點處突發請求, 可以瞬間超過我們的速率限制。使用者有可能通過演算法的這個漏洞,瞬間壓垮我們的應用。

聰明的朋友可能已經看出來了,剛才的問題其實是因為我們統計的精度太低。那麼如何很好地處理這個問題呢?或者說,如何將臨界問題的影響降低呢?我們可以看下面的滑動視窗演算法。

public class CounterDemo {  
    public long timeStamp = getNowTime();  
    public int reqCount = 0;  
    public final int limit = 100; // 時間視窗內最大請求數  
    public final long interval = 1000; // 時間視窗ms  
    public boolean grant() {  
        long now = getNowTime();  
        if (now < timeStamp + interval) {  
            // 在時間視窗內  
            reqCount++;  
            // 判斷當前時間視窗內是否超過最大請求控制數  
            return reqCount <= limit;  
        }  
        else {  
            timeStamp = now;  
            // 超時後重置  
            reqCount = 1;  
            return true;  
        }  
    }  
}  

滑動視窗

滑動視窗,又稱rolling window。為了解決這個問題,我們引入了滑動視窗演算法。如果學過TCP網路協議的話,那麼一定對滑動視窗這個名詞不會陌生。下面這張圖,很好地解釋了滑動視窗演算法:

2016-09-01_20:42:46.jpg

在上圖中,整個紅色的矩形框表示一個時間視窗,在我們的例子中,一個時間視窗就是一分鐘。然後我們將時間視窗進行劃分,比如圖中,我們就將滑動視窗 劃成了6格,所以每格代表的是10秒鐘。每過10秒鐘,我們的時間視窗就會往右滑動一格。每一個格子都有自己獨立的計數器counter,比如當一個請求 在0:35秒的時候到達,那麼0:30~0:39對應的counter就會加1。

那麼滑動視窗怎麼解決剛才的臨界問題的呢?我們可以看上圖,0:59到達的100個請求會落在灰色的格子中,而1:00到達的請求會落在橘黃色的格 子中。當時間到達1:00時,我們的視窗會往右移動一格,那麼此時時間視窗內的總請求數量一共是200個,超過了限定的100個,所以此時能夠檢測出來觸 發了限流。

我再來回顧一下剛才的計數器演算法,我們可以發現,計數器演算法其實就是滑動視窗演算法。只是它沒有對時間視窗做進一步地劃分,所以只有1格。

由此可見,當滑動視窗的格子劃分的越多,那麼滑動視窗的滾動就越平滑,限流的統計就會越精確。

public class LeakyDemo {  
    public long timeStamp = getNowTime();  
    public int capacity; // 桶的容量  
    public int rate; // 水漏出的速度  
    public int water; // 當前水量(當前累積請求數)  
    public boolean grant() {  
        long now = getNowTime();  
        water = max(0, water - (now - timeStamp) * rate); // 先執行漏水,計算剩餘水量  
        timeStamp = now;  
        if ((water + 1) < capacity) {  
            // 嘗試加水,並且水還未滿  
            water += 1;  
            return true;  
        }  
        else {  
            // 水滿,拒絕加水  
            return false;  
        }  
    }  
}

先看看漏桶演算法(Leaky bucket)


如圖所示,很明顯從原來兩個流量(12mbps 和2mbps)限流成了 3mbps. 

實現:

   一個比較簡單實現是: n 個執行緒這種先把資料流量放置到一個佇列裡(或者 一個介面拆成1個佇列,分而治之), 然後另外一個執行緒從裡面獲取資料,請求.

應用場景: 

     非同步化的呼叫比較好, 同步化的呼叫的話,就需要搞成類似 reactor 模式的形式,每個資料包需要有 seq_no 的概念(tcp,dubbo 非同步傳輸).

再看看令牌桶(Token bucket):

   Guava官方文件-RateLimiter類

public class TokenBucketDemo {  
    public long timeStamp = getNowTime();  
    public int capacity; // 桶的容量  
    public int rate; // 令牌放入速度  
    public int tokens; // 當前令牌數量  
    public boolean grant() {  
        long now = getNowTime();  
        // 先新增令牌  
        tokens = min(capacity, tokens + (now - timeStamp) * rate);   
        timeStamp = now;  
        if (tokens < 1) {  
            // 若不到1個令牌,則拒絕  
            return false;  
        }  
        else {  
            // 還有令牌,領取令牌  
            tokens -= 1;  
            return true;  
        }  
    }  
}  

使用Guava的RateLimiter進行限流控制

Guava是google提供的java擴充套件類庫,其中的限流工具類RateLimiter採用的就是令牌桶演算法。RateLimiter 從概念上來講,速率限制器會在可配置的速率下分配許可證,如果必要的話,每個acquire() 會阻塞當前執行緒直到許可證可用後獲取該許可證,一旦獲取到許可證,不需要再釋放許可證。通俗的講RateLimiter會按照一定的頻率往桶裡扔令牌,執行緒拿到令牌才能執行,比如你希望自己的應用程式QPS不要超過1000,那麼RateLimiter設定1000的速率後,就會每秒往桶裡扔1000個令牌。例如我們需要處理一個任務列表,但我們不希望每秒的任務提交超過兩個,此時可以採用如下方式:


有一點很重要,那就是請求的許可數從來不會影響到請求本身的限制(呼叫acquire(1) 和呼叫acquire(1000) 將得到相同的限制效果,如果存在這樣的呼叫的話),但會影響下一次請求的限制,也就是說,如果一個高開銷的任務抵達一個空閒的RateLimiter,它會被馬上許可,但是下一個請求會經歷額外的限制,從而來償付高開銷任務。注意:RateLimiter 並不提供公平性的保證。

  1. publicclass <span style="font-size:14px;">RateLimiter</span>{  
  2.   publicdouble acquire() {  
  3.         return acquire(1);  
  4.     }  
  5.  publicdouble acquire(int permits) {  
  6.         checkPermits(permits);  //檢查引數是否合法(是否大於0)
  7.         long microsToWait;  
  8.         synchronized (mutex) { //應對併發情況需要同步
  9.             microsToWait = reserveNextTicket(permits, readSafeMicros()); //獲得需要等待的時間 
  10.         }  
  11.         ticker.sleepMicrosUninterruptibly(microsToWait); //等待,當未達到限制時,microsToWait為0
  12.         return1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);  
  13.     }  
  14. privatelong reserveNextTicket(double requiredPermits, long nowMicros) {  
  15.         resync(nowMicros); //補充令牌
  16.         long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;  
  17.         double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //獲取這次請求消耗的令牌數目
  18.         double freshPermits = requiredPermits - storedPermitsToSpend;  
  19.         long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)  
  20.                 + (long) (freshPermits * stableIntervalMicros);   
  21.         this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;  
  22.         this.storedPermits -= storedPermitsToSpend; // 減去消耗的令牌
  23.         return microsToNextFreeTicket;  
  24.     }  
  25. privatevoid resync(long nowMicros) {  
  26.         // if nextFreeTicket is in the past, resync to now
  27.         if (nowMicros > nextFreeTicketMicros) {  
  28.             storedPermits = Math.min(maxPermits,  
  29.                     storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);  
  30.             nextFreeTicketMicros = nowMicros;  
  31.         }  
  32.     }  
  33. }  

四、使用Semphore進行併發流控

Java 併發庫的Semaphore 可以很輕鬆完成訊號量控制,Semaphore可以控制某個資源可被同時訪問的個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”,這可應用於死鎖恢復的一些場合。下面的Demo中申明瞭一個只有5個許可的Semaphore,而有20個執行緒要訪問這個資源,通過acquire()和release()獲取和釋放訪問許可:



最後:進行限流控制還可以有很多種方法,針對不同的場景各有優劣,例如通過AtomicLong計數器控制、使用MQ訊息佇列進行流量消峰等等。