1. 程式人生 > >1.Sentinel原始碼分析—FlowRuleManager載入規則做了什麼?

1.Sentinel原始碼分析—FlowRuleManager載入規則做了什麼?

最近我很好奇在RPC中限流熔斷降級要怎麼做,hystrix已經1年多沒有更新了,感覺要被遺棄的感覺,那麼我就把眼光聚焦到了阿里的Sentinel,順便學習一下阿里的原始碼。

這一章我主要講的是FlowRuleManager在載入FlowRule的時候做了什麼,下一篇正式講Sentinel如何控制併發數的。

下面我給出一個簡化版的demo,這個demo只能單執行緒訪問,先把過程講清楚再講多執行緒版本。

初始化流量控制的規則:限定20個執行緒併發訪問

public class FlowThreadDemo {

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();
    private static AtomicInteger activeThread = new AtomicInteger();

    private static volatile boolean stop = false;
    private static final int threadCount = 100;

    private static int seconds = 60 + 40;
    private static volatile int methodBRunningTime = 2000;

    public static void main(String[] args) throws Exception {
        System.out.println(
            "MethodA will call methodB. After running for a while, methodB becomes fast, "
                + "which make methodA also become fast ");
        tick();
        initFlowRule();

        Entry methodA = null;
        try {
            TimeUnit.MILLISECONDS.sleep(5);
            methodA = SphU.entry("methodA");
            activeThread.incrementAndGet();
            //Entry methodB = SphU.entry("methodB");
            TimeUnit.MILLISECONDS.sleep(methodBRunningTime);
            //methodB.exit();
            pass.addAndGet(1);
        } catch (BlockException e1) {
            block.incrementAndGet();
        } catch (Exception e2) {
            // biz exception
        } finally {
            total.incrementAndGet();
            if (methodA != null) {
                methodA.exit();
                activeThread.decrementAndGet();
            }
        }
    }

    private static void initFlowRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource("methodA");
        // set limit concurrent thread for 'methodA' to 20
        rule1.setCount(20);
        rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        rule1.setLimitApp("default");

        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;

            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + " total qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                    + ", pass:" + oneSecondPass
                    + ", block:" + oneSecondBlock
                    + " activeThread:" + activeThread.get());
                if (seconds-- <= 0) {
                    stop = true;
                }
                if (seconds == 40) {
                    System.out.println("method B is running much faster; more requests are allowed to pass");
                    methodBRunningTime = 20;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                + ", block:" + block.get());
            System.exit(0);
        }
    }
}

FlowRuleManager

在這個demo中,首先會呼叫FlowRuleManager#loadRules進行規則註冊
我們先聊一下規則配置的程式碼:

private static void initFlowRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource("methodA");
    // set limit concurrent thread for 'methodA' to 20
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
    rule1.setLimitApp("default");

    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

這段程式碼裡面先定義一個流量控制規則,然後呼叫loadRules進行註冊。

FlowRuleManager初始化

FlowRuleManager
FlowRuleManager 類裡面有幾個靜態引數:

//規則集合
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
//監聽器
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
//用來監聽配置是否發生變化
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();

//建立一個延遲的執行緒池
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
    new NamedThreadFactory("sentinel-metrics-record-task", true));

static {
    //設定監聽
    currentProperty.addListener(LISTENER);
    //每一秒鐘呼叫一次MetricTimerListener的run方法
    SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
}

在初始化的時候會為靜態變數都賦上值。

在新建MetricTimerListener例項的時候做了很多事情,容我慢慢分析。

MetricTimerListener

public class MetricTimerListener implements Runnable {

    private static final MetricWriter metricWriter = new MetricWriter(SentinelConfig.singleMetricFileSize(),
        SentinelConfig.totalMetricFileCount());
       ....
}

首次初始化MetricTimerListener的時候會建立一個MetricWriter例項。我們先看傳入的兩個引數SentinelConfig.singleMetricFileSize()和SentinelConfig.totalMetricFileCount()。

SentinelConfig在首次初始化的時候會初始化靜態程式碼塊:

SentinelConfig

