dubbo原始碼分析-Directory 和 LoadBalance-筆記
阿新 • • 發佈:2018-12-03
Directory
- 訂閱節點的變化,
- 當zookeeper上指定節點發生變化以後,會通知到RegistryDirectory的notify方法
- 將url轉化為invoker物件
呼叫過程中invokers的使用
- StaticDirectory: 靜態目錄服務,
- 它的所有Invoker通過建構函式傳入,
- 服務消費方引用服務的時候, 服務對多註冊中心的引用,將Invokers集合直接傳入 StaticDirectory構造器,再由Cluster偽裝成一個Invoker
- StaticDirectory的list方法直接返回所有invoker集合;
- RegistryDirectory: 註冊目錄服務,
- 它的Invoker集合是從註冊中心獲取的,
- 它實現了NotifyListener介面實現了回撥介面notify(List<Url>)
Directory目錄服務的更新過程
- RegistryProtocol.doRefer方法,也就是消費端在初始化的時候,這裡涉及到了RegistryDirectory這個類。然後執行cluster.join(directory)方法。
- 這些程式碼在上節課有分析過。
- cluster.join其實就是將Directory中的多個Invoker偽裝成一個Invoker, 對上層透明,包含叢集的容錯機制
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);//對多個invoker進行組裝 directory.setRegistry(registry); //ZookeeperRegistry directory.setProtocol(protocol); //protocol=Protocol$Adaptive //url=consumer://192.168.111.... URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); //會把consumer://192... 註冊到註冊中心 if (! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { //zkClient.create() registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); //Cluster$Adaptive return cluster.join(directory); }
directory.subscribe
訂閱節點的變化,
- 當zookeeper上指定節點發生變化以後,會通知到RegistryDirectory的notify方法
- 將url轉化為invoker物件
呼叫過程中invokers的使用
- 再呼叫過程中,AbstractClusterInvoker.invoke方法中
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance;
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
list方法
- 從directory中獲得invokers
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
負載均衡LoadBalance
- LoadBalance負載均衡, 負責從多個 Invokers中選出具體的一個Invoker用於本次呼叫,呼叫過程中包含了負載均衡的演算法。
負載均衡程式碼訪問入口
- 在AbstractClusterInvoker.invoke中程式碼如下,通過名稱獲得指定的擴充套件點。RandomLoadBalance
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance;
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
AbstractClusterInvoker.doselect
- 呼叫LoadBalance.select方法,將 invokers 按照指定演算法進行負載
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
// 如果只有兩個invoker,退化成輪循
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//如果 selected中包含(優先判斷) 或者 不可用&&availablecheck=true 則重試.
if( (selected != null && selected.contains(invoker))
||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
try{
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if(rinvoker != null){
invoker = rinvoker;
}else{
//看下第一次選的位置,如果不是最後,選+1位置.
int index = invokers.indexOf(invoker);
try{
//最後在避免碰撞
invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
}catch (Exception e) {
logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);
}
}
}catch (Throwable t){
logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
}
}
return invoker;
}
- 預設情況下,LoadBalance使用的是Random演算法,但是這個隨機和我們理解上的隨機還是不一樣的,因為他還有個概念叫weight(權重)
RandomLoadBalance
- 假設有四個叢集節點A,B,C,D,對應的權重分別是1,2,3,4,
- 那麼請求到A節點的概率就為1/(1+2+3+4) = 10%.B,C,D節點依次類推為20%,30%,40%.
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 總個數
int totalWeight = 0; // 總權重
boolean sameWeight = true; // 權重是否都一樣
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // 累計總權重
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false; // 計算所有權重是否一樣
}
}
if (totalWeight > 0 && ! sameWeight) {
// 如果權重不相同且權重大於0則按總權重數隨機
int offset = random.nextInt(totalWeight);
// 並確定隨機值落在哪個片斷上
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果權重相同或權重為0則均等隨機
return invokers.get(random.nextInt(length));
}