Sentinel學習筆記(2)-- 流量控制程式碼分析
前言
接上文, 在瞭解了Sentinel如何進行流量統計之後,我們就可以來看看Sentinel是如何完成限流操作的了。根據之前的描述,我們先還是來看下整個Slot Chain流程圖:

Slot Chain 呼叫鏈
從上圖中我們可以看到,限流操作應該是在緊跟StatisticSlot的FlowSlot中完成的。那我們就去FlowSlot中一探究竟。
流量控制
下圖是流量控制所在的程式碼組織:

FlowSlot 所在
按慣例,我們先來看看FlowSlot中的entry方法:
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable { //檢查是否能夠限流通過 FlowRuleManager.checkFlow(resourceWrapper, context, node, count); //呼叫責任鏈下游的Slot的entry fireEntry(context, resourceWrapper, node, count, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } }
這段程式碼很簡單,我們重定向到FlowRuleManager的checkFlow方法:
public static void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException { //獲取呼叫的resource對應的所有限流規則 List<FlowRule> rules = flowRules.get(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { //逐個規則判斷是否觸發限流操作 if (!rule.passCheck(context, node, count)) { //如果passCheck失敗,說明觸發限流,直接丟擲FlowException throw new FlowException(rule.getLimitApp()); } } } }
我們馬不停蹄再到FlowRule的passCheck:
@Override public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { //是否有呼叫方需要被限流,預設情況下limitApp為'default' String limitApp = this.getLimitApp(); //正常限流不會觸發這種情況 if (limitApp == null) { return true; } // 獲得上下文的呼叫方,這裡context會在後面的文章中分析 String origin = context.getOrigin(); //根據呼叫方和上下問以及FlowRule所配置的Strategy來獲取應該用於限流的統計Node //這個Node可以參照上文所描述的StatisticNode Node selectedNode = selectNodeByRequesterAndStrategy(origin, context, node); //如果沒有合乎規則的Node,則直接返回true,表示通過 if (selectedNode == null) { return true; } //如果存在統計Node, //則通過controller來判斷是否需要限流 //這個controller通過設定FlowRule的controllerBehavior來區分 //預設的實現有:0. default, 1. warm up, 2. rate limiter return controller.canPass(selectedNode, acquireCount); }
通過上面程式碼的分析,我們知道真正的限流邏輯藏在了FlowRule的controller裡面,而這個controller有三種實現,我們就挨個來看
Default 預設方式
@Override public boolean canPass(Node node, int acquireCount) { //獲取已經使用過的令牌 int curCount = avgUsedTokens(node); //如果使用過的令牌數目加上這次的超過了限流的數目 //則返回false,表示不能通過 if (curCount + acquireCount > count) { return false; } //否則返回true return true; } private int avgUsedTokens(Node node) { //一般無法觸發 if (node == null) { return -1; } //如果按照併發執行緒數限流則返回統計中的執行緒數目 //否則就返回現在已經pass的Qps數目 return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps(); }
這段程式碼很簡單了,邏輯也都在註釋中,值得注意的是兩點:
- 這裡沒有用任何加鎖操作,這裡我跟開發團隊交流得到的結論是這裡為了效能儘可能的實現了lock free,但是會導致在臨界狀態下多放,因為從這次返回true到entry呼叫回到StatisticSlot中addPass會有一個時間視窗,在加上多執行緒的環境自然就會有競態問題。不過這相當於是效能與正確性的一個trade-off,大家自己可以用Sentinel提供的demo測試下,在32執行緒併發下偶爾會有多放,但是也確實是能夠接受的。
- 這裡的QPS限流與我之前文章所描述的令牌桶演算法也有區別,這裡相當於是在當前時間一開始就把QPS個令牌全部放出,而不是每一個1000/QPS個毫秒間隔釋放一個令牌,這也是一種操作上的簡化。
Warm Up 冷啟動方式
根據文件描述:
該方式主要用於系統長期處於低水位的情況下,當流量突然增加時,直接把系統拉昇到高水位可能瞬間把系統壓垮。通過"冷啟動",讓通過的流量緩慢增加,在一定時間內逐漸增加到閾值上限,給冷系統一個預熱的時間,避免冷系統被壓垮的情況。
根據註釋,Sentinel的預熱限流實現參照了Guava的SmoothWarmUp 限流器的演算法,我在這篇文章中對SmoothWarmUp的所用的演算法和實現做過介紹,不清楚的同學請先移步,這篇文章就不再敘述了。
我們來看看程式碼:
@Override public boolean canPass(Node node, int acquireCount) { long passQps = node.passQps(); long previousQps = node.previousPassQps(); //裝填令牌桶 syncToken(previousQps); // 開始計算它的斜率 // 如果進入了警戒線,開始調整他的qps long restToken = storedTokens.get(); if (restToken >= warningToken) { long aboveToken = restToken - warningToken; // 消耗的速度要比warning快,但是要比慢 // current interval = restToken*slope+1/count //這裡相當於計算了一個在警戒線以上的interval //計算依據可以參考guava的依靠斜率來計算梯形面積部分程式碼 //1.0/count 為恆定速率 //有了interval 那麼1.0/interval即為這時候期望的QPS double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); //拿這個QPS來做比較 if (passQps + acquireCount <= warningQps) { return true; } } else { //如果在警戒線下,就退化為基本的QPS流控 if (passQps + acquireCount <= count) { return true; } } return false; } private void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); //獲取當前秒 currentTime = currentTime - currentTime % 1000; //獲取上一次裝填令牌桶的時間 long oldLastFillTime = lastFilledTime.get(); //如果當前秒數小於等於上次裝填時間 //則return,表示一秒鐘內只需要裝填一次 if (currentTime <= oldLastFillTime) { return; } //獲取目前令牌桶中的令牌數目 long oldValue = storedTokens.get(); //根據當前時間和前一秒pass的Qps來裝填 long newValue = coolDownTokens(currentTime, passQps); //如果CAS成功,那做下一步操作 //就算衝突也無所謂,保證一秒裝填一次就行 if (storedTokens.compareAndSet(oldValue, newValue)) { //將前一秒的QPS減去 //注意點:如果沒有前面保證一秒鐘內只修改一次的策略,會有多減的可能性 long currentValue = storedTokens.addAndGet(0 - passQps); //不要成為負數 if (currentValue < 0) { storedTokens.set(0L); } //修改裝填時間 lastFilledTime.set(currentTime); } } private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; // 新增令牌的判斷前提條件: // 當令牌的消耗程度遠遠低於警戒線的時候 //這裡將裝填的速率恆定為1.0/QPS //那麼裝填後的值即為oldValue + time / (1000 / count) if (oldValue < warningToken) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { //如果之前的令牌桶數目已經高於警戒線,那麼看pass了的QPS是否小於一個閾值,決定是否裝填 if (passQps < (int)count / coldFactor) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.min(newValue, maxToken); }
一些基礎的變數與Guava實現的對應關係如下:
- warningToken 對應 thresholdPermits
- maxToken 對應 maxPermits
- slop 不變
計算方式也與Guava實現一致。大家需要注意的就是canPass中首先通過syncToken重新裝填了一波令牌桶,並且把前一秒的Qps減掉,然後再根據當前令牌桶中剩餘的來計算是否需要限流。這一塊可能需要大家多理解一下Guava的演算法就清楚了。
Rate Limiter(PaceController) 勻速器
這裡作者使用了漏桶演算法來完成勻速器的功能,主要是做到了一個流量整形的功能,這樣的功能非常適合處理突發性流量的處理,比方說高併發整形到勻速佇列中,我們來看看程式碼:
@Override public boolean canPass(Node node, int acquireCount) { // 按照斜率來計算計劃中應該什麼時候通過(原始碼註釋) // 這裡獲取當前時間 long currentTime = TimeUtil.currentTimeMillis(); //獲取acquireCount個令牌需要的時間 long costTime = Math.round(1.0 * (acquireCount) / count * 1000); //期待時間(原始碼註釋) //上次pass的時間加上這次獲取需要花費的時間就得到了期望的時間 long expectedTime = costTime + latestPassedTime.get(); //如果期望時間小於等於當前時間 //說明現在立馬就能夠獲得令牌 if (expectedTime <= currentTime) { //這裡會有衝突,然而衝突就衝突吧.(原始碼註釋) latestPassedTime.set(currentTime); return true; } else { //如果期望時間大於當前時間 // 計算自己需要的等待時間(原始碼註釋) long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); //如果等待的時間大於等於最大的佇列等待時 if (waitTime >= maxQueueingTimeMs) { //返回false標明這一次需要被限流 return false; } else { //這裡應用了一個類似於double check的做法 //先呼叫latestPassedTime這個AtomicLong的原子加 //並獲取原子加後的新值(確實不知道為啥叫old) long oldTime = latestPassedTime.addAndGet(costTime); try { //再做一次判斷,如果這時候有併發的原子加操作 //那麼總有執行緒得到的oldTime與之前期望的不一致 //那麼在計算一次waitTime waitTime = oldTime - TimeUtil.currentTimeMillis(); //如果這時候waitTime超過了最長佇列等待時間 if (waitTime >= maxQueueingTimeMs) { //把加上去的時間又給減回去 //並且返回false,觸發限流 latestPassedTime.addAndGet(-costTime); return false; } //如果到達了這裡,恭喜你,你已經通過了勻速器的考驗 //現在需要做的就是等待該等待的時間,然後返回false //完成勻速器的使命 Thread.sleep(waitTime); return true; } catch (InterruptedException e) { } } } //正常情況不該到這兒,返回false return false; }
大致的程式碼的意義我也在註釋中加以說明,這裡通過一些比較巧妙的原子操作和雙重檢查來完成了勻速器的一個功能。
結語
測試部分我這裡就不再貼結果了,Sentinel提供了比較完備的demo來測試各種情況,程式碼在這裡,有疑問的同學可以自行執行:

測試程式碼所在
本篇文章通過對Sentinel提供的三種限流方案做了程式碼分析,希望大家能夠喜歡!
同時我這裡也做一點點猜想,對於叢集限流,是否可以不動用統計程式碼,只是在流控中將本機的數目上傳集中儲存(例如redis),並拉取整個叢集的流量資料來本地做限流?這樣可以儘可能較少程式碼量,並保證功能,當然按照Sentinel的roadmap,11月份應該就會提供叢集限流功能,我也很期待到時候的實現會怎樣?