static {
    try {
        initialize();
        loadProps();
        resolveAppType();
        RecordLog.info("[SentinelConfig] Application type resolved: " + appType);
    } catch (Throwable ex) {
        RecordLog.warn("[SentinelConfig] Failed to initialize", ex);
        ex.printStackTrace();
    }
}

這段靜態程式碼塊主要是設定一下配置引數。

SentinelConfig#singleMetricFileSize
SentinelConfig#totalMetricFileCount

public static long singleMetricFileSize() {
    try {
        //獲取的是 1024 * 1024 * 50
        return Long.parseLong(props.get(SINGLE_METRIC_FILE_SIZE));
    } catch (Throwable throwable) {
        RecordLog.warn("[SentinelConfig] Parse singleMetricFileSize fail, use default value: "
                + DEFAULT_SINGLE_METRIC_FILE_SIZE, throwable);
        return DEFAULT_SINGLE_METRIC_FILE_SIZE;
    }
}

public static int totalMetricFileCount() {
    try {
        //預設是:6
        return Integer.parseInt(props.get(TOTAL_METRIC_FILE_COUNT));
    } catch (Throwable throwable) {
        RecordLog.warn("[SentinelConfig] Parse totalMetricFileCount fail, use default value: "
                + DEFAULT_TOTAL_METRIC_FILE_COUNT, throwable);
        return DEFAULT_TOTAL_METRIC_FILE_COUNT;
    }
}

singleMetricFileSize方法和totalMetricFileCount主要是獲取SentinelConfig在靜態變數裡設入得引數。

然後我們進入到MetricWriter的構造方法中:
MetricWriter

public MetricWriter(long singleFileSize, int totalFileCount) {
    if (singleFileSize <= 0 || totalFileCount <= 0) {
        throw new IllegalArgumentException();
    }
    RecordLog.info(
            "[MetricWriter] Creating new MetricWriter, singleFileSize=" + singleFileSize + ", totalFileCount="
                    + totalFileCount);
    //  /Users/luozhiyun/logs/csp/
    this.baseDir = METRIC_BASE_DIR;
    File dir = new File(baseDir);
    if (!dir.exists()) {
        dir.mkdirs();
    }

    long time = System.currentTimeMillis();
    //轉換成秒
    this.lastSecond = time / 1000;
    //singleFileSize = 1024 * 1024 * 50
    this.singleFileSize = singleFileSize;
    //totalFileCount = 6
    this.totalFileCount = totalFileCount;
    try {
        this.timeSecondBase = df.parse("1970-01-01 00:00:00").getTime() / 1000;
    } catch (Exception e) {
        RecordLog.warn("[MetricWriter] Create new MetricWriter error", e);
    }
}

構造器裡面主要是建立資料夾,設定單個檔案大小,總檔案個數,設定時間。

講完了MetricTimerListener的靜態屬性,現在我們來講MetricTimerListener的run方法。

MetricTimerListener#run

public void run() {
    //這個run方法裡面主要是做定時的資料採集,然後寫到log檔案裡去
    Map<Long, List<MetricNode>> maps = new TreeMap<Long, List<MetricNode>>();
    //遍歷叢集節點
    for (Entry<ResourceWrapper, ClusterNode> e : ClusterBuilderSlot.getClusterNodeMap().entrySet()) {
        String name = e.getKey().getName();
        ClusterNode node = e.getValue();
        Map<Long, MetricNode> metrics = node.metrics();
        aggregate(maps, metrics, name);
    }
    //彙總統計的資料
    aggregate(maps, Constants.ENTRY_NODE.metrics(), Constants.TOTAL_IN_RESOURCE_NAME);
    if (!maps.isEmpty()) {
        for (Entry<Long, List<MetricNode>> entry : maps.entrySet()) {
            try {
                //寫入日誌中
                metricWriter.write(entry.getKey(), entry.getValue());
            } catch (Exception e) {
                RecordLog.warn("[MetricTimerListener] Write metric error", e);
            }
        }
    }
}

上面的run方法其實就是每秒把統計的資料寫到日誌裡去。其中Constants.ENTRY_NODE.metrics()負責統計資料,我們下面分析以下這個方法。

