1. 程式人生 > >Dubbo原始碼解析之客戶端初始化及服務呼叫

Dubbo原始碼解析之客戶端初始化及服務呼叫

準備

dubbo 版本:2.5.4

客戶端初始化過程

初始化過程

先上時序圖,幫助理解客戶端初始化過程。
Dubbo客戶端初始化

ReferenceBean 是客戶端初始化入口,其實現 InitializingBean 介面,在 bean 初始化過程中會呼叫其 afterPropertiesSet 方法,進而呼叫 getObject() -> get() -> init() ,之後再呼叫 ReferenceConfigcreateProxy() 方法。

ReferenceConfig

private T createProxy(Map<String, String> map) {
    URL tmpUrl =
new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { if (url != null && url.length() > 0) { // 指定URL的情況下,不做本地引用 isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { // 預設情況下如果本地有服務暴露,則引用本地服務
isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker =
refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { // 使用者自己指定URL,可能是點對點直連地址,也可能是註冊中心URL if (url != null && url.length() > 0) { String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 通過註冊中心配置拼裝URL List<URL> us = loadRegistries(false); // 從註冊中心上獲得相應的協議url地址 if (us != null && us.size() > 0) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) { // refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); // refprotocol -> Protocol$Adaptive invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 用了最後一個registry url } } // 註冊中心地址URL不為空 if (registryURL != null) { // 對有註冊中心的Cluster只用AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 返回的是MockClusterInvoker(FailoverClusterInvoker) invoker = cluster.join(new StaticDirectory(u, invokers)); } else { invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; // default true } if (c && ! invoker.isAvailable()) { throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } // 建立服務代理 return (T) proxyFactory.getProxy(invoker); }

Protocol$Adpative

public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        // extName -> registry
        // extension -> RegistryProtocol
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

RegistryProtocol

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter("registry", "dubbo")).removeParameter("registry");
    Registry registry = this.registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return this.proxyFactory.getInvoker(registry, type, url);
    } else {
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded("refer"));
        String group = (String)qs.get("group");
        return group == null || group.length() <= 0 || Constants.COMMA_SPLIT_PATTERN.split(group).length <= 1 && !"*".equals(group) ? this.doRefer(this.cluster, registry, type, url) : this.doRefer(this.getMergeableCluster(), registry, type, url);
    }
}

RegistryProtocol.doRefer方法存在一個引數 Cluster ,而 Cluster 是一個擴充套件點,存在加在方法級別上的 @Adaptive 註解,說明會動態生成自適應介面卡( Cluster$Adaptive )。在 RegistryProtocol 中存在 Cluster 擴充套件點成員變數及 setter 方法,說明是一個自動注入的擴充套件點。

@SPI("failover")
public interface Cluster {
    @Adaptive
    <T> Invoker<T> join(Directory<T> var1) throws RpcException;
}
// RegistryProtocol
private Cluster cluster;

public void setCluster(Cluster cluster) {
    this.cluster = cluster;
}

Cluster$Adpative

public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
    
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
}

又因為 Cluster 擴充套件點實現中存在以擴充套件點作為引數的構造方法,所以會被 Wrapper 裝飾,而該裝飾器就是 MockClusterWrapper

public class MockClusterWrapper implements Cluster {
    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker(directory, this.cluster.join(directory));
    }
}
// RegistryProtocol
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory(type, url);
    directory.setRegistry(registry); // registry -> ZookeeperRegistry
    directory.setProtocol(this.protocol); // protocol -> Protocol$Adaptive
    URL subscribeUrl = new URL("consumer", NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (!"*".equals(url.getServiceInterface()) && url.getParameter("register", true)) {
        // 註冊consumer://協議地址到註冊中心
        registry.register(subscribeUrl.addParameters(new String[]{"category", "consumers", "check", String.valueOf(false)}));
    }
	// 註冊zookeeper地址變更
    directory.subscribe(subscribeUrl.addParameter("category", "providers,configurators,routers"));
    // 返回一個MockClusterInvoker(FailoverClusterInvoker)
    return cluster.join(directory);
}

MockClusterWrapper

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {