1. 程式人生 > >Dubbo/Dubbox的服務消費(一)- 服務代理的建立

Dubbo/Dubbox的服務消費(一)- 服務代理的建立

dubbo的consumer的初始過程

一個常見的consumer配置是這樣的

<dubbo:reference id="dubboDemo" interface="com.company.dsp.adcenter.protocol.dubbo.DubboDemo"
                 protocol="dubbo"/>

這樣spring會建立一個beanName為dubboDemo的bean,使用上是這樣的

@Resource(name = "dubboDemo")
private DubboDemo dubboDemo;

那麼問題來了why why why? dubbo是怎麼建立這樣的bean的?全程都沒有要求我們使用bean標籤,也沒有要求使用Componment和Service註解,即使使用顯示的方式宣告一個Bean,那我們也沒有相關的實現類啊。

既然我們已經擁有上面的配置項,拍腦袋一想,已知dubbo啟動時會利用spring對自定義配置項的spring.handles檔案載入對應的handler解析自定義節點,且上文書已經分析
dubbo:reference節點的配置項儲存在com.alibaba.dubbo.config.spring.ReferenceBean 中,該類的uml圖如下。
這裡寫圖片描述
可見該類整合自com.alibaba.dubbo.config.ReferenceConfig 類,並實現了下面四個介面

org.springframework.beans.factory.FactoryBean
org.springframework
.context.ApplicationContextAware org.springframework.beans.factory.InitializingBean org.springframework.beans.factory.DisposableBean

InitializingBean和ApplicationContextAware我們都很熟悉了,還沒詳細瞭解過FactoryBean,不過這裡有一個道友的文章可以看看《FactoryBean的實現原理與作用》
其中提到了以下兩句

OK,那麼這個時候我們getBean(“personFactory”)得到的就是Person物件而不是PersonFactoryBean物件。具體原理參考上面在IOC的應用,我們通過bean = getObjectForBeanInstance(sharedInstance, name, beanName, null)這個方法,具體呼叫到了getObject方法,所以結果很明顯。
通過上面的小案例的程式碼,我們可以看到如果一個類實現了FactoryBean介面,那麼getBean得到的不是他本身了,而是它所產生的物件,如果我們希望得到它本身,只需要加上&符號即可。至於FactoryBean的實際應用,需要大家去發現理解,後面如果有機會會繼續聊聊這個東西。

那把目光轉向:com.alibaba.dubbo.config.spring.ReferenceBean,開啟找尋getObject方法之旅

/**
    * ReferenceFactoryBean
    *
    * @author william.liangf
    * @export
    */
    public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
        //......
        public Object getObject() throws Exception {
        return get();
        }
        //......
    }