Constants.ENTRY_NODE這句程式碼會例項化一個ClusterNode例項。
ClusterNode是繼承StatisticNode,統計資料時在StatisticNode中實現的。

Metrics方法也是呼叫的StatisticNode方法。

我們先看看StatisticNode的全域性變數

public class StatisticNode implements Node {
        //構建一個統計60s的資料,設定60個滑動視窗,每個視窗1s
        //這裡建立的是BucketLeapArray例項來進行統計
        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);
        //上次統計的時間戳
        private long lastFetchTime = -1;
        .....
}

然後我們看看StatisticNode的metrics方法:
StatisticNode#metrics

public Map<Long, MetricNode> metrics() {
    // The fetch operation is thread-safe under a single-thread scheduler pool.
    long currentTime = TimeUtil.currentTimeMillis();
    //獲取當前時間的滑動視窗的開始時間
    currentTime = currentTime - currentTime % 1000;
    Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
    //獲取滑動窗口裡統計的資料
    List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
    long newLastFetchTime = lastFetchTime;
    // Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
    for (MetricNode node : nodesOfEverySecond) {
        //篩選符合的滑動視窗的節點
        if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
            metrics.put(node.getTimestamp(), node);
            //選出符合節點裡最大的時間戳資料賦值
            newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
        }
    }
    //設定成滑動窗口裡統計的最大時間
    lastFetchTime = newLastFetchTime;

    return metrics;
}

這個方法主要是呼叫rollingCounterInMinute進行資料的統計,然後篩選出有效的統計結果返回。

我們進入到rollingCounterInMinute是ArrayMetric的例項,所以我們進入到ArrayMetric的details方法中

ArrayMetric#details

public List<MetricNode> details() {
    List<MetricNode> details = new ArrayList<MetricNode>();
    //呼叫BucketLeapArray
    data.currentWindow();
    //列出統計結果
    List<WindowWrap<MetricBucket>> list = data.list();
    for (WindowWrap<MetricBucket> window : list) {
        if (window == null) {
            continue;
        }
        //對統計結果進行封裝
        MetricNode node = new MetricNode();
        //代表一秒內被流量控制的請求數量
        node.setBlockQps(window.value().block());
        //則是一秒內業務本身異常的總和
        node.setExceptionQps(window.value().exception());
        // 代表一秒內到來到的請求
        node.setPassQps(window.value().pass());
        //代表一秒內成功處理完的請求;
        long successQps = window.value().success();
        node.setSuccessQps(successQps);
        //代表一秒內該資源的平均響應時間
        if (successQps != 0) {
            node.setRt(window.value().rt() / successQps);
        } else {
            node.setRt(window.value().rt());
        }
        //設定統計視窗的開始時間
        node.setTimestamp(window.windowStart());

        node.setOccupiedPassQps(window.value().occupiedPass());

        details.add(node);
    }

    return details;
}

這個方法首先會呼叫dat.currentWindow()設定當前時間視窗到視窗列表裡去。然後呼叫data.list()列出所有的視窗資料,然後遍歷不為空的視窗資料封裝成MetricNode返回。

data是BucketLeapArray的例項,BucketLeapArray繼承了LeapArray,主要的統計都是在LeapArray中進行的,所以我們直接看看LeapArray的currentWindow方法。

LeapArray#currentWindow

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    //通過當前時間判斷屬於哪個視窗
    int idx = calculateTimeIdx(timeMillis);
    //計算出視窗開始時間
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);

    while (true) {
        //獲取數組裡的老資料
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
           
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
            // 如果對應時間視窗的開始時間與計算得到的開始時間一樣
            // 那麼代表當前即是我們要找的視窗物件,直接返回
        } else if (windowStart == old.windowStart()) {
             
            return old;
        } else if (windowStart > old.windowStart()) { 
            //如果當前的開始時間小於原開始時間,那麼就更新到新的開始時間
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            //一般來說不會走到這裡
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

這個方法裡首先會傳入一個timeMillis是當前的時間戳。然後呼叫calculateTimeIdx

private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    //計算當前時間能夠落在array的那個節點上
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int)(timeId % array.length());
}

calculateTimeIdx方法用當前的時間戳除以每個視窗的大小,再和array資料取模。array資料是一個容量為60的陣列,代表被統計的60秒分割的60個小視窗。

