1. 程式人生 > >Guava-RateLimiter秒殺限流技術詳解

Guava-RateLimiter秒殺限流技術詳解

使用場景

系統使用下游資源時,需要考慮下游對資源受限、處理能力,在下游資源無法或者短時間內無法提升處理效能的情況下,可以使用限流器或者類似保護機制,避免下游服務崩潰造成整體服務的不可用。

常用演算法

常見限流演算法有兩種:漏桶演算法和令牌桶演算法。

漏桶演算法

具體問題

網站的訪問ip中,找出進行頻繁連線的ip,並對這些ip的訪問頻率進行限制。

解決方案

Leak Bucket / Token Bucket

學習資料

概述

將上述的尋找頻繁訪問ip的問題提升到一個更高的抽象層次,就是網站的流量控制。Leaky Bucket就是一種可以輔助實現流量控制的演算法。

在我看來,Leaky Bucket是一個抽象層次略高的演算法。它的作用,是通過一種模型(即桶),建立了一種合理地判斷流量是否異常的演算法。

至於在判斷出異常流量後,要觸發怎樣的操作——拋棄?放入等待佇列暫緩傳送?——仍然要交給演算法的實現者根據具體需求作出選擇。這並不是Leaky Bucket的管轄範疇。

根據wiki上的介紹,Leaky Bucket實際上有兩種不同的含義。

1)as a meter(作為計量工具)

2)as a queue(作為排程佇列)

其中,第一種含義和Token Bucket是等價的,只是表述的角度不同。更有趣的是,第二種含義其實是第一種的特例。這些對比和區別在後面再談,先整體看一下Leaky Bucket。

Leaky Bucket整體思想

Leaky Bucket的核心抽象模型就如字面意思:一個會漏水的桶。

Alt text

如圖,桶本身具有一個恆定的速率往下漏水,而上方時快時慢地會有水進入桶中。當桶還未滿時,上方的水可以加入。一旦水滿,上方的水就無法加入了。桶滿正是演算法中的一個的關鍵觸發條件(即流量異常判斷成立的條件)。而此條件下如何處理上方欲留下的水,則有了下面兩種常見的方式。

Traffic Shaping和Traffic Policing

在桶滿水之後,常見的兩種處理方式為:

1)暫時攔截住上方水的向下流動,等待桶中的一部分水漏走後,再放行上方水。

2)溢位的上方水直接拋棄。

將水看作網路通訊中資料包的抽象,則

方式1起到的效果稱為Traffic Shaping,

方式2起到的效果稱為Traffic Policing。

由此可見,Traffic Shaping的核心理念是“等待”,Traffic Policing的核心理念是“丟棄”。它們是兩種常見的流速控制方法。

演算法所需的引數

現在,再回顧一下上面的圖,可以看出演算法只需要兩個引數:

1)桶漏水的速率

2)桶的大小

核心:

利用桶模型判斷何時的流量達到異常了

外延:

1)流量異常時的處理方法:traffic policing v.s. traffic shaping

2)處理的資料包是否定長:定長 v.s. 變長

3)桶的大小是否等同於每個tick放行的水量:as a queue v.s. as a meter

總結

回頭再看,其實Leaky Bucket是一個很簡單的想法,在處理流量控制上也能有不錯的效果。wiki上的資料非常繁複,看了我一個下午。其實更多的是在大家運用這個詞時的情景多種多樣,而沒有很好地敘述出演算法的核心和外延。

我這裡做學習筆記,其實主要也是為了理清自己在學習Leaky Bucket時的混亂,試圖真正搞清楚哪些是核心,哪些是外延。

注意事項

在學習的過程中,我發現網上所有的中文資料在談及Leaky Bucket(漏桶)和Token Bucket(令牌桶)演算法時,都是把漏桶看作wiki解釋中的第二種。所以,以上文章裡的“漏桶”和本文的“Leaky Bucket”並不等價。

我個人倒是並不反對用漏桶來指代wiki的第二種解釋,因為這樣就可以明確區分出“漏桶”和“令牌桶”。但是,在這種解釋下,我們需要牢記,“漏桶”就只是“令牌桶”的一個特例而已了。