其中get()方法 繼承自com.alibaba.dubbo.config.ReferenceConfig

 public synchronized T get() {
        if (destroyed){
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }

可見get方法內部呼叫了一個init()方法

private void init() {
        if (initialized) {
            return;
        }
        initialized = true; //初始化標記,防止重複建立物件
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
        }
        // 獲取消費者全域性配置,檢查consumer是否有配置,如果該屬性為null,則new一個ConsumerConfig物件
        // 方法內部呼叫appendProperties(consumer)方法,該方法內部會拼裝一個 dubbo.tagName.屬性名的key,在配置檔案中查詢值,如果有值則呼叫屬性的setter方法,設定屬性值。
        checkDefault();
        // 上邊註釋已經說明appendProperties方法用途
        appendProperties(this);
        if (getGeneric() == null && getConsumer() != null) { //判斷是不是泛化呼叫
            setGeneric(getConsumer().getGeneric());
        }
        if (ProtocolUtils.isGeneric(getGeneric())) {
            interfaceClass = GenericService.class;
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            //繼承自com.alibaba.dubbo.config.AbstractInterfaceConfig
            //檢查介面類中是否存在指定的方法,如果dubbo:service->dubbo:method 沒有配置的情況下,methods為null,該方法不會執行方法校驗。
            //如果有相關的配置,該方法會檢查name屬性對應的方法是否存在,不存在會拋IllegalStateException異常。
            checkInterfaceAndMethods(interfaceClass, methods);
        }
        //用-Ddubbo.resolve.file指定對映檔案路徑,此配置優先順序高於<dubbo:reference>中的配置,1.0.15及以上版本支援
        //......省略-Ddubbo.resolve.file的解析程式碼行......//
        //........省略進一步初始化相關配置的程式碼行 .......//
        //檢查應用配置,如果沒有配置會建立預設物件。其內部會呼叫appendProperties(AbstractConfig config) 填充屬性。
        //檢查失敗會丟擲IllegalStateException異常
        checkApplication();
        //
        checkStubAndMock(interfaceClass);
        //看到Map就應當想到dubbo又要初始化一個URL物件了
        Map<String, String> map = new HashMap<String, String>();
        Map<Object, Object> attributes = new HashMap<Object, Object>();
        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        if (! isGeneric()) { //非泛化呼叫
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if(methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            }
            else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        map.put(Constants.INTERFACE_KEY, interfaceName);
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, consumer, Constants.DEFAULT_KEY);
        appendParameters(map, this);
        String prifix = StringUtils.getServiceKey(map);
            //如果指定了介面暴露哪些方法時才有效
        if (methods != null && methods.size() > 0) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                appendAttributes(attributes, method, prifix + "." + method.getName());
                checkAndConvertImplicitConfig(method, map, attributes);
            }
        }
        //attributes通過系統context進行儲存.
        StaticContext.getSystemContext().putAll(attributes);
        //建立當前服務的消費代理(十分重要)
        ref = createProxy(map);
    }
@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
            private T createProxy(Map<String, String> map) {
                URL tmpUrl = new URL("temp", "localhost", 0, map);
                final boolean isJvmRefer;
                if (isInjvm() == null) {
                    if (url != null && url.length() > 0) { //指定URL的情況下,不做本地引用
                        isJvmRefer = false;
                    } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                        //預設情況下如果本地有服務暴露,則引用本地服務.
                        isJvmRefer = true;
                    } else {
                        isJvmRefer = false;
                    }
                } else {
                    isJvmRefer = isInjvm().booleanValue();
                }

                if (isJvmRefer) {
                    URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
                    invoker = refprotocol.refer(interfaceClass, url);
                    if (logger.isInfoEnabled()) {
                        logger.info("Using injvm service " + interfaceClass.getName());
                    }
                } else {
                    if (url != null && url.length() > 0) { // 使用者指定URL,指定的URL可能是對點對直連地址,也可能是註冊中心URL
                        //......省略點對點直連的程式碼行......//
                    } else { // 通過註冊中心配置拼裝URL
                        //該方法繼承自com.alibaba.dubbo.config.AbstractInterfaceConfig
                        //在http://blog.csdn.net/weiythi/article/details/78467094 有相關介紹
                        List<URL> us = loadRegistries(false);
                        if (us != null && us.size() > 0) { //遍歷註冊中心地址
                            for (URL u : us) {
                                URL monitorUrl = loadMonitor(u);
                                if (monitorUrl != null) {
                                    map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                                }
                                urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                            }
                        }
                        if (urls == null || urls.size() == 0) {
                            throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                        }
                    }

                    //以下的程式碼反映一個主題,即建立一個invoker例項(com.alibaba.dubbo.rpc.Invoker)
                    if (urls.size() == 1) {
                        //這裡refprotocol又是一個自適應的擴充套件點
                        invoker = refprotocol.refer(interfaceClass, urls.get(0));
                    } else {
                        List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                        URL registryURL = null;
                        for (URL url : urls) {
                            //注意這裡refprotocol是一個自適應擴充套件點,根據url的protocol判斷,這裡返回的是一個ProtocolListenerWrapper包裝類例項,幷包裝com.alibaba.dubbo.registry.integration.RegistryProtocol,refer方法涉及服務發現。獨立章節說明
                            invokers.add(refprotocol.refer(interfaceClass, url));
                            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                                 registryURL = url; // 用了最後一個registry url
                            }
                        }
                        if (registryURL != null) { // 有 註冊中心協議的URL
                            // 對有註冊中心的Cluster 只用 AvailableCluster
                            URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                            invoker = cluster.join(new StaticDirectory(u, invokers));
                        }  else { // 不是 註冊中心的URL
                            invoker = cluster.join(new StaticDirectory(invokers));
                        }
                    }
                }

                Boolean c = check;
                if (c == null && consumer != null) {
                    c = consumer.isCheck();
                }
                if (c == null) {
                    c = true; // default true
                }
                if (c && ! invoker.isAvailable()) {
                    throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
                }
                if (logger.isInfoEnabled()) {
                    logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
                }
                // 建立服務代理,
                return (T) proxyFactory.getProxy(invoker);
            }