舉例:
例如當前timeMillis = 1567175708975
timeId = 1567175708975/1000 = 1567175708
timeId % array.length() = 1567175708%60 = 8
也就是說當前的時間視窗是第八個。

然後呼叫calculateWindowStart計算當前時間開始時間

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    //用當前時間減去視窗大小,計算出視窗開始時間
    return timeMillis - timeMillis % windowLengthInMs;
}

接下來就是一個while迴圈:
在看while迴圈之前我們看一下array數組裡面是什麼樣的物件
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
WindowWrap是一個時間視窗的包裝物件,裡面包含時間視窗的長度,這裡是1000;視窗開始時間;視窗內的資料實體,是呼叫newEmptyBucket方法返回一個MetricBucket。

MetricBucket

public class MetricBucket {

    private final LongAdder[] counters;
    //預設4900
    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        //初始化minRt,預設是4900
        initMinRt();
    }
    ...
}

MetricEvent是一個列舉類:

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}

也就是是MetricBucket為每個視窗通過一個內部陣列counters統計了這個視窗內的所有資料。

接下來我們來講一下while迴圈裡所做的事情:

  1. 從array裡獲取bucket節點
  2. 如果節點已經存在,那麼用CAS更新一個新的節點
  3. 如果節點是新的,那麼直接返回
  4. 如果節點失效了,設定當前節點,清除所有失效節點

舉例:

1. 如果array資料裡面的bucket資料如下所示:
     B0       B1      B2    NULL      B4
 ||_______|_______|_______|_______|_______||___
 200     400     600     800     1000    1200  timestamp
                             ^
                          time=888
正好當前時間所對應的槽位裡面的資料是空的,那麼就用CAS更新

2. 如果array裡面已經有資料了,並且槽位裡面的視窗開始時間和當前的開始時間相等,那麼直接返回
     B0       B1      B2     B3      B4
 ||_______|_______|_______|_______|_______||___
 200     400     600     800     1000    1200  timestamp
                             ^
                          time=888

3. 例如當前時間是1676,所對應窗口裡面的資料的視窗開始時間小於當前的視窗開始時間,那麼加上鎖,然後設定槽位的視窗開始時間為當前視窗開始時間,並把槽位裡面的資料重置
   (old)
             B0       B1      B2    NULL      B4
 |_______||_______|_______|_______|_______|_______||___
 ...    1200     1400    1600    1800    2000    2200  timestamp
                              ^
                           time=1676

所以上面的array陣列大概是這樣:

array陣列由一個個的WindowWrap例項組成,WindowWrap例項裡面由MetricBucket進行資料統計。

然後繼續回到ArrayMetric的details方法,講完了上面的data.currentWindow(),現在再來講data.list()

list方法最後也會呼叫到LeapArray的list方法中:
LeapArray#list

public List<WindowWrap<T>> list(long validTime) {
    int size = array.length();
    List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);

    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);
        //如果windowWrap節點為空或者當前時間戳比windowWrap的視窗開始時間大超過60s,那麼就跳過
        //也就是說只要60s以內的資料
        if (windowWrap == null || isWindowDeprecated(validTime, windowWrap)) {
            continue;
        }
        result.add(windowWrap);
    }
    return result;
}

這個方法是用來把array裡面都統計好的節點都找出來,並且是不為空,且是當前時間60秒內的資料。

最後Constants.ENTRY_NODE.metrics() 會返回所有符合條件的統計節點資料然後傳入aggregate方法中,遍歷為每個MetricNode節點設定Resource為TOTAL_IN_RESOURCE_NAME,封裝好呼叫metricWriter.write進行寫日誌操作。

最後總結一下在初始化FlowRuleManager的時候做了什麼:

  1. FlowRuleManager在初始化的時候會呼叫靜態程式碼塊進行初始化
  2. 在靜態程式碼塊內呼叫ScheduledExecutorService執行緒池,每隔1秒呼叫一次MetricTimerListener的run方法
  3. MetricTimerListener會呼叫Constants.ENTRY_NODE.metrics()進行定時的統計
    1. 呼叫StatisticNode進行統計,統計60秒內的資料,並將60秒的資料分割成60個小視窗
    2. 在設定當前視窗的時候如果裡面沒有資料直接設定,如果存在資料並且是最新的直接返回,如果是舊資料,那麼reset原來的統計資料
    3. 每個小窗口裡面的資料由MetricBucket進行封裝
  4. 最後將統計好的資料通過metricWriter寫入到log裡去

