1. 程式人生 > >dubbo原始碼閱讀之負載均衡

dubbo原始碼閱讀之負載均衡

負載均衡

在之前叢集的文章中,我們分析了通過監聽註冊中心可以獲取到多個服務提供者,並建立多個Invoker,然後通過叢集類如FailoverClusterInvoker將多個Invoker封裝在一起,而外部的呼叫者以這個封裝的Invoker為入口呼叫內部的多個Invoker,但是我們一次呼叫實際只能呼叫一個真實的Invoker(這裡的真實的Invoker對應一個提供者),所以怎麼在多個Invoker中選擇出一個Invoker來,使得整體的服務呼叫的效能最大化,這就是負載均衡策略。另外,除了負載均衡,叢集類的一個重要功能就是處理服務呼叫故障,當一個invoker呼叫失敗後怎麼辦,根據對這種情況的不同處理策略分為不同的叢集類。當然除了以上兩個功能,叢集類的另一個功能就是粘滯呼叫,這裡不再贅述。

值得一提的是,消費端的Invoker其實是一個分層的結構,我們可以在配置檔案中指定多個註冊中心,這樣就會封裝出多個Invoker(每個Invoker對應一個註冊中心),對於這多個Invoker再通過叢集類的join方法封裝為一個總的Invoker(StaticDirectory),所以叢集類在執行故障處理,負載均衡策略時實際上也是分層級的。
好了,叢集類的回顧就到這裡,說這麼多主要是想引出一點:負載均衡邏輯的執行是在叢集類中,所以分析負載均衡就以叢集類為切入點。
dubbo提供了五種負載均衡器,下面我們一一分析。

RandomLoadBalance(隨機)

光看名字,會以為真的就是從Invoker列表中隨機選擇一個,實際上並不是這麼簡單,dubbo的隨機負載均衡考慮到了權重因素,每個服務提供者的每個方法都有一個有一個權重,權重大的意味著被呼叫的可能性更大。 so, talk is cheap ,show me your code !

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Number of invokers
    int length = invokers.size();
    // Every invoker has the same weight?
    // 表示是否所有的invoker的權重都相等??
    boolean sameWeight = true;
    // the weight of every invokers
    // 用於記錄invoker權重的陣列
    int[] weights = new int[length];
    // the first invoker's weight
    // 第一個invoker的權重
    int firstWeight = getWeight(invokers.get(0), invocation);
    weights[0] = firstWeight;
    // The sum of weights
    // 所有invoker權重的和
    int totalWeight = firstWeight;
    // 遍歷所有的invoker,獲取權重,更新簿記量
    for (int i = 1; i < length; i++) {
        // 獲取權重,預設是100
        // 考慮到了服務預熱的特性,
        // 所謂服務預熱是指服務在剛啟動時不會一下子被賦予全部的權重,而是緩慢第達到設定的權重值
        // 預設權重是100, 預設的預熱時間是10分鐘
        int weight = getWeight(invokers.get(i), invocation);
        // save for later use
        // 記錄權重
        weights[i] = weight;
        // Sum
        // 記錄權重和
        totalWeight += weight;
        // 如果與第一個權重不相等,那麼更新簿記量
        if (sameWeight && weight != firstWeight) {
            sameWeight = false;
        }
    }
    // 如果invoker的權重有區別,那麼需要根據權重來隨機
    if (totalWeight > 0 && !sameWeight) {
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        // 這是根據權重值隨機的典型用法
        for (int i = 0; i < length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    // 如果所有invoker的權重相等,或者權重和為0, 那麼隨機選擇一個invoker
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

這裡指的關注的是所謂的隨機並不是完全隨機,而是在考慮權重的基礎上的隨機,這種演算法值得我們學習,以後再需要的場景中也可以這樣做。
此外,dubbo還有服務預熱的特性,就像我在註釋中說的,這種策略應該是為了避免在服務剛啟動時就湧上來大量的請求,導致服務崩潰,設定一個緩衝時間可以有效避免這種情況,預設的預熱時間是10分鐘,預設的權重值是100。

RoundRobinLoadBalance(輪詢)

接下來,我們分析一下輪詢排程演算法。

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // 方法的key值
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
    if (map == null) {
        // 注意,這裡使用的是putIfAbsent方法,考慮到併發問題
        methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
        map = methodWeightMap.get(key);
    }
    int totalWeight = 0;
    // 簿記量,記錄當前的最大權重
    long maxCurrent = Long.MIN_VALUE;
    long now = System.currentTimeMillis();
    // 被選中的Invoker
    Invoker<T> selectedInvoker = null;
    WeightedRoundRobin selectedWRR = null;
    for (Invoker<T> invoker : invokers) {
        // 獲取該url的唯一標識
        String identifyString = invoker.getUrl().toIdentityString();
        WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
        // 獲取權重,考慮到預熱
        int weight = getWeight(invoker, invocation);

        if (weightedRoundRobin == null) {
            weightedRoundRobin = new WeightedRoundRobin();
            // 設定權重
            weightedRoundRobin.setWeight(weight);
            // 仍然呼叫putIfAbsent方法
            map.putIfAbsent(identifyString, weightedRoundRobin);
            // TODO 個人認為這裡應該加下面這句
            // weightedRoundRobin = map.get(identifyString);
        }
        if (weight != weightedRoundRobin.getWeight()) {
            //weight changed
            // 更新權重
            weightedRoundRobin.setWeight(weight);
        }
        // 獲取當前的累積權重
        long cur = weightedRoundRobin.increaseCurrent();
        // 設定更新時間
        weightedRoundRobin.setLastUpdate(now);
        // 如果累積權重大於當前的最大權重,那麼將當前選中的invoker設為這個invoker
        // 並更新最大權重值
        if (cur > maxCurrent) {
            maxCurrent = cur;
            selectedInvoker = invoker;
            selectedWRR = weightedRoundRobin;
        }
        // 累加權重和
        totalWeight += weight;
    }
    
    // 如果invoker有變化,那麼將那些長時間未被選到的WeightedRoundRobin清除出去
    // 這裡用AtomicBoolean來加鎖,cas鎖
    if (!updateLock.get() && invokers.size() != map.size()) {
        if (updateLock.compareAndSet(false, true)) {
            try {
                // copy -> modify -> update reference
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                newMap.putAll(map);
                Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, WeightedRoundRobin> item = it.next();
                    if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                        it.remove();
                    }
                }
                methodWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }
    }
    if (selectedInvoker != null) {
        // 這一句很重要
        // 選中這個invoker之後,將他的累積權重減去這一輪的總權重,
        // 相當於將其放到隊尾,當然仍然還是考慮到權重的,權重的大的即是減去總權重也還是有較高的累積權重
        selectedWRR.sel(totalWeight);
        return selectedInvoker;
    }
    // should not happen here
    // 邏輯上將,程式碼是不會到這裡的
    // 但是由於語法的要求這裡必須要有返回語句
    return invokers.get(0);
}

