1. 程式人生 > >dubbo源碼閱讀之集群(故障處理策略)

dubbo源碼閱讀之集群(故障處理策略)

rst fec 是否 adb indexof 均衡 lsa getc 問題

dubbo集群概述

dubbo集群功能的切入點在ReferenceConfig.createProxy方法以及Protocol.refer方法中。
在ReferenceConfig.createProxy方法中,如果用戶指定多個提供者url或註冊中心url,那麽會創建多個Invoker,然後用StaticDirectory將這多個Invoker封裝在一起,然後用相應的Cluster實現類將這個靜態的服務目錄包裝成一個Invoker,每種集群類都對應一種Invoker的集群包裝類,例如,FailoverClusterInvoker,FailbackClusterInvoker,FailfastClusterInvoker,FailsafeClusterInvoker,ForkingClusterInvoker等等,而這些封裝集群邏輯的Invoker包裝類都繼承自AbstractClusterInvoker抽象類。這個抽象類裏主要實現了調用時的狀態檢查,Invocation類參數設置,負載均衡,服務提供者可用性檢測等邏輯,而服務調用失敗後的行為邏輯則交由子類實現。

AbstractClusterInvoker.invoke

首先我們從這個方法看起,這個方法是Invoker類的調用入口,

@Override
// 這個方法的主要作用是為調用做一些前置工作,
// 包括檢查狀態,設置參數,從服務目錄取出invoker列表,根據<方法名>.loadbalance參數值獲取相應的負載均衡器
// 最後調用模板方法
public Result invoke(final Invocation invocation) throws RpcException {
    // 檢查該Invoker是否已經被銷毀
    // 在監聽到註冊中心變更刷新Invoker列表時可能會銷毀不再可用的Invoker
    checkWhetherDestroyed();

    // binding attachments into invocation.
    // 將RpcContext中的參數綁定到invocation上
    // 用戶可以通過RpcContext向每次調用傳遞不同的參數
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // 列出所有的服務提供者
    // 這個方法直接調用服務目錄的list方法
    List<Invoker<T>> invokers = list(invocation);
    // 根據url中的loadbalance參數值獲取相應的負載均衡器,默認是隨機負載均衡RandomLoadBalance
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    // 添加調用id,唯一標識本次調用
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 模板方法,子類實現
    return doInvoke(invocation, invokers, loadbalance);
}

FailoverClusterInvoker.doInvoke

我們以默認的集群類FailoverClusterInvoker為例,分析一下這個類的doInvoke方法

// 這個方法主要實現了重試的邏輯,這也正是這個類的特性,故障轉移功能
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // 拷貝一份本地引用,invokers可能會變
    List<Invoker<T>> copyInvokers = invokers;
    // 檢查提供者列表是否為空
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 獲取調用的方法的retries參數值,重試次數等於該值+1,因為第一次調用不算重試
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    // 循環重試
    // 記錄最後一次出現的異常
    RpcException le = null; // last exception.
    // 記錄調用失敗的提供者
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    // 記錄調用過的提供者的地址,
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        // 每次循環都要重新檢查狀態,重新列出可用的提供者Invoker,並檢查可用的Invoker是否為空
        // 因為這些狀態或提供者信息隨時都可能發生變化
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        // 從可用的Invoker列表總選擇一個
        // 選擇邏輯中考慮了“粘滯”調用和負載均衡的邏輯
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        // 添加到已經調用的列表中
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            // 對於業務異常直接拋出,這個異常會穿透dubbo框架直接拋給用戶
            // 非業務異常例如網絡問題,連接斷開,提供者下線等可以通過故障轉移,重試機制解決,
            // 這裏之所以直接拋出是因為一旦發生了業務異常就不是dubbo框架能處理的了,再重試也沒有意義了
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

這個方法的邏輯還是比較清晰的,就是重試,這也就是這個這個類的主要功能,故障轉移,如果調用發生異常,就重試調用其他可用的提供者。其中select方法的實現在抽象類AbstractClusterInvoker中。

AbstractClusterInvoker.select