FlowRuleManager載入規則

FlowRuleManager是呼叫loadRules進行規則載入的:

FlowRuleManager#loadRules

public static void loadRules(List<FlowRule> rules) {
    currentProperty.updateValue(rules);
}

currentProperty這個例項是在FlowRuleManager是在靜態程式碼塊裡面進行載入的,上面我們講過,生成的是DynamicSentinelProperty的例項。

我們進入到DynamicSentinelProperty的updateValue中:

public boolean updateValue(T newValue) {
    //判斷新的元素和舊元素是否相同
    if (isEqual(value, newValue)) {
        return false;
    }
    RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);

    value = newValue;
    for (PropertyListener<T> listener : listeners) {
        listener.configUpdate(newValue);
    }
    return true;
}

updateValue方法就是校驗一下是不是已經存在相同的規則了,如果不存在那麼就直接設定value等於新的規則,然後通知所有的監聽器更新一下規則配置。

currentProperty例項裡面的監聽器會在FlowRuleManager初始化靜態程式碼塊的時候設定一個FlowPropertyListener監聽器例項,FlowPropertyListener是FlowRuleManager的內部類:

private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

    @Override
    public void configUpdate(List<FlowRule> value) {
        Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
        if (rules != null) {
            flowRules.clear();
            //這個map的維度是key是Resource
            flowRules.putAll(rules);
        }
        RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
    }
     ....
}

configUpdate首先會呼叫FlowRuleUtil.buildFlowRuleMap()方法將所有的規則按resource分類,然後排序返回成map,然後將FlowRuleManager的原來的規則清空,放入新的規則集合到flowRules中去。

FlowRuleUtil#buildFlowRuleMap
這個方法最後會呼叫到FlowRuleUtil的另一個過載的方法:

public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction,
                                                          Predicate<FlowRule> filter, boolean shouldSort) {
    Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
    if (list == null || list.isEmpty()) {
        return newRuleMap;
    }
    Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();

    for (FlowRule rule : list) {
        //校驗必要欄位:資源名,限流閾值, 限流閾值型別,呼叫關係限流策略,流量控制效果等
        if (!isValidRule(rule)) {
            RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
            continue;
        }
        if (filter != null && !filter.test(rule)) {
            continue;
        }
        //應用名,如果沒有則會使用default
        if (StringUtil.isBlank(rule.getLimitApp())) {
            rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
        }
        //設定拒絕策略:直接拒絕、Warm Up、勻速排隊,預設是DefaultController
        TrafficShapingController rater = generateRater(rule);
        rule.setRater(rater);

        //獲取Resource名字
        K key = groupFunction.apply(rule);
        if (key == null) {
            continue;
        }
        //根據Resource進行分組
        Set<FlowRule> flowRules = tmpMap.get(key);

        if (flowRules == null) {
            // Use hash set here to remove duplicate rules.
            flowRules = new HashSet<>();
            tmpMap.put(key, flowRules);
        }

        flowRules.add(rule);
    }
    //根據ClusterMode LimitApp排序
    Comparator<FlowRule> comparator = new FlowRuleComparator();
    for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
        List<FlowRule> rules = new ArrayList<>(entries.getValue());
        if (shouldSort) {
            // Sort the rules.
            Collections.sort(rules, comparator);
        }
        newRuleMap.put(entries.getKey(), rules);
    }
    return newRuleMap;
}

這個方法首先校驗傳進來的rule集合不為空,然後遍歷rule集合。對rule的必要欄位進行校驗,如果傳入了過濾器那麼校驗過濾器,然後過濾resource為空的rule,最後相同的resource的rule都放到一起排序後返回。
注意這裡預設生成的rater是DefaultController。

到這裡FlowRuleManager已經分析完畢了,比較長