1. 程式人生 > >dubbo負載均衡演算法

dubbo負載均衡演算法

Dubbo提供了4中負載均衡策略

① Random LoadBalance(隨機均衡演算法)

② RoundRobin LoadBalance(權重輪詢均衡演算法)

③ LeastAction LoadBalance(最少活躍呼叫數均衡演算法)

④ ConsistentHash LoadBalance(一致性Hash均衡演算法)

預設時為Random隨機呼叫

1.Random LoadBalance(隨機均衡演算法)

官方:

隨機,按權重設定隨機概率。

在一個截面上碰撞的概率高,但呼叫量越大分佈越均勻,而且按概率使用權重後也比較均勻,有利於動態調整提供者權重。

2.RoundRobin LoadBalance(權重輪詢均衡演算法)

官方:

輪詢,按公約後的權重設定輪詢比率。

存在慢的提供者累積請求的問題,比如:第二臺機器很慢,但沒掛,當請求呼叫第二臺時就卡在那裡,久而久之,所有請求都卡在第二臺機器上。

Round Robin輪詢演算法,是按照公約後的權重設定輪詢比率,即權重輪詢演算法(Weighted Round-Robin),它是基於輪詢演算法改進而來的

輪詢排程演算法的原理是:每一次把來自使用者的請求輪流分配給內部中的伺服器。如從1開始,一直到N(其中N是內部伺服器的總個數),然後重新開始迴圈。

該演算法的優點:

簡潔,無需記錄當前所有連線的狀態,所以它是一種無狀態排程。

缺點:

輪詢排程演算法假設所有伺服器的處理效能都相同,不關心每臺伺服器的當前連線數和相應速度。當請求服務間隔時間變化比較大時,輪詢排程演算法容易導致伺服器間的負載不平衡。

所以此種均衡演算法適合於伺服器組中所有伺服器都有相同的軟硬體配置並且平均服務請求相對均衡的情況。但是在實際情況中,可能並不是這種情況。由於每臺伺服器的配置、安裝的業務應用等不同,其處理能力會不一樣。所以,我們根據伺服器的不同處理能力,給每個伺服器分配不同的權值,使其能夠接受相應權值數的服務請求。

權重輪詢排程演算法的流程:

假設有一組伺服器S={S0,S1,...Sn-1}

W(Si)表示伺服器Si的權值

指示變數i表示上一次選擇的伺服器

指示變數cw表示當前排程的權值

max(S)表示集合S中所有服務的最大權值

gcw(S)表示集合S中所有伺服器權值的最大公約數

變數i初始化為-1,cw初始化為0

while (true) {

  i = (i + 1) mod n;

  if (i == 0) {

     cw = cw - gcd(S);

     if (cw <= 0) {

       cw = max(S);

       if (cw == 0)

         return NULL;

     }

  }

  if (W(Si) >= cw)

    return Si;

}

這種演算法的邏輯實現如圖2所示,圖中我們假定四臺伺服器的處理能力為3:1:1:1

由於權重輪詢排程演算法考慮到了不同伺服器的處理能力,所以這種均衡演算法能確保高效能的伺服器得到更多的使用率,避免低效能的伺服器負載過重。所以在實際應用中比較常見。

3.LeastAction LoadBalance(最少活躍呼叫數均衡演算法)

官方:

最少活躍呼叫數,相同活躍數的隨機,活躍數指呼叫前後計數差。

使慢的提供者收到更少請求,因為越慢的提供者的呼叫前後技術差會越大。

4.ConsistentHash LoadBalance(一致性Hash均衡演算法)

官方

一致性hash,相同引數的請求總是傳送到同一提供者

當某一臺提供者掛時,原本發往該提供者的請求,基於虛擬節點,平攤到其他提供者,不會引起劇烈變動弄。

一致性hash演算法可以解決服務提供者的增加、移除及掛掉時的情況,能儘可能小的改變已存在key的對映關係,儘可能的滿足單調性的要求。

一致性hash通過構建虛擬節點,能儘可能避免分配失衡,具有很好的平衡性。

以下示例假設物件(Object)就相當於Client發的請求,cache相當於服務提供者。

