dubbo原始碼閱讀之負載均衡
負載均衡
在之前叢集的文章中,我們分析了通過監聽註冊中心可以獲取到多個服務提供者,並建立多個Invoker,然後通過叢集類如FailoverClusterInvoker將多個Invoker封裝在一起,而外部的呼叫者以這個封裝的Invoker為入口呼叫內部的多個Invoker,但是我們一次呼叫實際只能呼叫一個真實的Invoker(這裡的真實的Invoker對應一個提供者),所以怎麼在多個Invoker中選擇出一個Invoker來,使得整體的服務呼叫的效能最大化,這就是負載均衡策略。另外,除了負載均衡,叢集類的一個重要功能就是處理服務呼叫故障,當一個invoker呼叫失敗後怎麼辦,根據對這種情況的不同處理策略分為不同的叢集類。當然除了以上兩個功能,叢集類的另一個功能就是粘滯呼叫,這裡不再贅述。
好了,叢集類的回顧就到這裡,說這麼多主要是想引出一點:負載均衡邏輯的執行是在叢集類中,所以分析負載均衡就以叢集類為切入點。
dubbo提供了五種負載均衡器,下面我們一一分析。
RandomLoadBalance(隨機)
光看名字,會以為真的就是從Invoker列表中隨機選擇一個,實際上並不是這麼簡單,dubbo的隨機負載均衡考慮到了權重因素,每個服務提供者的每個方法都有一個有一個權重,權重大的意味著被呼叫的可能性更大。 so, talk is cheap ,show me your code !
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // Number of invokers int length = invokers.size(); // Every invoker has the same weight? // 表示是否所有的invoker的權重都相等?? boolean sameWeight = true; // the weight of every invokers // 用於記錄invoker權重的陣列 int[] weights = new int[length]; // the first invoker's weight // 第一個invoker的權重 int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // The sum of weights // 所有invoker權重的和 int totalWeight = firstWeight; // 遍歷所有的invoker,獲取權重,更新簿記量 for (int i = 1; i < length; i++) { // 獲取權重,預設是100 // 考慮到了服務預熱的特性, // 所謂服務預熱是指服務在剛啟動時不會一下子被賦予全部的權重,而是緩慢第達到設定的權重值 // 預設權重是100, 預設的預熱時間是10分鐘 int weight = getWeight(invokers.get(i), invocation); // save for later use // 記錄權重 weights[i] = weight; // Sum // 記錄權重和 totalWeight += weight; // 如果與第一個權重不相等,那麼更新簿記量 if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 如果invoker的權重有區別,那麼需要根據權重來隨機 if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. // 這是根據權重值隨機的典型用法 for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. // 如果所有invoker的權重相等,或者權重和為0, 那麼隨機選擇一個invoker return invokers.get(ThreadLocalRandom.current().nextInt(length)); }
這裡指的關注的是所謂的隨機並不是完全隨機,而是在考慮權重的基礎上的隨機,這種演算法值得我們學習,以後再需要的場景中也可以這樣做。
此外,dubbo還有服務預熱的特性,就像我在註釋中說的,這種策略應該是為了避免在服務剛啟動時就湧上來大量的請求,導致服務崩潰,設定一個緩衝時間可以有效避免這種情況,預設的預熱時間是10分鐘,預設的權重值是100。
RoundRobinLoadBalance(輪詢)
接下來,我們分析一下輪詢排程演算法。
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 方法的key值
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
// 注意,這裡使用的是putIfAbsent方法,考慮到併發問題
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
// 簿記量,記錄當前的最大權重
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
// 被選中的Invoker
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
// 獲取該url的唯一標識
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
// 獲取權重,考慮到預熱
int weight = getWeight(invoker, invocation);
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
// 設定權重
weightedRoundRobin.setWeight(weight);
// 仍然呼叫putIfAbsent方法
map.putIfAbsent(identifyString, weightedRoundRobin);
// TODO 個人認為這裡應該加下面這句
// weightedRoundRobin = map.get(identifyString);
}
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
// 更新權重
weightedRoundRobin.setWeight(weight);
}
// 獲取當前的累積權重
long cur = weightedRoundRobin.increaseCurrent();
// 設定更新時間
weightedRoundRobin.setLastUpdate(now);
// 如果累積權重大於當前的最大權重,那麼將當前選中的invoker設為這個invoker
// 並更新最大權重值
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
// 累加權重和
totalWeight += weight;
}
// 如果invoker有變化,那麼將那些長時間未被選到的WeightedRoundRobin清除出去
// 這裡用AtomicBoolean來加鎖,cas鎖
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
newMap.putAll(map);
Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
while (it.hasNext()) {
Entry<String, WeightedRoundRobin> item = it.next();
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
it.remove();
}
}
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
if (selectedInvoker != null) {
// 這一句很重要
// 選中這個invoker之後,將他的累積權重減去這一輪的總權重,
// 相當於將其放到隊尾,當然仍然還是考慮到權重的,權重的大的即是減去總權重也還是有較高的累積權重
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
// 邏輯上將,程式碼是不會到這裡的
// 但是由於語法的要求這裡必須要有返回語句
return invokers.get(0);
}
這裡的輪詢仍然是考慮到權重因素的,這裡對於輪詢的實現很巧妙,使用累積權重的方法,巧妙地將權重因素考慮進去;而當所有的服務提供者的權重都相等時,這種演算法就會退化為標準的輪詢演算法,實際上累積權重在這裡還起到了記憶輪詢狀態的作用。可以這麼形象地理解,當呼叫過某個提供者後,就把這個提供者扔到靠後面的一個位置,權重越小越靠後,這樣就實現了輪詢的功能。
LeastActiveLoadBalance
// 最小活躍數,考慮權重因素
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
// 用於記錄最小活躍數
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
// 相同最小活躍數的invoker數量
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
// 擁有相同的最小活躍數的那些invoker的下標
int[] leastIndexes = new int[length];
// the weight of every invokers
// 每個invoker的權重
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokes
// 權重和
int totalWeight = 0;
// The weight of the first least active invoke
// 第一個最小活躍數的invoker的權重
int firstWeight = 0;
// Every least active invoker has the same weight value?
// 如果有多個相同最小活躍數的invoker,他們的權重是否相同
boolean sameWeight = true;
// Filter out all the least active invokers
// 這個迴圈的作用是選出最小活躍數的invoker,如果有多個相同最小活躍數的invoker,全部選擇出來
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoke
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoke configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
// save for later use
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexs
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexs order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
// 如果最小活躍數的invoker只有一個,那麼就不用考慮權重因素
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
// 如果他們的權重不同,那麼需要考慮權重的因素
// 這個方法與RandomLoadBalance中類似
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果他們的權重相同,那就隨機選擇一個
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
這個方法相信大家都能看懂,這裡我想重點說一下最小活躍數的維護方法。
最小活躍數是通過RpcStatus類維護的,這個類對於每個服務的每個方法都維護了一個RpcStatus類,可以看出來,dubbo對於負載均衡是細化到方法級別的,粒度還是很細的,從前面輪詢演算法也可以看出來這點。
RpcStatus類中除了維護最小活躍數,還維護了呼叫事件,總呼叫時間,呼叫次數,失敗的呼叫次數等等統計量,那麼問題是:這些統計量是在什麼時候更新的呢?你肯定會回答當然是在服務呼叫的時候更新的。那麼在程式碼中是怎麼保證在每次服務呼叫的時候都更新相應的RpcStatus類呢??答案就是ExtensionLoader的AOP特性,可見dubbo的spi機制真是無處不在,貫穿了框架的各個部分。ExtensionLoader在載入一個介面的實現類的時候會將一些包裝類快取起來,當例項化一個實現類的時候會用這些包裝類對例項進行層層包裝(通過構造方法),這樣就實現了多層代理,每個包裝類都會加入一些通用的功能,實際上就是切面的思想。 服務呼叫的統計其實正是一個通用的功能,很適合作為一個橫切面,切入所有的Invoker中。而消費端Invoker的建立是在Protocol.refer方法中,我們看一下Protocol實現類,發現了一些包裝類:
- ProtocolFilterWrapper, 嵌入可用的Filter
- ProtocolListenerWrapper, 加入監聽器
- QosProtocolWrapper, 啟動QOS服務
最小活躍數功能的實現是通過過濾器。ProtocolFilterWrapper.refer方法在建立invoker時會通過層層代理的方式形成一個過濾器鏈,注意對於註冊中心的Invoker是不用新增過濾器鏈的,只有對真正執行遠端呼叫的那些Invoker才會新增過濾器鏈。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
Filter也是一個SPI介面,是通過ExtensionLoader載入的,在載入Filter介面是呼叫了getActivateExtension方法,與Activate註解有關。
活躍數的功能是通過ActiveLimitFilter過濾器實現的。
ActiveLimitFilter.invoke
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
// 呼叫的方法名
String methodName = invocation.getMethodName();
// 最大活躍數,預設是0,0表示不限制,這個邏輯在RpcStatus.beginCount中
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 獲取方法對應的RpcStatus物件
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// 如果增加活躍數失敗,就需要等待,
// 這裡的活躍數類似於訊號量,作用是控制最大併發量
if (!count.beginCount(url, methodName, max)) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
synchronized (count) {
// 等待活躍數降下來,並試圖獲取到一個活躍數,知道超時
while (!count.beginCount(url, methodName, max)) {
try {
count.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
+ ". max concurrent invoke limit: " + max);
}
}
}
}
// 到這裡說明成功獲取到一個活躍數,那麼可以繼續後面的呼叫
boolean isSuccess = true;
long begin = System.currentTimeMillis();
try {
return invoker.invoke(invocation);
} catch (RuntimeException t) {
isSuccess = false;
throw t;
} finally {
// 呼叫結束,釋放獲取到的活躍數
count.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
// 這裡指的思考一下,為什麼只有在max>0時才通知
// 因為max<=0時,最大活躍數是Integer.MAX_VALUE, 相當於不設上限,
// 所以在前面的迴圈中第一次就能獲取到活躍數,壓根不會進入迴圈,也就不會獲取鎖等待,
// 所以這裡也就不需要通知了
if (max > 0) {
synchronized (count) {
count.notifyAll();
}
}
}
}
註釋已經很清楚,就不再贅述。
ConsistentHashLoadBalance
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
// 以方法為粒度
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// invokers的唯一標示,
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 如果invokers列表變了,就重新hash
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 虛擬節點,用於使請求更均勻
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 虛擬節點個數
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
// 參與生成hash值的引數的序號
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 初始化的時候生成hash環
for (Invoker<T> invoker : invokers) {
// 一個地址(ip:port)生成的hash節點實際上是固定的
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
// 生成的hash值儘量雜湊
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
// 用指定的引數參與生成hash
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);
// 只要引數相同,那麼生成的hash值就是相同的,選擇到的invoker就是相同的
// 這也就保證了相同的呼叫(同一個服務的,並且呼叫引數相同),總會被分配到固定的提供者
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
// entry == null說明hash比treeMap中最大的hash值還大,
// 根據一致性hash環的演算法,這是需要繞回第一個節點,即hash最小的那個節點
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
// 雜湊函式,這種方法使生成的hash儘量均勻分散
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}
}
首先我們要理解一致性hash的演算法原理,明白一致性hash演算法之後再來看這個類,就簡單多了。
值得注意的是,這裡使用了TreeMap儲存hash環的所有hash值值得借鑑,這樣做的好處是查詢快速,效率很高