dubbo原始碼解析——cluster
我們再來回顧一下官網的對於叢集容錯的架構設計圖
Cluster 將 Directory 中的多個 Invoker 偽裝成一個 Invoker(偽裝過程用到loadBalance),對上層透明,偽裝過程包含了容錯邏輯,呼叫失敗後,重試另一個。簡單來說,就是應對出錯情況採取的策略。看看這個介面: 該介面有9個實現類,換個角度來說,就是有9中應對策略,本文介紹幾個比較常用的策略
FailoverCluster 失敗自動切換,當呼叫遠端服務失敗時,自動選擇其他服務進行呼叫。可以通過retries設定重試次數。由於重試,重試次數過多時,帶來時延。
/**
* Failover
* 當invoker呼叫失敗,列印錯誤日誌,並且重試其他invoker
* 重試將導致時延
*/
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings ({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 區域性引用
List<Invoker<T>> copyinvokers = invokers;
// 引數校驗
checkInvokers(copyinvokers, invocation) ;
// 獲取方法名稱
String methodName = RpcUtils.getMethodName(invocation);
// 獲取重試次數
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
// 最少要呼叫1次
len = 1;
}
// 區域性引用
RpcException le = null;
// 已經呼叫過的invoker列表
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
// 呼叫失敗的invoker地址
Set<String> providers = new HashSet<String>(len);
// i < len 作為迴圈條件,說明len是多少就迴圈多少次(len等於 重試次數 + 1)
for (int i = 0; i < len; i++) {
if (i > 0) {
// 檢查invoker是否被銷燬
checkWhetherDestroyed();
// 重新選擇invoker(在重試之前,需要重新選擇,以避免候選invoker的改變)
copyinvokers = list(invocation);
// 引數檢查
checkInvokers(copyinvokers, invocation);
}
/*
* 這一步就是進入loadBalance負載均衡
* 因為上述步驟可能篩選出invoker數量大於1,所以再次經過loadBalance的篩選(同時避免獲取到已經呼叫過的invoker)
*/
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 遠端方法呼叫
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
// 正常執行,直接返回結果。否則,如果還有重試次數,則繼續重試
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 能到這裡,說明都失敗了,providers儲存失敗的invoker地址
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
}
MergeableCluster 這個主要用在分組聚合中,我們來看一下官網的介紹
按組合並返回結果 ,比如選單服務,介面一樣,但有多種實現,用group區分,現在消費方需從每種group中呼叫一次返回結果,合併結果返回,這樣就可以實現聚合選單項。
下面補充一下使用方法(網上基本沒有使用方法的教程,樓主才疏學淺,花了幾個小時才摸索出來): (1)consumer側,提供合併merge方法 這裡有幾個步驟: a、在resources目錄下,新建META-INF及dubbo,新建文字com.alibaba.dubbo.rpc.cluster.Merger b、對映自定義的merger名稱到相應的實現類,如: myMerger=com.patty.dubbo.consumer.service.MyMerger c、實現合併函式,需要實現Merger介面,如下:
public class MyMerger implements Merger<ModelResult> {
@Override
public ModelResult merge(ModelResult... items) {
ModelResult result = new ModelResult();
for (ModelResult item : items) {
// 進行資料合併操作
result.setResult((String)result.getResult() + (String) item.getResult());
}
return result;
}
}
(2)將reference的cluster屬性設定為"mergeable",group設定為“*”,並且設定合併方法,如下:
<dubbo:reference
id="demoService"
interface="com.huang.yuan.api.service.DemoService"
version="1.0"
timeout="100000000"
group="*">
<dubbo:method name="test" merger="myMerger"/>
</dubbo:reference>
(3)同一套程式碼,分別利用不同的group,把服務釋出到註冊中心上面。例如:/group1/com.huangyuan.demoService 及 /group2/com.huangyuan.demoService
(3)接下來就可以直接使用了,這邊測試得到結果:(這裡合併只是簡單連線字串)
接下來再看下原始碼:
public Result invoke(final Invocation invocation) throws RpcException {
// 獲取Directory中的invoker
List<Invoker<T>> invokers = directory.list(invocation);
// 獲取合併方法的名稱
String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
if (ConfigUtils.isEmpty(merger)) {
for (final Invoker<T> invoker : invokers) {
// 如果沒有合併方法,只調動其中一個分組
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
return invokers.iterator().next().invoke(invocation);
}
// 獲取返回值型別
Class<?> returnType;
try {
returnType = getInterface().getMethod(
invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
} catch (NoSuchMethodException e) {
returnType = null;
}
Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
for (final Invoker<T> invoker : invokers) {
Future<Result> future = executor.submit(new Callable<Result>() {
@Override
public Result call() throws Exception {
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
// 保留future(未真正執行遠端呼叫)
results.put(invoker.getUrl().getServiceKey(), future);
}
Object result = null;
// 結果列表
List<Result> resultList = new ArrayList<Result>(results.size());
// 超時時間
int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
//
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
try {
// 執行遠端呼叫
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
if (r.hasException()) {
log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
" failed: " + r.getException().getMessage(),
r.getException());
} else {
resultList.add(r);
}
} catch (Exception e) {
throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
}
}
if (resultList.isEmpty()) {
return new RpcResult((Object) null);
} else if (resultList.size() == 1) {
// 只有一個結果,直接返回了
return resultList.iterator().next();
}
if (returnType == void.class) {
return new RpcResult((Object) null);
}
if (merger.startsWith(".")) {
/*
* 配置的方法名稱,以"."開頭
* 這種方式,入參固定只有一個,沒有達到合併的效果,不建議使用
*/
merger = merger.substring(1);
Method method;
try {
method = returnType.getMethod(merger, returnType);
} catch (NoSuchMethodException e) {
throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " +
returnType.getClass().getName() + " ]");
}
if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true);
}
result = resultList.remove(0).getValue();
try {
if (method.getReturnType() != void.class
&& method.getReturnType().isAssignableFrom(result.getClass())) {
for (Result r : resultList) {
result = method.invoke(result, r.getValue());
}
} else {
for (Result r : resultList) {
method.invoke(result, r.getValue());
}
}
} catch (Exception e) {
throw new RpcException("Can not merge result: " + e.getMessage(), e);
}
} else {
/*
* 建議使用Merger擴充套件的方式
*/
Merger resultMerger;
if (ConfigUtils.isDefault(merger)) {
resultMerger = MergerFactory.getMerger(returnType);
} else {
resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
}
if (resultMerger != null) {
List<Object> rets = new ArrayList<Object>(resultList.size());
for (Result r : resultList) {
rets.add(r.getValue());
}
result = resultMerger.merge(
rets.toArray((Object[]) Array.newInstance(returnType, 0)));
} else {
throw new RpcException("There is no merger to merge result.");
}
}
return new RpcResult(result);
}
PS:其實合併方法還有另外一個使用方式,使用".方法名稱",並且合併方法只能寫在結果類中,這種方式有一個很大的弊端,就是原始碼中入參固定只有一個,所以達不到合併效果,故不推薦使用。
<dubbo:reference
id="demoService"
interface="com.huang.yuan.api.service.DemoService"
version="1.0"
timeout="100000000"
group="*">
<dubbo:method name="test" merger=".myMerger"/>
</dubbo:reference>
AvailableCluster
public class AvailableCluster implements Cluster {
public static final String NAME = "available";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new AbstractClusterInvoker<T>(directory) {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
// 僅僅執行可只用的invoker
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
};
}