環形hash空間

考慮通常的hash演算法都是將value對映到一個32位的key值,也即是0~2^32-1次方的數值空間。可以將這個空間想象成一個首(0)尾(2^32-1)相接的圓環,如:

把物件對映到hash空間

接下來考慮四個物件object1~object4,通過hash函式計算出hash值key在環上的分佈,如圖:

hash(object1) = key1;

...

hash(object4) = key4;

把cache對映到hash空間

Consistent hashing的基本思想就是將物件和cache都對映到同一個hash數值空間中,並且使用相同的hash演算法

假設現在有A,B,C共3臺cache,那麼對映結果如下圖所示,它們在hash空間中,以對應的hash值排序

hash(cache A) = key A

...

hash(cache C) = key C

cache的hash計算,一般的方法可以使用cache機器的ip地址或者機器名作為hash輸入。

把物件對映到cache

現在cache和物件都已經通過同一個hash演算法對映到hash數值空間中來了,接下來要考慮的就是如何將物件對映到cache上面了。

在這個環形空間中,如果沿著順時針方向從物件的key值出發,知道遇到一個cache,那麼就將該物件儲存在這個cache上,因為物件和cache的hash值是固定的,因此這個cache必然是唯一和固定的,這樣就找到了物件和cache的對映方法。

依據上面的方法,物件object1將被儲存到cache A上,object2和object3對應到cache C,object4對應到cache B

考察cache的變動

一致性hash演算法可以解決服務提供者的增加、移除及掛掉時的情況,能儘可能小的改變已存在key的對映關係,儘可能滿足單調性的要求。

1)移除cache

考慮假設cache B掛掉了,根據上面的對映方法,這時受影響的將僅是那些yan沿cache B逆時針便利直到下一個cache(cache C)之間的物件,也即是本來對映到cache B上的那些物件。

因此,這裡僅需要變動object4,將其重新對映到cache C上即可

2)新增cache

再考慮新增一臺新的cache D的情況,假設在這個環形hash空間中cache D被對映在物件object2和object3之間,這時受影響的將僅是那些沿cache D逆時針遍歷直到下一個cache(cache B)之間的物件(它們本來是對映到cache C上的物件的一部分),將這些物件重新對映到cache D上即可

因此,這裡僅需要變動物件object2,將其重新對映到cache D上。

虛擬節點

考慮hash演算法的另一個指標是平衡性(Balance),定義如下:

平衡性是指hash的結果能夠儘可能分佈到所有的緩衝中去,這樣可以使得所有的緩衝空間都得到利用。

hash演算法並不是保證絕對的平衡,如果cache較少的話,物件並不能被均勻的對映到cache上,比如在上面的例子中,僅部署cache A和cache C的情況下,在4個物件中,cache A僅儲存了object1,而cache C則儲存了object2、object3、object4,分散式很不均衡的。

為了解決這種情況,consistent hashing引入了虛擬節點的概念,它可以定義如下:

“虛擬節點”(virtual node)是實際節點在hash空間的複製品(repllica),一個節點對應了若干個“虛擬節點”,這個對應個數也成為“複製個數”,“虛擬節點”在hash空間中以hash值排列。

仍以僅部署cache A和cache C的情況為例,此情況cache分佈並不均勻。現在我們引入虛擬節點,並設定“複製個數”為2,這就意味著一共會存在4個“虛擬節點”,cache A1,cache A2代表了cache A;cache C1,cache C2代表了cache C;假設一種比較理想的情況:

 

此時,物件到虛擬節點的對映關係為

object 1 -> cache A2;object 2 -> cache A1;object 3 -> cache C1;object 4 -> cache C2;

因此object1和object2都對映到了cache A上,而object3和object4對映到了cache C上,平衡性有了很大提高。

引入“虛擬節點”後對映關係從 { 物件 -> 節點 } 轉換到了 { 物件 -> 虛擬節點 } 查詢物體所在的cache時的對映關係為:

“虛擬節點”的hash計算可以採用對應節點的ip地址加數字字尾的方式,例如,假設cache A的ip地址為202.168.14.241

