1. 程式人生 > >Dubbo之——幾種負載均衡演算法

Dubbo之——幾種負載均衡演算法

1、RandomLoadBalance演算法

public class RandomLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "random";
    private final Random random = new Random();
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 總個數
        int totalWeight = 0; // 總權重
        boolean sameWeight = true; // 權重是否都一樣
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累計總權重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 計算所有權重是否一樣
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // 如果權重不相同且權重大於0則按總權重數隨機
            int offset = random.nextInt(totalWeight);
            // 並確定隨機值落在哪個片斷上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 如果權重相同或權重為0則均等隨機
        return invokers.get(random.nextInt(length));
    }
}

2、RoundRobinLoadBalance演算法

public class RoundRobinLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "roundrobin"; 
        private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
    private final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // 總個數
        int maxWeight = 0; // 最大權重
        int minWeight = Integer.MAX_VALUE; // 最小權重
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // 累計最大權重
            minWeight = Math.min(minWeight, weight); // 累計最小權重
        }
        if (maxWeight > 0 && minWeight < maxWeight) { // 權重不一樣
            AtomicPositiveInteger weightSequence = weightSequences.get(key);
            if (weightSequence == null) {
                weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
                weightSequence = weightSequences.get(key);
            }
            int currentWeight = weightSequence.getAndIncrement() % maxWeight;
            List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
            for (Invoker<T> invoker : invokers) { // 篩選權重大於當前權重基數的Invoker
                if (getWeight(invoker, invocation) > currentWeight) {
                    weightInvokers.add(invoker);
                }
            }
            int weightLength = weightInvokers.size();
            if (weightLength == 1) {
                return weightInvokers.get(0);
            } else if (weightLength > 1) {
                invokers = weightInvokers;
                length = invokers.size();
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        // 取模輪循
        return invokers.get(sequence.getAndIncrement() % length);
    }
}

3、LeastActionLoadBalance演算法

public class LeastActiveLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "leastactive";
        private final Random random = new Random();
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 總個數
        int leastActive = -1; // 最小的活躍數
        int leastCount = 0; // 相同最小活躍數的個數
        int[] leastIndexs = new int[length]; // 相同最小活躍數的下標
        int totalWeight = 0; // 總權重
        int firstWeight = 0; // 第一個權重,用於於計算是否相同
        boolean sameWeight = true; // 是否所有權重相同
        for (int i = 0; i < length; i++) {
        	Invoker<T> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活躍數
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 權重
            if (leastActive == -1 || active < leastActive) { // 發現更小的活躍數,重新開始
                leastActive = active; // 記錄最小活躍數
                leastCount = 1; // 重新統計相同最小活躍數的個數
                leastIndexs[0] = i; // 重新記錄最小活躍數下標
                totalWeight = weight; // 重新累計總權重
                firstWeight = weight; // 記錄第一個權重
                sameWeight = true; // 還原權重相同標識
            } else if (active == leastActive) { // 累計相同最小的活躍數
                leastIndexs[leastCount ++] = i; // 累計相同最小活躍數下標
                totalWeight += weight; // 累計總權重
                // 判斷所有權重是否一樣
                if (sameWeight && i > 0 
                        && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount > 0)
        if (leastCount == 1) {
            // 如果只有一個最小則直接返回
            return invokers.get(leastIndexs[0]);
        }
        if (! sameWeight && totalWeight > 0) {
            // 如果權重不相同且權重大於0則按總權重數隨機
            int offsetWeight = random.nextInt(totalWeight);
            // 並確定隨機值落在哪個片斷上
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 如果權重相同或權重為0則均等隨機
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }
}

4、ConsistentHashLoadBalance演算法

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.getIdentityHashCode() != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }
    private static final class ConsistentHashSelector<T> {
    private final TreeMap<Long, Invoker<T>> virtualInvokers;
    private final int                       replicaNumber;
    private final int                       identityHashCode;
    private final int[]                     argumentIndex;
  public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = System.identityHashCode(invokers);
            URL url = invokers.get(0).getUrl();
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i ++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(invoker.getUrl().toFullString() + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }
        public int getIdentityHashCode() {
            return identityHashCode;
        }
        public Invoker<T> select(Invocation invocation) {
            String key = toKey(invocation.getArguments());
            byte[] digest = md5(key);
            Invoker<T> invoker = sekectForKey(hash(digest, 0));
            return invoker;
        }
        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }
        private Invoker<T> sekectForKey(long hash) {
            Invoker<T> invoker;
            Long key = hash;
            if (!virtualInvokers.containsKey(key)) {
                SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
                if (tailMap.isEmpty()) {
                    key = virtualInvokers.firstKey();
                } else {
                    key = tailMap.firstKey();
                }
            }
            invoker = virtualInvokers.get(key);
            return invoker;
        }
        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8) 
                    | (digest[0 + number * 4] & 0xFF)) 
                    & 0xFFFFFFFFL;
        }
        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes = null;
            try {
                bytes = value.getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.update(bytes);
            return md5.digest();
        }
    }
}