基於redis和lua的分散式限流器設計與實現
前言
在之前這篇文章中,我大致介紹了一下google guava庫中的RateLimiter的實現以及它背後的令牌桶演算法原理。但是也有新的問題,在分散式的環境中,我們如何針對多機環境做限流呢?在查閱了一些資料和其他人的部落格之後,我採用了redis來作為限流器的實現基礎。
原因主要有以下幾點:
- redis作為高效能快取系統,效能上能夠滿足多機之間高併發訪問的要求
- redis有比較好的api來支援限流器令牌桶演算法的實現
- 對於我們的系統來說,通過spring data redis來操作比較簡單和常見,避免了引入新的中介軟體帶來的風險
但是我們也知道,限流器在每次請求令牌和放入令牌操作中,存在一個協同的問題,即獲取令牌操作要儘可能保證原子性,否則無法保證限流器是否能正常工作。在RateLimiter的實現中使用了mutex作為互斥鎖來保證操作的原子性,那麼在redis中就需要一個類似於事務的機制來保證獲取令牌中多重操作的原子性。
面對這樣的需求,我們有幾個選擇:
- 用redis實現分散式鎖來保證操作的原子性,這個方案實現起來應該比較簡單,分散式鎖有現成的例子,然後就是把Rate Limiter的程式碼套用分散式鎖就行了,但是這樣的話效率會顯得不太高,特別是在大量訪問的情況下。
- 用redis的transaction,在我查閱redis官方文件和stackoverflow之後發現redis的transaction官方並不推薦,並且有可能在未來取消事務,因此不可取。
- 通過redis分散式鎖和本地鎖組成一個雙層結構,每次分散式獲取鎖之後可以預支一部分令牌量,然後放到本地通過本地的鎖來分配這些令牌,消耗完之後再到請求redis。這樣的好處是相比第一個方案,網路訪問延遲開銷會比較好,但是實現難度和複雜程度比較難估量,而且這樣的做法如果在多機不能保證均勻分配流量的情況下並不理想
- 通過將獲取鎖封裝到lua指令碼中,提交給redis進行eval和evalsha操作來完成lua指令碼的執行,由於lua指令碼在redis中天然的原子性,我們的需求能夠比較好的滿足,問題是將業務邏輯封裝在lua中,對於開發人員自身的能力和除錯存在一定的問題。
經過權衡,我採用了第四種方式,通過redis和lua來編寫令牌桶演算法來完成分散式限流的需求。
lua指令碼
話不多說,先貼出lua程式碼
-- 返回碼 1:操作成功 0:未配置 -1: 獲取失敗 -2:修改錯誤,建議重新初始化 -500:不支援的操作 -- redis hashmap 中存放的內容: -- last_mill_second 上次放入令牌或者初始化的時間 -- stored_permits 目前令牌桶中的令牌數量 -- max_permits 令牌桶容量 -- interval 放令牌間隔 -- app 一個標誌位,表示對於當前key有沒有限流存在 local SUCCESS = 1 local NO_LIMIT = 0 local ACQUIRE_FAIL = -1 local MODIFY_ERROR = -2 local UNSUPPORT_METHOD = -500 local ratelimit_info = redis.pcall("HMGET",KEYS[1], "last_mill_second", "stored_permits", "max_permits", "interval", "app") local last_mill_second = ratelimit_info[1] local stored_permits = tonumber(ratelimit_info[2]) local max_permits = tonumber(ratelimit_info[3]) local interval = tonumber(ratelimit_info[4]) local app = ratelimit_info[5] local method = ARGV[1] --獲取當前毫秒 --考慮主從策略和腳本回放機制,這個time由客戶端獲取傳入 --local curr_time_arr = redis.call('TIME') --local curr_timestamp = curr_time_arr[1] * 1000 + curr_time_arr[2]/1000 local curr_timestamp = tonumber(ARGV[2]) -- 當前方法為初始化 if method == 'init' then --如果app不為null說明已經初始化過,不要重複初始化 if(type(app) ~='boolean' and app ~=nil) then return SUCCESS end redis.pcall("HMSET", KEYS[1], "last_mill_second", curr_timestamp, "stored_permits", ARGV[3], "max_permits", ARGV[4], "interval", ARGV[5], "app", ARGV[6]) --始終返回成功 return SUCCESS end -- 當前方法為修改配置 if method == "modify" then if(type(app) =='boolean' or app ==nil) then return MODIFY_ERROR end --只能修改max_permits和interval redis.pcall("HMSET", KEYS[1], "max_permits", ARGV[3], "interval", ARGV[4]) return SUCCESS end -- 當前方法為刪除 if method == "delete" then --已經清除完畢 if(type(app) =='boolean' or app ==nil) then return SUCCESS end redis.pcall("DEL", KEYS[1]) return SUCCESS end -- 嘗試獲取permits if method == "acquire" then -- 如果app為null說明沒有對這個進行任何配置,返回0代表不限流 if(type(app) =='boolean' or app ==nil) then return NO_LIMIT end --需要獲取令牌數量 local acquire_permits = tonumber(ARGV[3]) --計算上一次放令牌到現在的時間間隔中,一共應該放入多少令牌 local reserve_permits = math.max(0, math.floor((curr_timestamp - last_mill_second) / interval)) local new_permits = math.min(max_permits, stored_permits + reserve_permits) local result = ACQUIRE_FAIL --如果桶中令牌數量夠則放行 if new_permits >= acquire_permits then result = SUCCESS new_permits = new_permits - acquire_permits end --更新當前桶中的令牌數量 redis.pcall("HSET", KEYS[1], "stored_permits", new_permits) --如果這次有放入令牌,則更新時間 if reserve_permits > 0 then redis.pcall("HSET", KEYS[1], "last_mill_second", curr_timestamp) end return result end return UNSUPPORT_METHOD
絕大部分邏輯在註釋裡面都已經寫清楚了(我java客戶端用的程式碼刪掉了所有的註釋,因為提交上去報編譯錯誤,但是redis-cli除錯就沒問題,我也沒太關注原因)。
大致上,我在這個指令碼中編寫了4種函式:
- init 初始化限流器
- modify 修改限流器配置(主要針對限流器的桶大小和放令牌間隔,即1/QPS)
- delete 刪除限流器配置
- acquire 嘗試獲取制定數目的令牌
程式碼基本上仿照了Guava RateLimiter的邏輯,實現了觸發式的放令牌策略。
由於我的需求中不需要像guava RateLimiter那樣的預支令牌的邏輯,因此如果當前沒有令牌可供服務,我就直接返回獲取失敗了。
還有一點需要注意的是,我本來在指令碼中寫了獲取redis伺服器當前時間的程式碼,但是我通過redis-cli執行的時候報錯了:
Write commands not allowed after non deterministic commands.
這個錯誤的原因大家可以參見ofollow,noindex">這篇文章 ,大致原因跟redis叢集的重放和備份策略有關,相當於我呼叫TIME操作,會在主從各執行一次,得到的結果肯定會存在差異,這個差異就給最終邏輯正確性帶來了不確定性。在redis 4.0之後引入了redis.replicate_commands()來放開限制。但我考慮了幾個因素之後,還是採用網上大部分人的做法,在執行前先行獲取到redis的時間戳,然後當做引數傳上去。
lua除錯
對lua除錯最開始花掉了我不少時間,主要對於redis-cli命令不太熟悉。大家有一樣問題的可以參見這篇文章 。大致來說就是將寫好的指令碼放到redis所在資料夾下(我是windows環境),然後在cmd下執行 redis-cli.exe --eval rate_limit.lua test2(key,可重複) , (逗號分隔) init 10101 100 100 10 test2 (後跟引數,空格隔開)。
java整合
在完成了lua的除錯工作之後,我們就開始java部分的整合程式碼編寫,我們使用的是spring boot來完成開發。
第一部分是redis配置:
@Bean("rateLimitLua") public DefaultRedisScript<Long> getRateLimitScript() { DefaultRedisScript<Long> rateLimitLua = new DefaultRedisScript<>(); rateLimitLua.setLocation(new ClassPathResource("rate_limit.lua")); rateLimitLua.setResultType(Long.class); return rateLimitLua; }
然後是一些與lua適配的列舉和一些bean:
/** * @author: Yuanqing Luo * @date: 2018/10/22 * * 限流的具體方法 */ public enum RateLimitMethod { //initialize rate limiter init, //modify rate limiter parameter modify, //delete rate limiter delete, //acquire permits acquire; }
/** * @author: Yuanqing Luo * @date: 2018/10/22 * rate limite result **/ public enum RateLimitResult { SUCCESS(1L), NO_LIMIT(0L), ACQUIRE_FAIL(-1L), MODIFY_ERROR(-2L), UNSUPPORT_METHOD(-500L), ERROR(-505L); private Long code; RateLimitResult(Long code){ this.code = code; } public static RateLimitResult getResult(Long code){ for(RateLimitResult enums: RateLimitResult.values()){ if(enums.code.equals(code)){ return enums; } } throw new IllegalArgumentException("unknown rate limit return code:" + code); } }
/** * @author: Yuanqing Luo * @date: 2018/10/22 **/ @Getter @Setter public class RateLimitVo { private String url; private boolean isLimit; private Double interval; private Integer maxPermits; private Integer initialPermits; }
第三部分就是限流器的呼叫組裝部分:
/** * @author: Yuanqing Luo * @date: 2018/10/22 **/ @Service @Slf4j public class RateLimitClient { private static final String RATE_LIMIT_PREFIX = "ratelimit:"; @Autowired StringRedisTemplate redisTemplate; @Resource @Qualifier("rateLimitLua") RedisScript<Long> rateLimitScript; public RateLimitResult init(String key, RateLimitVo rateLimitInfo){ return exec(key, RateLimitMethod.init, rateLimitInfo.getInitialPermits(), rateLimitInfo.getMaxPermits(), rateLimitInfo.getInterval(), key); } public RateLimitResult modify(String key, RateLimitVo rateLimitInfo){ return exec(key, RateLimitMethod.modify, key, rateLimitInfo.getMaxPermits(), rateLimitInfo.getInterval()); } public RateLimitResult delete(String key){ return exec(key, RateLimitMethod.delete); } public RateLimitResult acquire(String key){ return acquire(key, 1); } public RateLimitResult acquire(String key, Integer permits){ return exec(key, RateLimitMethod.acquire, permits); } /** * 執行redis的具體方法,限制method,保證沒有其他的東西進來 * @param key * @param method * @param params * @return */ private RateLimitResult exec(String key, RateLimitMethod method, Object... params){ try { Long timestamp = getRedisTimestamp(); String[] allParams = new String[params.length + 2]; allParams[0] = method.name(); allParams[1] = timestamp.toString(); for(int index = 0;index < params.length; index++){ allParams[2 + index] = params[index].toString(); } Long result = redisTemplate.execute(rateLimitScript, Collections.singletonList(getKey(key)), allParams); return RateLimitResult.getResult(result); } catch (Exception e){ log.error("execute redis script fail, key:{}, method:{}", key, method.name(), e); return RateLimitResult.ERROR; } } private Long getRedisTimestamp(){ Long currMillSecond = redisTemplate.execute( (RedisCallback<Long>) redisConnection -> redisConnection.time() ); return currMillSecond; } private String getKey(String key){ return RATE_LIMIT_PREFIX + key; } }
java程式碼這塊比較簡單了,基本就是封裝了之前lua指令碼中的4項操作。
第四部分就是測試程式碼:
/** * @author: Yuanqing Luo * @date: 2018/10/22 **/ @RunWith(SpringRunner.class) @SpringBootTest(classes = OpenApiGatewayApplication.class) public class RateLimitTest { @Autowired private RateLimitClient rateLimitClient; @Test public void testInit(){ RateLimitVo vo = new RateLimitVo(); vo.setInitialPermits(500); vo.setMaxPermits(500); vo.setInterval(2.0); rateLimitClient.init("test", vo); } @Test public void testAcquire() throws InterruptedException { //10個執行緒 ExecutorService executorService = Executors.newFixedThreadPool(20); Subject<RateLimitSummary, RateLimitSummary> writeSubject = new SerializedSubject<RateLimitSummary, RateLimitSummary>(PublishSubject.<RateLimitSummary>create()); Observable<RateLimitSummary> readSubject = writeSubject.share(); Observable<RateLimitSummary> bucketStream = Observable.defer(()->{ return readSubject.window(200, TimeUnit.MILLISECONDS) .flatMap( observable-> observable.reduce(new RateLimitSummary(0,0,0), (a, b)-> a.reduce(b)) ); }); Observable<RateLimitSummary> rollingBucketStream = bucketStream.window(5, 1) .flatMap(observable->observable.reduce(new RateLimitSummary(0, 0, 0), (a, b)-> a.reduce(b))); Runnable acquire = () -> { Random random = new Random(); while(true){ try { Thread.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } RateLimitResult result = rateLimitClient.acquire("test"); writeSubject.onNext(new RateLimitSummary(result)); } }; //初始時間 final long currentMillis = System.currentTimeMillis(); rollingBucketStream.subscribe(summary->{ double timestamp = (System.currentTimeMillis() - currentMillis)/1000.0; System.out.println("time:"+ timestamp + ", acquired:" + summary.acquire + ", reject " + summary.reject + ", error: " + summary.error); }); for(int i=0;i<20;i++){ executorService.submit(acquire); } while(true){ Thread.sleep(5000); } } private static class RateLimitSummary{ public int acquire; public int reject; public int error; public RateLimitSummary(RateLimitResult result){ this.acquire = result == RateLimitResult.SUCCESS?1:0; this.reject = result == RateLimitResult.ACQUIRE_FAIL?1:0; this.error = result == RateLimitResult.ERROR?1:0; } public RateLimitSummary(int acquire, int reject, int error){ this.acquire = acquire; this.reject = reject; this.error = error; } public RateLimitSummary reduce(RateLimitSummary toAdd){ return new RateLimitSummary(this.acquire + toAdd.acquire, this.reject + toAdd.reject, this.error + toAdd.error); } } }
這一段程式碼我仿照了Hystrix中的熔斷統計的程式碼,通過一個subject來存放獲取令牌結果,然後通過第一層bucketStream來將令牌結果按照200ms來分組並且reduce成一個結果。接著通過rollingBucketStream來將200ms的分組組合成一個一秒的時間窗(即5個為一組),並且以200ms為步長滾動。最後統計出來的結果通過subscribe來列印結果。之前的init程式碼我們看已經初始化了一個大小為500的令牌桶,存放令牌的時間間隔為2.0ms,所以支援的QPS為500。接著我們執行這段程式碼,並擷取一部分輸出:
time:75.857, acquired:460, reject 8, error: 0 time:76.056, acquired:483, reject 36, error: 0 time:76.268, acquired:506, reject 52, error: 0 time:76.454, acquired:503, reject 59, error: 0 time:76.707, acquired:457, reject 69, error: 0 time:76.854, acquired:417, reject 66, error: 0 time:77.054, acquired:454, reject 36, error: 0 time:77.255, acquired:459, reject 54, error: 0 time:77.453, acquired:458, reject 77, error: 0 time:77.658, acquired:474, reject 103, error: 0 time:77.858, acquired:490, reject 132, error: 0
可以看到,這個結果基本每200ms輸出一次,然後一秒鐘內的獲取了令牌數目最大值跟500接近,並且能夠很好地處理reject。有一部分結果一秒鐘獲取的令牌數與500差距較大,我分析的原因是因為請求重複時間段比較多,很多請求發生在前一個獲取了令牌之後的2ms內,產生了reject。
結語
通過redis和lua,我實現了一個簡單的分散式限流器。通過上述程式碼,大家能看到一個大致的實現框架,並且通過測試程式碼完成了驗證。如果各位看官有什麼問題歡迎留言,希望能跟大家共同學習。