1. 程式人生 > >深入理解dubbo之服務引用

深入理解dubbo之服務引用

在之前說了一下dubbo的服務釋出過程,其實嚴格意義上只說了一半吧,只把dubbo如何經過ProxyFactory的代理成一個Invoker,等待客戶端呼叫的過程講了一遍,而重要的Protocol.export方法略過去了,今天我將連帶dubbo的comsumer客戶端服務引用和Protocol機制來講一講。

dubbo服務引用

和上一篇文章一樣,先來個demo

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"
>
<dubbo:application name="ifenqu-web" /> <dubbo:registry protocol="zookeeper" address="${dubbo.address}" /> <dubbo:protocol name="dubbo" port="${dubbo.port}"/> <dubbo:consumer timeout="60000" check="false"/> <dubbo:service interface="com.xxx.xxx.xxxService"
ref="xxxService" timeout="5000"/>
</beans>

在上篇文章我們已經說了,對於service暴露方也就是provider方有對應的ServiceConfig,相應的Reference引用方也就是Consumer有對應的ReferenceConfig。ReferenceConfig中定義了每一個介面引數定義,這只是部分,還有一大堆引數在父類裡就不列出來了。

    // 介面型別
    private String               interfaceName;

    private Class<?>             interfaceClass;

    // 客戶端型別
    private String               client;

    // 點對點直連服務提供地址
    private String               url;

    // 方法配置
    private List<MethodConfig>   methods;//介面所有的方法配置

    // 預設配置
    private ConsumerConfig       consumer;//該引數對應的就是<dubbo:consumer timeout="60000" check="false"/>

    private String               protocol;//如果為空預設dubbo

引數設值分兩步,第一步是物件建立的時候,第二步是呼叫了get方法後執行init()方法。這個get方法就是服務引用的入口:

public synchronized T get() {
        if (destroyed){
            throw new IllegalStateException("Already destroyed!");
        }
        //服務例項已經存在就直接返回,沒有就進行初始化
        if (ref == null) {
            init();
        }
        return ref;
    }

init()方法主要分為兩個步驟,第一步:收集上下文,第二步:根據上下文建立服務例項
ref = createProxy(map);進入createProxy方法中去看看

private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { //指定URL的情況下,不做本地引用
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                //預設情況下如果本地有服務暴露,則引用本地服務.
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }

        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            if (url != null && url.length() > 0) { // 使用者指定URL,指定的URL可能是對點對直連地址,也可能是註冊中心URL
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // 通過註冊中心配置拼裝URL
                List<URL> us = loadRegistries(false);
                if (us != null && us.size() > 0) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最後一個registry url
                    }
                }
                if (registryURL != null) { // 有 註冊中心協議的URL
                    // 對有註冊中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                }  else { // 不是 註冊中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        //忽略非核心程式碼
        // 建立服務代理
        return (T) proxyFactory.getProxy(invoker);
    }

該方法中主要邏輯就是先判斷需要引用的型別,是本地服務暴露還是直連遠端服務還是叢集遠端服務。如果暴露的服務本地就有直接url就是localhost,而對於叢集還涉及到了loadbanlance。無論是什麼型別的服務核心都是refprotocol.refer(interfaceClass, urls.get(0)),這個方法的返回值就是上篇說的Invoker物件,回憶一下,Invoker物件中封裝了介面資訊和invoke方法,只要客戶端拿到了這個Invoker就可以執行invoke進而通過遠端通訊觸發服務端的service返回執行結果。讓我們的視線再回到refprotocol上:

@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> var1) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> var1, URL var2) throws RpcException;

    void destroy();
}

這個Protocol介面定義了三個方法,export、refer、destory。分別是服務暴露和服務引用和銷燬方法。經過上一篇的講述,我們已經知道了dubbo支援dubbo、http、thrift等多種協議。那麼dubbo是如何做到多個版本協議可以切換自如呢?方法就在SPI上。

SPI