// 這個方法主要實現了“粘滯”調用的邏輯
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();

    // 可以通過在url中設置sticky參數的值來決定要不要啟用“粘滯”調用的特性
    // 默認不啟用該特性
    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    // 如果緩存的粘滯Invoker已經不在可用列表裏了,那麽就應當將其移除
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem
    // 如果啟用了粘滯調用,並且粘滯調用存在,並且粘滯的Invoker不在已經調用失敗的Invoker列表中
    // 那麽直接返回粘滯的Invoker
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        if (availablecheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }

    // 根據負載均衡策略選擇一個Invoker
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    // 設置粘滯的Invoker
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

這個方法主要實現了“粘滯”調用的邏輯。

AbstractClusterInvoker.doSelect

// 根據負載均衡策略選擇一個Invoker
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    // 根據負載均衡策略選擇一個Invoker
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    // 對於選擇出來的Invoker還要再判斷其可用性
    // 對於如下情況需要再次選擇Invoker
    // 1. 選出的Invoker在調用失敗列表中
    // 2. 設置了可用檢查為true並且選出的Invoker不可用
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            // 重新選擇Invoker, 首先排除調用失敗列表進行選擇,實在不行會去調用失敗列表中看能不能找到又“活過來”的提供者
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    // 如果沒有重選出新的Invoker,那麽直接用下一個Invoker
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

第一次選擇是不考慮調用失敗列表的,所以選出來的Invoker有可能在調用失敗列表中,這時需要進行重選。

AbstractClusterInvoker.reselect

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

    //Allocating one in advance, this list is certain to be used.
    List<Invoker<T>> reselectInvokers = new ArrayList<>(
            invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    // First, try picking a invoker not in `selected`.
    for (Invoker<T> invoker : invokers) {
        if (availablecheck && !invoker.isAvailable()) {
            continue;
        }

        // 排除調用失敗列表中的Invoker
        if (selected == null || !selected.contains(invoker)) {
            reselectInvokers.add(invoker);
        }
    }

    // 如果還有剩余的Invoker, 那麽根據負載均衡邏策略選擇一個
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // Just pick an available invoker using loadbalance policy
    // 是在沒有可用的,只能從調用失敗列表中找找看有沒有可用的
    // 因為在重試期間有可能之前調用失敗的提供者變成可用的了
    if (selected != null) {
        for (Invoker<T> invoker : selected) {
            if ((invoker.isAvailable()) // available first
                    && !reselectInvokers.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
    }
    // 再次選擇
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // 實在沒有可用的提供者,只能返回null了
    return null;
}

其實從這幾個選擇的方法中可以看出來,dubbo的作者還是很用心的,盡最大可能保證調用的成功。

FailfastClusterInvoker

快速失敗,只調用一次,失敗後直接拋異常。代碼很簡單,就不多說了

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
            throw (RpcException) e;
        }
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                        + " select from all providers " + invokers + " for service " + getInterface().getName()
                        + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion()
                        + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                e.getCause() != null ? e.getCause() : e);
    }
}

FailsafeClusterInvoker

失敗安全的故障處理策略,所謂失敗安全是指在調用失敗後,不拋異常只記錄日誌。

@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        // 返回一個空結果,用戶需要對返回結果進行判斷
        return new RpcResult(); // ignore
    }
}

FailbackClusterInvoker

失敗後記錄下失敗的調用,之後以一定的間隔時間進行重試,這種策略很適合通知類的服務調用。重試間隔固定為5秒, 重試次數可以通過參數設置,默認是3次。

ForkingClusterInvoker

這種策略比較有意思,每次調用都會起多個線程並行第跑,誰先跑出結果就用誰的,這種估計很少用吧,誰這麽財大氣粗,大把大把的資源用來浪費。
不過這很像一些分布式計算框架中的推測執行策略,如果有些任務跑的慢,那麽就會在其他節點也跑這個任務,誰先跑完就用誰的結果,比如spark中就有推測執行的機制。

總結

不同的集群包裝類有不同的故障處理策略,默認的故障轉移,此外常用的有快速失敗,失敗安全,定時重試,合並調用等等。

dubbo源碼閱讀之集群(故障處理策略)