這裡的輪詢仍然是考慮到權重因素的,這裡對於輪詢的實現很巧妙,使用累積權重的方法,巧妙地將權重因素考慮進去;而當所有的服務提供者的權重都相等時,這種演算法就會退化為標準的輪詢演算法,實際上累積權重在這裡還起到了記憶輪詢狀態的作用。可以這麼形象地理解,當呼叫過某個提供者後,就把這個提供者扔到靠後面的一個位置,權重越小越靠後,這樣就實現了輪詢的功能。

LeastActiveLoadBalance

// 最小活躍數,考慮權重因素
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Number of invokers
    int length = invokers.size();
    // The least active value of all invokers
    // 用於記錄最小活躍數
    int leastActive = -1;
    // The number of invokers having the same least active value (leastActive)
    // 相同最小活躍數的invoker數量
    int leastCount = 0;
    // The index of invokers having the same least active value (leastActive)
    // 擁有相同的最小活躍數的那些invoker的下標
    int[] leastIndexes = new int[length];
    // the weight of every invokers
    // 每個invoker的權重
    int[] weights = new int[length];
    // The sum of the warmup weights of all the least active invokes
    // 權重和
    int totalWeight = 0;
    // The weight of the first least active invoke
    // 第一個最小活躍數的invoker的權重
    int firstWeight = 0;
    // Every least active invoker has the same weight value?
    // 如果有多個相同最小活躍數的invoker,他們的權重是否相同
    boolean sameWeight = true;


    // Filter out all the least active invokers
    // 這個迴圈的作用是選出最小活躍數的invoker,如果有多個相同最小活躍數的invoker,全部選擇出來
    for (int i = 0; i < length; i++) {
        Invoker<T> invoker = invokers.get(i);
        // Get the active number of the invoke
        int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
        // Get the weight of the invoke configuration. The default value is 100.
        int afterWarmup = getWeight(invoker, invocation);
        // save for later use
        weights[i] = afterWarmup;
        // If it is the first invoker or the active number of the invoker is less than the current least active number
        if (leastActive == -1 || active < leastActive) {
            // Reset the active number of the current invoker to the least active number
            leastActive = active;
            // Reset the number of least active invokers
            leastCount = 1;
            // Put the first least active invoker first in leastIndexs
            leastIndexes[0] = i;
            // Reset totalWeight
            totalWeight = afterWarmup;
            // Record the weight the first least active invoker
            firstWeight = afterWarmup;
            // Each invoke has the same weight (only one invoker here)
            sameWeight = true;
            // If current invoker's active value equals with leaseActive, then accumulating.
        } else if (active == leastActive) {
            // Record the index of the least active invoker in leastIndexs order
            leastIndexes[leastCount++] = i;
            // Accumulate the total weight of the least active invoker
            totalWeight += afterWarmup;
            // If every invoker has the same weight?
            if (sameWeight && i > 0
                    && afterWarmup != firstWeight) {
                sameWeight = false;
            }
        }
    }
    // Choose an invoker from all the least active invokers
    // 如果最小活躍數的invoker只有一個,那麼就不用考慮權重因素
    if (leastCount == 1) {
        // If we got exactly one invoker having the least active value, return this invoker directly.
        return invokers.get(leastIndexes[0]);
    }
    // 如果他們的權重不同,那麼需要考慮權重的因素
    // 這個方法與RandomLoadBalance中類似
    if (!sameWeight && totalWeight > 0) {
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on 
        // totalWeight.
        int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        for (int i = 0; i < leastCount; i++) {
            int leastIndex = leastIndexes[i];
            offsetWeight -= weights[leastIndex];
            if (offsetWeight < 0) {
                return invokers.get(leastIndex);
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    // 如果他們的權重相同,那就隨機選擇一個
    return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}

這個方法相信大家都能看懂,這裡我想重點說一下最小活躍數的維護方法。
最小活躍數是通過RpcStatus類維護的,這個類對於每個服務的每個方法都維護了一個RpcStatus類,可以看出來,dubbo對於負載均衡是細化到方法級別的,粒度還是很細的,從前面輪詢演算法也可以看出來這點。
RpcStatus類中除了維護最小活躍數,還維護了呼叫事件,總呼叫時間,呼叫次數,失敗的呼叫次數等等統計量,那麼問題是:這些統計量是在什麼時候更新的呢?你肯定會回答當然是在服務呼叫的時候更新的。那麼在程式碼中是怎麼保證在每次服務呼叫的時候都更新相應的RpcStatus類呢??答案就是ExtensionLoader的AOP特性,可見dubbo的spi機制真是無處不在,貫穿了框架的各個部分。ExtensionLoader在載入一個介面的實現類的時候會將一些包裝類快取起來,當例項化一個實現類的時候會用這些包裝類對例項進行層層包裝(通過構造方法),這樣就實現了多層代理,每個包裝類都會加入一些通用的功能,實際上就是切面的思想。 服務呼叫的統計其實正是一個通用的功能,很適合作為一個橫切面,切入所有的Invoker中。而消費端Invoker的建立是在Protocol.refer方法中,我們看一下Protocol實現類,發現了一些包裝類:

  • ProtocolFilterWrapper, 嵌入可用的Filter
  • ProtocolListenerWrapper, 加入監聽器
  • QosProtocolWrapper, 啟動QOS服務

最小活躍數功能的實現是通過過濾器。ProtocolFilterWrapper.refer方法在建立invoker時會通過層層代理的方式形成一個過濾器鏈,注意對於註冊中心的Invoker是不用新增過濾器鏈的,只有對真正執行遠端呼叫的那些Invoker才會新增過濾器鏈。

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

Filter也是一個SPI介面,是通過ExtensionLoader載入的,在載入Filter介面是呼叫了getActivateExtension方法,與Activate註解有關。
活躍數的功能是通過ActiveLimitFilter過濾器實現的。

ActiveLimitFilter.invoke

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    // 呼叫的方法名
    String methodName = invocation.getMethodName();
    // 最大活躍數,預設是0,0表示不限制,這個邏輯在RpcStatus.beginCount中
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
    // 獲取方法對應的RpcStatus物件
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    // 如果增加活躍數失敗,就需要等待,
    // 這裡的活躍數類似於訊號量,作用是控制最大併發量
    if (!count.beginCount(url, methodName, max)) {
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
        long start = System.currentTimeMillis();
        long remain = timeout;
        synchronized (count) {
            // 等待活躍數降下來,並試圖獲取到一個活躍數,知道超時
            while (!count.beginCount(url, methodName, max)) {
                try {
                    count.wait(remain);
                } catch (InterruptedException e) {
                    // ignore
                }
                long elapsed = System.currentTimeMillis() - start;
                remain = timeout - elapsed;
                if (remain <= 0) {
                    throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                            + invoker.getInterface().getName() + ", method: "
                            + invocation.getMethodName() + ", elapsed: " + elapsed
                            + ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
                            + ". max concurrent invoke limit: " + max);
                }
            }
        }
    }

    // 到這裡說明成功獲取到一個活躍數,那麼可以繼續後面的呼叫
    boolean isSuccess = true;
    long begin = System.currentTimeMillis();
    try {
        return invoker.invoke(invocation);
    } catch (RuntimeException t) {
        isSuccess = false;
        throw t;
    } finally {
        // 呼叫結束,釋放獲取到的活躍數
        count.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
        // 這裡指的思考一下,為什麼只有在max>0時才通知
        // 因為max<=0時,最大活躍數是Integer.MAX_VALUE, 相當於不設上限,
        // 所以在前面的迴圈中第一次就能獲取到活躍數,壓根不會進入迴圈,也就不會獲取鎖等待,
        // 所以這裡也就不需要通知了
        if (max > 0) {
            synchronized (count) {
                count.notifyAll();
            }
        }
    }
}

