1. 程式人生 > >dubbo原始碼解析——消費過程

dubbo原始碼解析——消費過程

上一篇 dubbo原始碼解析——概要篇中我們瞭解到dubbo中的一些概念及消費端總體呼叫過程。本文中,將進入消費端原始碼解析(具體邏輯會放到程式碼的註釋中)。本文先是對消費過程的總體程式碼邏輯理一遍,個別需要細講的點,後面會專門的文章進行解析。

開頭進入InvokerInvocationHandler 通過實現InvocationHandler,我們知道dubbo生成代理使用的是JDK動態代理。這個類中主要是對特殊方法進行處理。由於在生成代理例項的時候,在建構函式中賦值了invoker,因此可以只用該invoker進行invoke方法的呼叫。

/**
 * dubbo使用JDK動態代理,對介面物件進行注入
 * InvokerHandler
 *
 * 程式啟動的過程中,在建構函式中,賦值下一個需要呼叫的invoker,從而形成執行鏈
 */
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable { // 獲取方法名稱 String methodName = method.getName(); // 獲取引數型別 Class<?>[] parameterTypes = method.getParameterTypes(); // 方法所處的類 是 Object類,則直接呼叫 if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args)
; } /* * toString、hashCode、equals方法比較特殊,如果interface裡面定義了這幾個方法,並且進行實現, * 通過dubbo遠端呼叫是不會執行這些程式碼實現的。 */ /* * 方法呼叫是toString,依次執行MockClusterInvoker、AbstractClusterInvoker的toString方法 */ if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } /* * interface中含有hashCode方法,直接呼叫invoker的hashCode */ if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } /* * interface中含有equals方法,直接呼叫invoker的equals */ if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } /* * invocationv包含了遠端呼叫的引數、方法資訊 */ RpcInvocation invocation; /* * todo這段程式碼在最新的dubbo版本中沒有 */ if (RpcUtils.hasGeneratedFuture(method)) { Class<?> clazz = method.getDeclaringClass(); String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length()); Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes()); invocation = new RpcInvocation(syncMethod, args); invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } else { invocation = new RpcInvocation(method, args); if (RpcUtils.hasFutureReturnType(method)) { invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } } // 繼續invoker鏈式呼叫 return invoker.invoke(invocation).recreate(); } }

進入MockClusterInvoker 這段程式碼主要是判斷是否需要進行mock呼叫

@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // 獲取mock引數,從而判斷是否需要mock
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();

        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            // 不需要mock,繼續往下呼叫
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            // 選擇mock的invoker
            result = doMockInvoke(invocation, null);
        } else {
            // 正常呼叫失敗,則呼叫mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }

進入AbstractClusterInvoker 進入這段程式碼,表明開始進入到叢集

@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        // 檢查消費端invoker是否銷燬了
        checkWhetherDestroyed();

        // 將引數繫結到invocation
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }

        // 獲取滿足條件的invoker(從Directory獲取,並且經過router過濾)
        List<Invoker<T>> invokers = list(invocation);

        // 初始化loadBalance
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);

        // invocation ID將被新增在非同步操作
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        return doInvoke(invocation, invokers, loadbalance);
    }

進入AbstractDirectory

@Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        // 判斷Directory是否銷燬
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        // 從methodInvokerMap中取出滿足條件的invoker
        List<Invoker<T>> invokers = doList(invocation);

        // 根據路由列表,篩選出滿足條件的invoker
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }

這裡主要是從Directory中獲取invoker,並且經過router路由的篩選,獲得滿足條件的invoker。 在AbstractDirectory中,有一個關鍵的方法com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#doList,這是一個抽象方法,子類RegistryDirectory的有具體實現,並且呼叫RegistryDirectory的doList方法。(這裡應該是用到了模板方法模式)。後面的文字中會詳細講下doList方法中做了啥。

進入FailoverClusterInvoker 經過從Directory中獲取invoker,然後router篩選出滿足條件的invoker之後,進入到FailoverClusterInvoker。為什麼會到這裡呢?

根據官網的描述: 在叢集呼叫失敗時,Dubbo 提供了多種容錯方案,預設為 failover 重試。 所以這個時候是到了FailoverClusterInvoker類,但是如果你配置的是Failfast Cluster(快速失敗),Failsafe Cluster(失敗安全),Failback Cluster(失敗自動恢復),Forking Cluster(並行呼叫多個伺服器,只要一個成功即返回),Broadcast Cluster(廣播呼叫所有提供者,逐個呼叫,任意一臺報錯則報錯),他也會到達相應的類。

@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 區域性引用
        List<Invoker<T>> copyinvokers = invokers;

        // 引數校驗(這種封裝方法我在工作中借鑑,個人感覺比較好)
        checkInvokers(copyinvokers, invocation);

        // 獲取方法名稱
        String methodName = RpcUtils.getMethodName(invocation);

        // 獲取重試次數
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            // 最少要呼叫1次
            len = 1;
        }

        // 區域性引用
        RpcException le = null;
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);

        // i < len 作為迴圈條件,說明len是多少就迴圈多少次
        for (int i = 0; i < len; i++) {
            // 在重試之前,需要重新選擇,以避免候選invoker的改變
            if (i > 0) {
                // 檢查invoker是否被銷燬
                checkWhetherDestroyed();
                // 重新選擇invoker
                copyinvokers = list(invocation);
                // 引數檢查
                checkInvokers(copyinvokers, invocation);
            }

            /*
             * 這一步就是進入loadBalance負載均衡
             * 因為上述步驟可能篩選出invoker數量大於1,所以再次經過loadBalance的篩選
             */
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);

            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 遠端方法呼叫
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

到達終點站.我們回憶總結一下,文初提到的三個關鍵詞,在這個叢集容錯的整體架構過程中,dubbo究竟做了什麼.其實也就是三件事

(1)在Directory中找出本次叢集中的全部invokers (2)在Router中,將上一步的全部invokers挑選出能正常執行的invokers (3)在LoadBalance中,將上一步的能正常的執行invokers中,根據配置的負載均衡策略,挑選出需要執行的invoker

後面的文章中,對上述的一些細節進行解析