注意這裡refprotocol是一個自適應擴充套件點,根據url的protocol判斷,這裡返回的是一個ProtocolListenerWrapper包裝類例項,幷包裝com.alibaba.dubbo.registry.integration.RegistryProtocol,refer方法涉及服務發現。有時間獨立章節說明
createProxy方法的最後一行return (T) proxyFactory.getProxy(invoker); 那麼這個proxyFactory又是啥?

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

好吧,他又是一個自適應的擴充套件點,看一下ProxyFactory的介面定義,結合我們對dubbo中擴充套件點機制的掌握,看一看這個proxyFactory建立了一個怎樣的代理 。com.alibaba.dubbo.rpc.ProxyFactory類的定義如下


                package com.alibaba.dubbo.rpc;

                import com.alibaba.dubbo.common.Constants;
                import com.alibaba.dubbo.common.URL;
                import com.alibaba.dubbo.common.extension.Adaptive;
                import com.alibaba.dubbo.common.extension.SPI;

                /**
                 * ProxyFactory. (API/SPI, Singleton, ThreadSafe)
                 *
                 * @author william.liangf
                 */
                @SPI("javassist")
                public interface ProxyFactory {

                    /**
                     * create proxy.
                     *
                     * @param invoker
                     * @return proxy
                     */
                    @Adaptive({Constants.PROXY_KEY})
                    <T> T getProxy(Invoker<T> invoker) throws RpcException;

                    /**
                     * create invoker.
                     *
                     * @param <T>
                     * @param proxy
                     * @param type
                     * @param url
                     * @return invoker
                     */
                    @Adaptive({Constants.PROXY_KEY})
                    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

                }

Constants.PROXY_KEY 常量的值為proxy,在\META-INF\dubbo\internal\com.alibaba.dubbo.rpc.ProxyFactory檔案中找到擴充套件點的配置如下:

stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
                jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
                javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory

其中@SPI註解的值為javassist 所以使用com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory 類作為擴充套件點的實現,但是這裡還有com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper包裝類

所以這裡會使用StubProxyFactoryWrapper例項包裝JavassistProxyFactory,呼叫getProxy(invoker)的化學反應;方法會呼叫繼承自com.alibaba.dubbo.rpc.proxy.AbstractProxyFactory 的 getProxy(invoker); 方法,其內部實現如下

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        Class<?>[] interfaces = null;
        //config 一般配置下這裡會返回 null
        String config = invoker.getUrl().getParameter("interfaces");
        if (config != null && config.length() > 0) {
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;
                for (int i = 0; i < types.length; i ++) {
                    interfaces[i + 1] = ReflectUtils.forName(types[i]);
                }
            }
        }
        if (interfaces == null) {
            interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class};
        }
        return getProxy(invoker, interfaces);
    }