註釋已經很清楚,就不再贅述。

ConsistentHashLoadBalance

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";

private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String methodName = RpcUtils.getMethodName(invocation);
    // 以方法為粒度
    String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
    // invokers的唯一標示,
    int identityHashCode = System.identityHashCode(invokers);
    ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
    // 如果invokers列表變了,就重新hash
    if (selector == null || selector.identityHashCode != identityHashCode) {
        selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
        selector = (ConsistentHashSelector<T>) selectors.get(key);
    }
    return selector.select(invocation);
}

private static final class ConsistentHashSelector<T> {

    private final TreeMap<Long, Invoker<T>> virtualInvokers;

    private final int replicaNumber;

    private final int identityHashCode;

    private final int[] argumentIndex;

    ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
        // 虛擬節點,用於使請求更均勻
        this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
        this.identityHashCode = identityHashCode;
        URL url = invokers.get(0).getUrl();
        // 虛擬節點個數
        this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
        // 參與生成hash值的引數的序號
        String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
        argumentIndex = new int[index.length];
        for (int i = 0; i < index.length; i++) {
            argumentIndex[i] = Integer.parseInt(index[i]);
        }
        // 初始化的時候生成hash環
        for (Invoker<T> invoker : invokers) {
            // 一個地址(ip:port)生成的hash節點實際上是固定的
            String address = invoker.getUrl().getAddress();
            for (int i = 0; i < replicaNumber / 4; i++) {
                byte[] digest = md5(address + i);
                for (int h = 0; h < 4; h++) {
                    // 生成的hash值儘量雜湊
                    long m = hash(digest, h);
                    virtualInvokers.put(m, invoker);
                }
            }
        }
    }


    public Invoker<T> select(Invocation invocation) {
        // 用指定的引數參與生成hash
        String key = toKey(invocation.getArguments());
        byte[] digest = md5(key);
        // 只要引數相同,那麼生成的hash值就是相同的,選擇到的invoker就是相同的
        // 這也就保證了相同的呼叫(同一個服務的,並且呼叫引數相同),總會被分配到固定的提供者
        return selectForKey(hash(digest, 0));
    }

    private String toKey(Object[] args) {
        StringBuilder buf = new StringBuilder();
        for (int i : argumentIndex) {
            if (i >= 0 && i < args.length) {
                buf.append(args[i]);
            }
        }
        return buf.toString();
    }

    private Invoker<T> selectForKey(long hash) {
        Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
        // entry == null說明hash比treeMap中最大的hash值還大,
        // 根據一致性hash環的演算法,這是需要繞回第一個節點,即hash最小的那個節點
        if (entry == null) {
            entry = virtualInvokers.firstEntry();
        }
        return entry.getValue();
    }

    // 雜湊函式,這種方法使生成的hash儘量均勻分散
    private long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    private byte[] md5(String value) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        md5.reset();
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        md5.update(bytes);
        return md5.digest();
    }
}

}

首先我們要理解一致性hash的演算法原理,明白一致性hash演算法之後再來看這個類,就簡單多了。
值得注意的是,這裡使用了TreeMap儲存hash環的所有hash值值得借鑑,這樣做的好處是查詢快速,效率很高