1. 程式人生 > >分散式環境下限流方案的實現

分散式環境下限流方案的實現

  • 業務背景介紹
    對於web應用的限流,光看標題,似乎過於抽象,難以理解,那我們還是以具體的某一個應用場景來引入這個話題吧。
    在日常生活中,我們肯定收到過不少不少這樣的簡訊,“雙11約嗎?,千款….”,“您有幸獲得唱讀卡,趕快戳連結…”。這種型別的簡訊是屬於推廣性質的簡訊。為什麼我要說這個呢?聽我慢慢道來。
    一般而言,對於推廣營銷類簡訊,它們針對某一群體(譬如註冊會員)進行定點推送,有時這個群體的成員量比較大,譬如京東的會員,可以達到千萬級別。因此相應的,傳送推廣簡訊的量也會增大。然而,要完成這些簡訊傳送,我們是需要呼叫服務商的介面來完成的。倘若一次傳送的量在200萬條,而我們的服務商介面每秒能處理的簡訊傳送量有限,只能達到200條每秒。那麼這個時候就會產生問題了,我們如何能控制好程式傳送簡訊時的速度暱?於是限流這個功能就得加上了
  • 生產環境背景
    1、服務商介面所能提供的服務上限是400條/s
    2、業務方呼叫簡訊傳送介面的速度未知,QPS可能達到800/s,1200/s,或者更高
    3、當服務商介面訪問頻率超過400/s時,超過的量將拒絕服務,多出的資訊將會丟失
    4、線上為多節點佈置,但呼叫的是同一個服務商介面
  • 需求分析
    1、鑑於業務方對簡訊傳送介面的呼叫頻率未知,而服務商的介面服務有上限,為保證服務的可用性,業務層需要對介面呼叫方的流量進行限制—–介面限流
  • 需求設計
    方案一、在提供給業務方的Controller層進行控制。
    1、使用guava提供工具庫裡的RateLimiter類(內部採用令牌捅演算法實現)進行限流
<!--核心程式碼片段-->
private RateLimiter rateLimiter = RateLimiter.create(400);//400表示每秒允許處理的量是400 if(rateLimiter.tryAcquire()) { //簡訊傳送邏輯可以在此處 }

2、使用Java自帶delayqueue的延遲佇列實現(編碼過程相對麻煩,此處省略程式碼)

3、使用redis實現,儲存兩個key,一個用於計時,一個用於計數。請求每呼叫一次,計數器增加1,若在計時器時間內計數器未超過閾值,則可以處理任務

 if(!cacheDao.hasKey(API_WEB_TIME_KEY)) {            cacheDao.putToValue(API_WEB_TIME_KEY,0
,(long)1, TimeUnit.SECONDS); } if(cacheDao.hasKey(API_WEB_TIME_KEY)&&cacheDao.incrBy(API_WEB_COUNTER_KEY,(long)1) > (long)400) { LOGGER.info("呼叫頻率過快"); } //簡訊傳送邏輯

方案二、在簡訊傳送至服務商時做限流處理
方案三、同時使用方案一和方案二

  • 可行性分析
    最快捷且有效的方式是使用RateLimiter實現,但是這很容易踩到一個坑,單節點模式下,使用RateLimiter進行限流一點問題都沒有。但是…線上是分散式系統,佈署了多個節點,而且多個節點最終呼叫的是同一個簡訊服務商介面。雖然我們對單個節點能做到將QPS限制在400/s,但是多節點條件下,如果每個節點均是400/s,那麼到服務商那邊的總請求就是節點數x400/s,於是限流效果失效。使用該方案對單節點的閾值控制是難以適應分散式環境的,至少目前我還沒想到更為合適的方式。
    對於第二種,使用delayqueue方式。其實主要存在兩個問題,1:簡訊系統本身就用了一層訊息佇列,有用kafka,或者rabitmq,如果再加一層延遲佇列,從設計上來說是不太合適的。2:實現delayqueue的過程相對較麻煩,耗時可能比較長,而且達不到精準限流的效果
    對於第三種,使用redis進行限流,其很好地解決了分散式環境下多例項所導致的併發問題。因為使用redis設定的計時器和計數器均是全域性唯一的,不管多少個節點,它們使用的都是同樣的計時器和計數器,因此可以做到非常精準的流控。同時,這種方案編碼並不複雜,可能需要的程式碼不超過10行。

  • 實施方案
    根據可行性分析可知,整個系統採取redis限流處理是成本最低且最高效的。
    具體實現

    1、在Controller層設定兩個全域性key,一個用於計數,另一個用於計時

private static final String API_WEB_TIME_KEY = "time_key";

    private static final String API_WEB_COUNTER_KEY = "counter_key";

2、對時間key的存在與否進行判斷,並對計數器是否超過閾值進行判斷

if(!cacheDao.hasKey(API_WEB_TIME_KEY)) {

            cacheDao.putToValue(API_WEB_TIME_KEY,0,(long)1, TimeUnit.SECONDS);
            cacheDao.putToValue(API_WEB_COUNTER_KEY,0,(long)2, TimeUnit.SECONDS);//時間到就重新初始化為

        }

        if(cacheDao.hasKey(API_WEB_TIME_KEY)&&cacheDao.incrBy(API_WEB_COUNTER_KEY,(long)1) > (long)400) {


            LOGGER.info("呼叫頻率過快");

        }
         //簡訊傳送邏輯

實施結果
可以達到非常精準的流控,截圖會在後續的過程中貼出來。歡迎有疑問的小夥伴們在評論區提出問題,我看到後儘量抽時間回答的