引入虛擬節點前,計算cache A的hash值:

hash("202.168.14.241");

引入虛擬節點後,計算虛擬節點cache A1和 cache A2的hash值

hash("202.168.14.241#1");//cache A1

hash("202.168.14.241#2");//cache A2

附錄:

1.RandomLoadBalance演算法

public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        int length = invokers.size(); // 總個數

        int totalWeight = 0; // 總權重

        boolean sameWeight = true; // 權重是否都一樣

        for (int i = 0; i < length; i++) {

            int weight = getWeight(invokers.get(i), invocation);

            totalWeight += weight; // 累計總權重

            if (sameWeight && i > 0

                    && weight != getWeight(invokers.get(i - 1), invocation)) {

                sameWeight = false; // 計算所有權重是否一樣

            }

        }

        if (totalWeight > 0 && ! sameWeight) {

            // 如果權重不相同且權重大於0則按總權重數隨機

            int offset = random.nextInt(totalWeight);

            // 並確定隨機值落在哪個片斷上

            for (int i = 0; i < length; i++) {

                offset -= getWeight(invokers.get(i), invocation);

                if (offset < 0) {

                    return invokers.get(i);

                }

            }

        }

        // 如果權重相同或權重為0則均等隨機

        return invokers.get(random.nextInt(length));

    }

}

2.RoundRobinLoadBalance演算法

public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

        private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();

        int length = invokers.size(); // 總個數

        int maxWeight = 0; // 最大權重

        int minWeight = Integer.MAX_VALUE; // 最小權重

        for (int i = 0; i < length; i++) {

            int weight = getWeight(invokers.get(i), invocation);

            maxWeight = Math.max(maxWeight, weight); // 累計最大權重

            minWeight = Math.min(minWeight, weight); // 累計最小權重

        }

        if (maxWeight > 0 && minWeight < maxWeight) { // 權重不一樣

            AtomicPositiveInteger weightSequence = weightSequences.get(key);

            if (weightSequence == null) {

                weightSequences.putIfAbsent(key, new AtomicPositiveInteger());

                weightSequence = weightSequences.get(key);

            }

            int currentWeight = weightSequence.getAndIncrement() % maxWeight;

            List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();

            for (Invoker<T> invoker : invokers) { // 篩選權重大於當前權重基數的Invoker

                if (getWeight(invoker, invocation) > currentWeight) {

                    weightInvokers.add(invoker);

                }

            }

            int weightLength = weightInvokers.size();

            if (weightLength == 1) {

                return weightInvokers.get(0);

            } else if (weightLength > 1) {

                invokers = weightInvokers;

                length = invokers.size();

            }

        }

        AtomicPositiveInteger sequence = sequences.get(key);

        if (sequence == null) {

            sequences.putIfAbsent(key, new AtomicPositiveInteger());

            sequence = sequences.get(key);

        }

        // 取模輪循

        return invokers.get(sequence.getAndIncrement() % length);

    }

}

3.LeastActionLoadBalance演算法

public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

        private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        int length = invokers.size(); // 總個數

        int leastActive = -1; // 最小的活躍數

        int leastCount = 0; // 相同最小活躍數的個數

        int[] leastIndexs = new int[length]; // 相同最小活躍數的下標

        int totalWeight = 0; // 總權重

        int firstWeight = 0; // 第一個權重,用於於計算是否相同

        boolean sameWeight = true; // 是否所有權重相同

        for (int i = 0; i < length; i++) {

         Invoker<T> invoker = invokers.get(i);

            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活躍數

            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 權重

            if (leastActive == -1 || active < leastActive) { // 發現更小的活躍數,重新開始

                leastActive = active; // 記錄最小活躍數

                leastCount = 1; // 重新統計相同最小活躍數的個數

                leastIndexs[0] = i; // 重新記錄最小活躍數下標

                totalWeight = weight; // 重新累計總權重

                firstWeight = weight; // 記錄第一個權重

                sameWeight = true; // 還原權重相同標識

            } else if (active == leastActive) { // 累計相同最小的活躍數

                leastIndexs[leastCount ++] = i; // 累計相同最小活躍數下標

                totalWeight += weight; // 累計總權重

                // 判斷所有權重是否一樣

                if (sameWeight && i > 0

                        && weight != firstWeight) {

                    sameWeight = false;

                }

            }

        }

        // assert(leastCount > 0)

        if (leastCount == 1) {

            // 如果只有一個最小則直接返回

            return invokers.get(leastIndexs[0]);

        }

        if (! sameWeight && totalWeight > 0) {

            // 如果權重不相同且權重大於0則按總權重數隨機

            int offsetWeight = random.nextInt(totalWeight);

            // 並確定隨機值落在哪個片斷上

            for (int i = 0; i < leastCount; i++) {

                int leastIndex = leastIndexs[i];

                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);

                if (offsetWeight <= 0)

                    return invokers.get(leastIndex);

            }

        }

        // 如果權重相同或權重為0則均等隨機

        return invokers.get(leastIndexs[random.nextInt(leastCount)]);

    }

}