可見dubbo 加進來了一個奇怪的介面 com.alibaba.dubbo.rpc.service.EchoService,這裡先暫時不管這個EchoService
看一下 getProxy(invoker, interfaces);的實現

@SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

com.alibaba.dubbo.common.bytecode.Proxy 是一個生成代理物件的工具類
1. 遍歷所有入參介面,以;分割連線起來, 以它為key以map為快取查詢如果有,說明代理物件已建立返回
2. 利用AtomicLong物件自增獲取一個long陣列來作為生產類的字尾,防止衝突
3. 遍歷介面獲取所有定義的方法,加入到一個集合Set worked中 ,用來判重,
Proxy.getProxy(interfaces),會返回一個代理物件,該方法內部操作複雜,會返回一個如下所示的代理類

package com.alibaba.dubbo.common.bytecode;

import java.lang.reflect.InvocationHandler;

public class Proxy0 extends com.alibaba.dubbo.common.bytecode.Proxy {

    @Override
    public Object newInstance(InvocationHandler h) {
        return new proxy0(h);
    }
}


class proxy0 implements 服務介面,com.alibaba.dubbo.rpc.service.EchoService{
    /**
     * 被代理的方法
     */
    public static java.lang.reflect.Method[] methods;
    private java.lang.reflect.InvocationHandler handler;


    public proxy0() {

    }

    /**
     * 建構函式
     *
     * @param handler
     */
    public proxy0(java.lang.reflect.InvocationHandler handler) {
        this.handler = handler;
    }

    /**
     * 被代理方法(有返回值,有引數)
     *
     * @return
     */
    com.company.project.common.dto.Result methodName0(com.company.project.module.protocol.request.AdGroupRequest adGroupRequest) throws Throwable {
        //args陣列長度為引數列表長度
        Object[] args = new Object[1];
        args[0] = adGroupRequest;
        Object ret = handler.invoke(this, methods[0], args);
        return (com.ksyun.dsp.common.dto.Result) ret;
    }


    /**
     * 被代理方法(無返回值,有引數)
     *
     * @return
     */
    void methodName1(com.company.project.module.protocol.request.AdGroupRequest adGroupRequest) throws Throwable {
        Object[] args = new Object[1];
        args[0] = adGroupRequest;
        Object ret = handler.invoke(this, methods[1], args);
    }

    /**
     * 被代理方法(無返回值,無引數)
     *
     * @return
     */
    void methodName2() throws Throwable {
        Object[] args = new Object[0];
        Object ret = handler.invoke(this, methods[2], args);
    }
}

從以上示例程式碼可見,剩下的操作都是通過
handler.invoke(this, method, args); 這種方式來實現代理方法的呼叫的
dubbo內部對InvocationHandler的實現如下,可見其還是實現java.lang.reflect.InvocationHandler這個介面的,但是以dubbo的代理生成方式來說,
好像沒什麼必要實現InvocationHandler介面,這裡可能是為了維護動態代理的語義吧

/**
 * InvokerHandler
 *
 * @author william.liangf
 */
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

最後一行 return invoker.invoke(new RpcInvocation(method, args)).recreate();,為便於理解拆分為以下兩行

      Result rpcResult = invoker.invoke(new RpcInvocation(method, args));
       return rpcResult.recreate();

其中RpcInvocation如圖
這裡寫圖片描述

總結:
1.dubbo通過實現org.springframework.beans.factory.FactoryBean介面的方式實現對服務的代理
2.Dubbo將所有的服務都實現了一個EchoService(回聲測試用於檢測服務是否可用,回聲測試按照正常請求流程執行,能夠測試整個呼叫是否暢通,可用於監控。所有服務自動實現EchoService介面,只需要將任意服務引用強制轉換為EchoService,即可使用。)
3.服務消費最終會呼叫com.alibaba.dubbo.rpc.Invoker.invoke(new RpcInvocation(method, args)); 方法