令牌桶演算法

令牌桶演算法基於這樣的場景的模擬:
有一個裝有token且token數量固定的桶,token新增的速率時固定的,當有請求來(或者資料包到達),會檢查下桶中是否包含足夠多的token(一個請求可能需要多個token)。對於資料包而言,資料包的長度等同於需要獲取的token數量。即從桶中消費token,若token數量足夠,則消費掉,不夠則根據不同的策略處理(阻塞當前或提前消費等)。

Guava Ratelimiter實現

Guava實現更接近於令牌桶演算法:將一秒鐘切割為令牌數的時間片段,每個時間片段等同於一個token。

關鍵變數:

  • nextFreeTicketMicros:表示下一次允許補充許可的時間(時刻)。這個變數的解釋比較拗口,看下面流程會比較清晰
  • maxPermits:最大許可數
  • storedPermits:儲存的許可數,數量不能超過最大許可數

實現

這裡有一個關鍵方法(重)同步方法,在初始化以及獲取操作時都會用到:

void resync(long nowMicros) {
  // if nextFreeTicket is in the past, resync to now
  if (nowMicros > nextFreeTicketMicros) {
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    storedPermits = min(maxPermits, storedPermits + newPermits);
    nextFreeTicketMicros = nowMicros;
  }
}

如果當前時間(不是時刻,而是自建立起所流經的時間,下同)超過了上一次所設定的nextFreeTicketMicros時間,則會重新進行同步:

  1. 通過計算上一次設定nextFreeTicketMicros到當前時刻的時間差獲取新增的可用許可數;
  2. 計算可用的許可數:如果新增的許可數+原有的許可數小於最大許可數,則儲存的許可數增加新增的數量,否則同步為最大許可數;
  3. 同步下一次允許補充許可時間為當前時間

初始化

static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}

這裡使用一個StopWatch來計時,主要是獲取自限速器建立所流經的時間。
初始化關鍵變數(其實就是通過resync方法來實現主要邏輯的):
nextFreeTicketMicros為當前時間;maxPermits為傳入的每秒允許的許可數;storedPermits則為0

img

初始化

獲取許可(acquire)

獲取一定數量的許可,如果獲取不到,則阻塞相應時間,然後獲取相應許可。並返回當前操作所等待的時間。

  1. 嘗試resync操作
  2. 返回值所需等待時間設定為min(nextFreeTicketMicros-nowMicros,0)
  3. 實際消耗的許可數:min(請求許可數,儲存許可數中的小值);
  4. 需要重新整理獲取的許可數(freshPermits):請求許可數-實際消耗許可數
  5. 等待時間(waitMicros):需要重新整理獲取的許可數(freshPermits)*每個許可數所需時間
  6. 下一次允許補充許可時間(nextFreeTicketMicros)同步為:nextFreeTicketMicros+=waitMicros
  7. 更新剩餘儲存的許可數:儲存許可數-本次實際消耗許可數

根據resync方法條件:if (nowMicros > nextFreeTicketMicros)不難發現,如果申請獲取的許可數多於剩餘可分配的許可數,更新後的nextFreeTicketMicros時間會超過nowMicros,但是當前請求所需等待時間為0。即對於超量許可申請(大於當前可提供的許可數),等待操作是在下一次請求時才會發生。通俗點說就是:前人挖坑後人跳。

nextFreeTicketMicros早於當前時間,且許可數足夠的情況:

img

nextFreeTicketMicros早於nowMicros且許可足夠

nextFreeTicketMicros早於當前,但是許可數不夠的情況:

img

nextFreeTicketMicros早於nowMicros但許可不足

nextFreeTicketMicros晚於當前時間,主要是阻塞時間計算,許可數分發以及時間計算等同上兩場景。

嘗試獲取許可(tryAcquire)
如果nextFreeTicketMicros-timeout<=nowMicros,說明經過超時時間內也不會有一個許可可以分配(按上描述,只要有許可,就可用分配,無論申請的數量有多少),則tryAcquire操作直接返回false。否則按照acquire操作流程獲取許可資訊。