4.ConsistentHashLoadBalance演算法

public class ConsistentHashLoadBalance extends AbstractLoadBalance {

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @SuppressWarnings("unchecked")

    @Override

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();

        int identityHashCode = System.identityHashCode(invokers);

        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);

        if (selector == null || selector.getIdentityHashCode() != identityHashCode) {

            selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));

            selector = (ConsistentHashSelector<T>) selectors.get(key);

        }

        return selector.select(invocation);

    }

    private static final class ConsistentHashSelector<T> {

    private final TreeMap<Long, Invoker<T>> virtualInvokers;

    private final int                       replicaNumber;

    private final int                       identityHashCode;

    private final int[]                     argumentIndex;

  public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {

            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();

            this.identityHashCode = System.identityHashCode(invokers);

            URL url = invokers.get(0).getUrl();

            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);

            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));

            argumentIndex = new int[index.length];

            for (int i = 0; i < index.length; i ++) {

                argumentIndex[i] = Integer.parseInt(index[i]);

            }

            for (Invoker<T> invoker : invokers) {

                for (int i = 0; i < replicaNumber / 4; i++) {

                    byte[] digest = md5(invoker.getUrl().toFullString() + i);

                    for (int h = 0; h < 4; h++) {

                        long m = hash(digest, h);

                        virtualInvokers.put(m, invoker);

                    }

                }

            }

        }

        public int getIdentityHashCode() {

            return identityHashCode;

        }

        public Invoker<T> select(Invocation invocation) {

            String key = toKey(invocation.getArguments());

            byte[] digest = md5(key);

            Invoker<T> invoker = sekectForKey(hash(digest, 0));

            return invoker;

        }

        private String toKey(Object[] args) {

            StringBuilder buf = new StringBuilder();

            for (int i : argumentIndex) {

                if (i >= 0 && i < args.length) {

                    buf.append(args[i]);

                }

            }

            return buf.toString();

        }

        private Invoker<T> sekectForKey(long hash) {

            Invoker<T> invoker;

            Long key = hash;

            if (!virtualInvokers.containsKey(key)) {

                SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);

                if (tailMap.isEmpty()) {

                    key = virtualInvokers.firstKey();

                } else {

                    key = tailMap.firstKey();

                }

            }

            invoker = virtualInvokers.get(key);

            return invoker;

        }

        private long hash(byte[] digest, int number) {

            return (((long) (digest[3 + number * 4] & 0xFF) << 24)

                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)

                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)

                    | (digest[0 + number * 4] & 0xFF))

                    & 0xFFFFFFFFL;

        }

        private byte[] md5(String value) {

            MessageDigest md5;

            try {

                md5 = MessageDigest.getInstance("MD5");

            } catch (NoSuchAlgorithmException e) {

                throw new IllegalStateException(e.getMessage(), e);

            }

            md5.reset();

            byte[] bytes = null;

            try {

                bytes = value.getBytes("UTF-8");

            } catch (UnsupportedEncodingException e) {

                throw new IllegalStateException(e.getMessage(), e);

            }

            md5.update(bytes);

            return md5.digest();

        }

    }

}