SPI(Service Provider Interface)本來是針對不同廠商或外掛的一個規範,提供擴充套件的時候可以對同一個功能用不同的實現。Java SPI的基本思想可以用設計模式六大原則之開閉原則解釋,也就是說要對介面開放,對修改關閉。基於這個原則我們就可以對不同的實現完成可拔插的效果。多種不用實現想用哪種用哪種,只要簡單修改配置。
Java SPI的具體約定為:當服務的提供者,提供了服務介面的一種實現之後,在jar包的META-INF/services/目錄裡同時建立一個以服務介面命名的檔案。該檔案裡就是實現該服務介面的具體實現類。而當外部程式裝配這個模組的時候,就能通過該jar包META-INF/services/裡的配置檔案找到具體的實現類名,並裝載例項化,完成模組的注入。 基於這樣一個約定就能很好的找到服務介面的實現類,而不需要再程式碼裡制定。jdk提供服務實現查詢的一個工具類:java.util.ServiceLoader

知道了SPI的定義現在就更好了解Protocol的實現原理了,在ReferenceConfig中獲取具體的Protocol是這一行程式碼private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();我們可以根據SPI的定義來看看它的實現過程。進入ExtensionLoader類,在這個類中我們可以看到以下幾個熟悉的全域性常量

 private static final String SERVICES_DIRECTORY = "META-INF/services/";
    private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
    private static final String DUBBO_INTERNAL_DIRECTORY = "META-INF/dubbo/internal/";
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap();

debug斷點跟進去就會看到這個EXTENSION_LOADERS裡面已經裝滿了多個ExtensionLoaders,盡到dubbo的jar包的META-INF/dubbo/internal/路徑時就全明白了,這裡面就是這些ExtensionLoaders。就拿我們要說的Protocol來說,ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()這行程式碼說的就是傳入Protocol.class型別,就能在EXTENSION_LOADERS中找到com.alibaba.dubbo.rpc.Protocol這個類,也就是META-INF/dubbo/internal/路徑下的定義的檔案com.alibaba.dubbo.rpc.Protocol。進入這個檔案我們能看到所有Protocol實現類定義:

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol

redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol

如果你選擇dubbo協議,Protocol的介面實現類就會使用com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol我這裡使用的是系統預設的dubbo協議,所以我們上面ReferenceConfig方法裡面呼叫的invoker = refprotocol.refer(interfaceClass, urls.get(0));就是DubboProtocol的Refer方法:

 public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
        this.invokers.add(invoker);
        return invoker;
    }

這個方法沒什麼好說的,就是Invoker的例項化而已,不過這個invoker已經有所有呼叫服務端的服務的必要引數。只需要通過invoker作為引數用ProxyFactory進行動態代理來拿到代理類就行了。

遠端呼叫

真正在方法呼叫時才會觸發invoker的doInvoke方法,讓我們看看這個doInvoke方法:

   protected Result doInvoke(Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation)invocation;
        String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment("path", this.getUrl().getPath());
        inv.setAttachment("version", this.version);
        //通訊客戶端,可以與socket的客戶端類比
        ExchangeClient currentClient;
        if(this.clients.length == 1) {
            currentClient = this.clients[0];
        } else {
            currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
        }

        try {
            boolean isAsync = RpcUtils.isAsync(this.getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
            int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000);
            if(isOneway) {
                boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture((Future)null);
                return new RpcResult();
            } else if(isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture((Future)null);
                return (Result)currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException var9) {
            throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var9.getMessage(), var9);
        } catch (RemotingException var10) {
            throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var10.getMessage(), var10);
        }
    }

這個方法也很簡單,首先拿到遠端呼叫的引數,比如方法名,呼叫路徑,暴露服務的版本號(版本號不同無法調通),拿到了ExchangeClient後就開始了請求,這個請求分同步和非同步,都是看你配置來的,如果是同步的就一直阻塞直到timeout或者結果返回,如果是非同步那麼直接返回一個ResponseFuture,等執行成功後提供回撥。到這,從consumer的方法引用到方法執行都說完了。講得比較精煉,把很多東西都給省略了,其實dubbo真的特別複雜,但是對於我個人來說只要瞭解原理就已經達到我的目的了,所以到這就可以了。其實還有一部分比較重要,那就是網路通訊部分,dubbo用的是netty。等有空了可以去膜拜膜拜。