預熱(warmingup)
首先申請一個容量為100(每秒)的限流器,然後多執行緒併發獲取許可,併發數量為20,且每個執行緒只獲取一次。
附上測試程式碼:

public void testCurrent(){
  RateLimiter rateLimiter = RateLimiter.create(100);
  ExecutorService executorService = Executors.newFixedThreadPool(100);
  Runnable runnable = ()->{
    if(!rateLimiter.tryAcquire(1,100,TimeUnit.MILLISECONDS)){
      System.out.println("F"+Thread.currentThread().getName());
    }else {
      System.out.println("A"+Thread.currentThread().getName());
    }
  };
  for (int i = 0; i < 20; i++) {
    executorService.execute(runnable);
  }
  try {
    executorService.awaitTermination(1,TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

按上演算法描述應當不會出現F開頭的輸出,但是實際卻發現20次輸出基本有小半數的嘗試獲取失敗:

1489453467102 pool-1-thread-1
1489453467102 pool-1-thread-2
1489453467104 pool-1-thread-3
1489453467104 pool-1-thread-4
1489453467105 pool-1-thread-5
1489453467105 pool-1-thread-6
1489453467105 pool-1-thread-7
1489453467107 pool-1-thread-8
1489453467107 pool-1-thread-9
F 1489453467108 pool-1-thread-15
F 1489453467108 pool-1-thread-16
F 1489453467109 pool-1-thread-17
F 1489453467109 pool-1-thread-18
F 1489453467109 pool-1-thread-19
F 1489453467109 pool-1-thread-20
1489453467219 pool-1-thread-10
1489453467239 pool-1-thread-11
1489453467259 pool-1-thread-12
1489453467274 pool-1-thread-13
1489453467297 pool-1-thread-14

問題來自於初始化時,storedPermits儲存的許可數為0,而第一個執行緒進行獲取時,離初始時時間非常近,導致第一個執行緒獲取許可後,儲存的可用許可數並非為宣告的最大許可數,從而導致後續執行緒嘗試獲取幾次後會耗盡儲存的許可數,繼而導致tryAcquire操作失敗。

Guava-RateLimiter詳解2

常用的限流演算法有漏桶演算法和令牌桶演算法,guava的RateLimiter使用的是令牌桶演算法,也就是以固定的頻率向桶中放入令牌,例如一秒鐘10枚令牌,實際業務在每次響應請求之前都從桶中獲取令牌,只有取到令牌的請求才會被成功響應,獲取的方式有兩種:阻塞等待令牌或者取不到立即返回失敗.

本次實戰,我們用的是guava的RateLimiter,場景是spring mvc在處理請求時候,從桶中申請令牌,申請到了就成功響應,申請不到時直接返回失敗。

例項一

1、新增guava jar包

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>18.0</version>
    </dependency>

2、AccessLimitService.java 限流服務封裝到一個類中AccessLimitService,提供tryAcquire()方法,用來嘗試獲取令牌,返回true表示獲取到

@Service
public class AccessLimitService {

    //每秒只發出5個令牌
    RateLimiter rateLimiter = RateLimiter.create(5.0);

    /**
     * 嘗試獲取令牌
     * @return
     */
    public boolean tryAcquire(){
        return rateLimiter.tryAcquire();
    }
}

3、Controller層每次收到請求的時候都嘗試去獲取令牌,獲取成功和失敗列印不同的資訊

@Controller
public class HelloController {

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private AccessLimitService accessLimitService;

    @RequestMapping("/access")
    @ResponseBody
    public String access(){
        //嘗試獲取令牌
        if(accessLimitService.tryAcquire()){
            //模擬業務執行500毫秒
            try {
                Thread.sleep(500);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            return "aceess success [" + sdf.format(new Date()) + "]";
        }else{
            return "aceess limit [" + sdf.format(new Date()) + "]";
        }
    }
}

4、測試:十個執行緒併發訪問介面

public class AccessClient {
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

    /**
     * get請求
     * @param realUrl
     * @return
     */
    public static String sendGet(URL realUrl) {
        String result = "";
        BufferedReader in = null;
        try {
            // 開啟和URL之間的連線
            URLConnection connection = realUrl.openConnection();
            // 設定通用的請求屬性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent",
                    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // 建立實際的連線
            connection.connect();

            // 定義 BufferedReader輸入流來讀取URL的響應
            in = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result += line;
            }
        } catch (Exception e) {
            System.out.println("傳送GET請求出現異常!" + e);
            e.printStackTrace();
        }
        // 使用finally塊來關閉輸入流
        finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return result;
    }



    public void access() throws Exception{
        final URL url = new URL("http://localhost:8080/guavalimitdemo/access");

        for(int i=0;i<10;i++) {
            fixedThreadPool.submit(new Runnable() {
                public void run() {
                    System.out.println(sendGet(url));
                }
            });
        }

        fixedThreadPool.shutdown();
        fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception{
        AccessClient accessClient = new AccessClient();
        accessClient.access();
    }
}

部分請求由於獲取的令牌可以成功執行,其餘請求沒有拿到令牌,我們可以根據實際業務來做區分處理。還有一點要注意,我們通過RateLimiter.create(5.0)配置的是每一秒5枚令牌,但是限流的時候發出的是6枚,改用其他值驗證,也是實際的比配置的大1。

以上就是快速實現限流的實戰過程,此處僅是單程序服務的限流,而實際的分散式服務中會考慮更多因素,會複雜很多。

RateLimiter方法摘要

修飾符和型別 方法和描述
double acquire() 從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求
double **acquire(int permits)**從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求
static RateLimiter create(double permitsPerSecond)根據指定的穩定吞吐率建立RateLimiter,這裡的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢)
static RateLimiter **create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)**根據指定的穩定吞吐率和預熱期來建立RateLimiter,這裡的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和)
double **getRate()**返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數
void **setRate(double permitsPerSecond)**更新RateLimite的穩定速率,引數permitsPerSecond 由構造RateLimiter的工廠方法提供。
String toString()返回物件的字元表現形式
boolean **tryAcquire()**從RateLimiter 獲取許可,如果該許可可以在無延遲下的情況下立即獲取得到的話
boolean **tryAcquire(int permits)**從RateLimiter 獲取許可數,如果該許可數可以在無延遲下的情況下立即獲取得到的話
boolean **tryAcquire(int permits, long timeout, TimeUnit unit)**從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那麼立即返回false (無需等待)
boolean **tryAcquire(long timeout, TimeUnit unit)**從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那麼立即返回false(無需等待)
  • 舉例來說明如何使用RateLimiter,想象下我們需要處理一個任務列表,但我們不希望每秒的任務提交超過兩個:
//速率是每秒兩個許可
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // 也許需要等待
        executor.execute(task);
    }
}

例項二

package com.didispace.demo;

import com.google.common.util.concurrent.RateLimiter;

import java.util.Random;
import java.util.concurrent.CountDownLatch;


/**
 * Created by daizhao.
 * User: tony
 * Date: 2018-10-30
 * Time: 10:01
 * info: RateLimiter限流技術(令牌桶演算法)
 */
public class TestRateLimiter {
    //每秒只發出10個令牌
    RateLimiter rateLimiter = RateLimiter.create(10);

    public void sendRequest() {
        if (rateLimiter.tryAcquire()) {//獲取令牌成功
            System.out.println(Thread.currentThread().getName() + "請求成功");
        } else {
            System.out.println(Thread.currentThread().getName() + "請求失敗");
        }
    }

    public static void main(String[] args) {
        TestRateLimiter testRateLimiter = new TestRateLimiter();
        CountDownLatch countDownLatch = new CountDownLatch(1);//發令槍
        Random random = new Random(10);
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                try {
                    countDownLatch.await();//所有執行緒阻塞在此
                    Thread.sleep(random.nextInt(1000));
                    testRateLimiter.sendRequest();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, i + "tt").start();
        }
        countDownLatch.countDown();//所有執行緒同時開啟
    }

}