1. 程式人生 > >dubbo源碼閱讀之服務導出

dubbo源碼閱讀之服務導出

需要 sun 展開 meta onload 額外 illegal node obj

dubbo服務導出

常見的使用dubbo的方式就是通過spring配置文件進行配置。例如下面這樣

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://dubbo.apache.org/schema/dubbo
       http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <dubbo:application name="helloService-provider"/>
    <dubbo:registry address="zookeeper://localhost:2181"/>
    <dubbo:reference interface="com.zhuge.learn.dubbo.services.HelloService"
                     check="false" id="helloService">
        <dubbo:method name="sayHello" retries="2"/>
    </dubbo:reference>

</beans>

讀過spring源碼的應該知道,spring對於非默認命名空間的標簽的解析是通過NamespaceHandlerResolver實現的,NamespaceHandlerResolver也算是一種SPI機制,通過解析jar包中的META-INF/spring.handlers文件,將所有的NamespaceHandler實現類以k-v的形式解析出來並放到內存中。所以要想擴展spring的命名空間,就要實現一個NamespaceHandler。
dubbo實現了自己的命名空間,對應的NamespaceHandler實現類是com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler。這個類也很簡單,就是定義了用於解析不同標簽的BeanDefinition解析類。但是dubbo的實現稍有不同,它將所有標簽的解析都放到同一個類同一個方法中,個人認為這種設計欠妥,不利於擴展新的標簽。

使用過dubbo的都知道,如果我們要創建一個服務提供者,我們需要在配置文件中配置service標簽,所以dubbo的服務導出一定與這個標簽相關。查看DubboNamespaceHandler代碼,會發現,服務導出的邏輯主要是由ServiceBean實現的,所以接下來我們就以ServiceBean為入口,一步步來分析dubbo的服務導出過程。

ServiceBean概覽

ServiceBean繼承了ServiceConfig類,同時實現了一大堆接口,這些接口基本上都與spring框架相關。其中ApplicationListener接口會監聽ContextRefreshedEvent事件,這個事件是在spring容器完成刷新後發布的,導出邏輯的入口就在onApplicationEvent方法中。

onApplicationEvent

public void onApplicationEvent(ContextRefreshedEvent event) {
// 如果已經導出或者關閉服務,就忽略該事件
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        export();
    }
}

ServiceConfig.export

真正的導出服務的邏輯在父類方法中

// 這是一個同步方法,保證多線程情況下不會同時進行服務導出
public synchronized void export() {
    // 檢查一些配置是否為空,對於空的配置創建默認的配置
    checkAndUpdateSubConfigs();

    if (!shouldExport()) {
        return;
    }

    if (shouldDelay()) {
        // 延遲導出服務
        delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }
}

protected synchronized void doExport() {
        // 首先做一些狀態檢查
        // 如果已經反導出服務,說明服務已經被關閉
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        // 如果已經導出過了,就不需要重復導出了
        if (exported) {
            return;
        }
        exported = true;

        // 如果服務名為空,以服務接口名作為服務名稱
        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
    }

doExportUrls

我們直接進入核心代碼,

private void doExportUrls() {
// 加載所有的註冊中心的URL
List registryURLs = loadRegistries(true);
// 如果配置了多個協議,那麽每種協議都要導出,並且是對所有可用的註冊url進行註冊
for (ProtocolConfig protocolConfig : protocols) {
// 拼接服務名稱,這裏的path一般就是服務名
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
// 服務提供者模型,用於全面描述服務提供者的信息
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
// 導出這個服務提供者,
// 向所有的可用的註冊中心進行註冊
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

從這段代碼,我們可以看出來,dubbo會對配置的每個協議類型,每個註冊中心全部進行服務導出和註冊,服務導出和註冊的次數=協議類型數*註冊中心數

doExportUrlsFor1Protocol

這段代碼主要封裝了參數解析,和url拼裝的邏輯。
創建代理類由ProxyFactory實現,
創建本地服務並註冊到註冊中心有RegistryProtocol實現

// 導出服務的核心代碼
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // 協議名稱
    String name = protocolConfig.getName();
    // 如果沒有配置協議名稱,默認是dubbo
    if (StringUtils.isEmpty(name)) {
        name = Constants.DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    // 設置side屬性為provider,side表示服務提供者還是消費者
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    // 添加運行時信息,包括
    // 1. dubbo協議的版本號,2.0.10 ~ 2.6.2
    // 2. dubbo版本號
    // 3. 時間戳 信息
    // 4. jvm進程號
    appendRuntimeParameters(map);
    // 添加ApplicationConfig, 配置屬性
    appendParameters(map, application);
    // 添加ModuleConfig配置屬性,模塊配置,覆蓋全局配置
    appendParameters(map, module);
    // 添加ProviderConfig配置屬性
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    // 添加協議配置,覆蓋前面的配置
    appendParameters(map, protocolConfig);
    // 添加當前服務的配置,service標簽的配置,覆蓋前面的配置
    // 容易看出來,配置的優先級:service > protocol > provider > module > application
    appendParameters(map, this);
    if (CollectionUtils.isNotEmpty(methods)) {
        // 添加方法配置
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            // 替換retry配置
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            // 添加方法參數配置
            List<ArgumentConfig> arguments = method.getArguments();
            if (CollectionUtils.isNotEmpty(arguments)) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    // 添加方法參數配置
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) {
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    // 只有一個回調
                                    // 添加方法參數配置
                                    if (argument.getIndex() != -1) {
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        for (int j = 0; j < argtypes.length; j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                    throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) {
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    } else {
                        throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }

                }
            }
        } // end of methods for
    }

    // 是否是泛化服務
    if (ProtocolUtils.isGeneric(generic)) {
        map.put(Constants.GENERIC_KEY, generic);
        map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
    } else {
        // 添加版本信息
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }

        // 設置方法名
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    // 添加token信息
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(Constants.TOKEN_KEY, token);
        }
    }
    // export service
    // 導出服務
    // 添加bind.ip屬性,並返回用於註冊的ip
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    // 添加bind.port屬性,並返回用於註冊的port
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // 根據前面獲取的參數信息創建一個URL
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

    // 對URL進行額外的配置
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    // 獲取服務作用於,導出到本地還是遠程
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // don't export when none is configured
    // scope屬性值是none的不進行導出,直接忽略
    if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        // 只要scope屬性不等於remote就會進行本地導出
        if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        // 只要scope屬性不等於local就會進行遠程導出
        if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                // 對每一個註冊中心都進行導出
                for (URL registryURL : registryURLs) {
                    // 添加dynamic屬性的參數
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                    // 加載監控中心的url,監控中心也是一個服務提供者
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        // 添加參數到url中
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }

                    // For providers, this is used to enable custom proxy to generate invoker
                    // 獲取用戶配置的代理
                    String proxy = url.getParameter(Constants.PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                    }

                    // ref屬性是通過spring容器的IOC特性自動註入的,
                    // 在DubboBeanDefinitionParser中對該屬性進行了解析
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             */
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                metadataReportService.publishProvider(url);
            }
        }
    }
    // 記錄已經導出的
    this.urls.add(url);
}

ProxyFactory

@SPI("javassist")
public interface ProxyFactory {

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    /**
     * create invoker.
     *
     * @param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    // 這裏規定了以proxy為key去url中查找擴展名,如果沒有設置就用默認擴展名,
    // 默認擴展名是由SPI註解確定的,ProxyFactory的默認擴展名就是javassist
    // 查看META-INF/dubbo/internal/org.apache.dubbo.rpc.ProxyFactory文件,我們知道
    // javassist對應的擴展類就是org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}

所以默認實現類是JavassistProxyFactory

JavassistProxyFactory.getInvoker

@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    // Wrapper不能正確處理類名中帶有$的情況
    // 獲取一個包裝類,用來根據傳入的參數調用原始對象的不同方法
    // 起到的作用就是方法路由。
    // jdk動態代理使用反射調用不同的方法,效率較低。
    // 而javaassist通過方法名以及參數個數和參數類型進行判斷具體調用哪個方法,效率更高
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);

    // 生成一個Invoker,內部僅僅是調用wrapper.invokeMethod方法
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

Wrapper.makeWrapper

真正負責代碼生成的是Wrapper.makeWrapper方法。這段代碼比較長,邏輯比較復雜,而且代碼生成的邏輯又很繁瑣,其實也沒有什麽高深的技術,所以我決定直接用單元測試來生成一段代碼,這樣就能直觀第理解生成的代碼長什麽樣

首先給出一個被代理的接口

public interface I2 {
    void setName(String name);

    void hello(String name);

    int showInt(int v);

    float getFloat();

    void setFloat(float f);
}

下面就是Wrapper.makeWrapper方法最後生成的代碼的樣子,

public class Wrapper0 extends Wrapper {
    public static String[] pns;
    public static java.util.Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;
    public static Class[] mts1;
    public static Class[] mts2;
    public static Class[] mts3;
    public static Class[] mts4;
    public static Class[] mts5;

    public String[] getPropertyNames() {
        return pns;
    }

    public boolean hasProperty(String n) {
        return pts.containsKey($1);
    }

    public Class getPropertyType(String n) {
        return (Class) pts.get($1);
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public void setPropertyValue(Object o, String n, Object v) {
        org.apache.dubbo.common.bytecode.I2 w;
        try {
            w = ((org.apache.dubbo.common.bytecode.I2) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        if ($2.equals("name")) {
            w.setName((java.lang.String) $3);
            return;
        }
        if ($2.equals("float")) {
            w.setFloat(((Number) $3).floatValue());
            return;
        }
        throw new org.apache.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" field or setter method in class org.apache.dubbo.common.bytecode.I2.");
    }

    public Object getPropertyValue(Object o, String n) {
        org.apache.dubbo.common.bytecode.I2 w;
        try {
            w = ((org.apache.dubbo.common.bytecode.I2) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        if ($2.equals("float")) {
            return ($w) w.getFloat();
        }
        if ($2.equals("name")) {
            return ($w) w.getName();
        }
        throw new org.apache.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" field or setter method in class org.apache.dubbo.common.bytecode.I2.");
    }

    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        org.apache.dubbo.common.bytecode.I2 w;
        try {
            w = ((org.apache.dubbo.common.bytecode.I2) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("getFloat".equals($2) && $3.length == 0) {
                return ($w) w.getFloat();
            }
            if ("setName".equals($2) && $3.length == 1) {
                w.setName((java.lang.String) $4[0]);
                return null;
            }
            if ("setFloat".equals($2) && $3.length == 1) {
                w.setFloat(((Number) $4[0]).floatValue());
                return null;
            }
            if ("hello".equals($2) && $3.length == 1) {
                w.hello((java.lang.String) $4[0]);
                return null;
            }
            if ("showInt".equals($2) && $3.length == 1) {
                return ($w) w.showInt(((Number) $4[0]).intValue());
            }
            if ("getName".equals($2) && $3.length == 0) {
                return ($w) w.getName();
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class org.apache.dubbo.common.bytecode.I2.");
    }
}

其中方法中的參數用$1,$2這種形式表示,猜測在javassist中會進行處理。
我們主要看invokeMethod,邏輯相對還是很明了的,通過方法名和參數個數判斷應該調用哪個方法。
到這裏,Invoker對象就創建完成了,接下來就進入到服務導出的部分。

Protocol.export

@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

這個方法帶有Adaptive註解,是一個自適應方法,自適應擴展類,我們之前分析過,通過入參獲取URL,通過URL獲取指定key的值,用這個獲取到的值作為擴展名加載擴展類,然後調用這個擴展類的方法。
但是export方法上的註解並沒有給出key,回想一下生成自適應擴展類代碼的細節,當Adaptive註解未指定key時,將接口名轉換為key,Protocol會被轉換為protocol,而對於key為protocol的情況會直接調用URL.getProtocol方法獲取協議類型作為擴展名。
在loadRegistries方法中加載註冊url時,已經將url的protocol屬性設為registry,也就是說會使用org.apache.dubbo.registry.integration.RegistryProtocol來進行服務導出,接下來我們就來分析這個類。

RegistryProtocol.export

所以接下來我們就分析一下RegistryProtocol.export的導出過程

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 獲取註冊的url,並將protocol替換為相應的協議類型
    // 前面講本來的協議類型設置到registry參數中,而將protocol參數設置為registry,
    // 這樣做是為了在自適應擴展機制在查找擴展名時能夠根據擴展名是registry找到RegistryProtocol
    // 找到之後並且進入這個類的方法之後,自然需要再把協議類型設置回來
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    // 獲取服務提供者url,用於導出到本地
    // 註冊的url中的一個參數,即export參數的值
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    // 獲取訂閱URL,用於
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    // 創建監聽器
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    // 導出服務到本地
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    final Registry registry = getRegistry(originInvoker);
    // 獲取用於發送到註冊中心的提供者url
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    // 想服務提供者與消費者註冊表中註冊服務
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        // 向註冊中心註冊
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

我們略過一些不太重要的,這個方法主要就做了兩件事:

  • 對提供者url和註冊url進行處理
  • 將服務導出到本地
  • 向註冊中心發送服務提供者信息

doLocalExport

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    // 用於緩存的key,即提供者url
    String key = getCacheKey(originInvoker);

    // 如果服務在緩存中不存在,則需要進行導出
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
        // protocol成員變量在加載擴展類的時候會進行註入,通過SPI或spring容器查找到對應的
        // 通過SPI註入時會註入自適應擴展類,通過傳入的url動態決定使用哪個Protocol
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
    });
}

就像註釋中說的,通過自適應機制,根據運行時傳入的Invoker中的url動態決定使用哪個Protocol,以常用的dubbo協議為例,對應的實現類是org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

DubboProtocol.export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    // 服務的key組成:serviceGroup/serviceName:serviceVersion:port
    String key = serviceKey(url);
    // 創建一個DubboExporter。用於封裝一些引用
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    // 本地存根導出事件分發服務
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    // 是否是回調服務
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }

        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    // 開啟服務
    openServer(url);
    optimizeSerialization(url);

    return exporter;
}

主要的邏輯在openServer中

openServer

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    // 客戶端也能夠導出服務,不過客戶端導出的服務只是給服務端調用的
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        // 雙重檢查鎖
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}

這個方法的主要作用就是緩存的雙重檢查鎖,創建的服務的代碼在createServer中

createServer

終於到正題了

private ExchangeServer createServer(URL url) {
    url = URLBuilder.from(url)
            // send readonly event when server closes, it's enabled by default
            // 服務端關閉時發送只讀事件
            .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default
            // 設置心跳間隔
            .addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT))
            // 設置編碼器
            .addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
            .build();
    // 傳輸協議,默認是netty
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    ExchangeServer server;
    try {
        // 綁定端口,開啟服務
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    // 客戶端傳輸協議
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }

    return server;
}
  • 設置一些參數,如服務端關閉時發送只讀事件,心跳間隔,編解碼器等
  • 綁定端口,啟動服務

Exchangers.bind

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    // 編解碼器設為exchange
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).bind(url, handler);
}

默認的Exchanger是HeaderExchanger,

HeaderExchanger.bind

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

服務啟動邏輯在Transporters.bind中

Transporters.bind

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().bind(url, handler);
}

getTransporter方法返回的是自適應擴展類,會根據url決定使用哪個擴展類。

Transporter

@SPI("netty")
public interface Transporter {

    /**
     * Bind a server.
     *
     * @param url     server url
     * @param handler
     * @return server
     * @throws RemotingException
     * @see org.apache.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
     */
    // 依次根據server和transporter參數值決定擴展名
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    /**
     * Connect to a server.
     *
     * @param url     server url
     * @param handler
     * @return client
     * @throws RemotingException
     * @see org.apache.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
     */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

server參數的默認值是netty,所以我們分析一下NettyTransporter

NettyTransporter.bind

@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

分析NettyServer構造器

NettyServer

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();

    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = Constants.ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    // 空閑線程超時時間,毫秒
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

主要邏輯:設置參數,然後打開服務。doOpen方法由子類實現。

NettyServer.doOpen

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    // boss線程池,這裏使用了newCachedThreadPool,如果需要就會創建新的線程,
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    // worker線程池
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    // 最大32核
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    // netty啟動類
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

這裏主要涉及到netty api的使用。設置boss線程池和worker線程池,然後通過啟動類ServerBootstrap綁定指定的host和port上,開始監聽端口。
服務啟動到這裏就告一段落,netty的部分就不再展開,不屬於dubbo框架的內容。

我只想說,這代碼也太特麽的深了。最初看到dubbo的架構圖,就是那張十層的圖時,我不太理解 ,看完這些代碼我才明白,dubbo為什麽能把層分得那麽細,那麽清晰。
代碼中基本上能抽象成接口的都抽象出來,擴展性是大大增強了,但是要想弄明白框架的整體架構就得多花點時間消化消化了。

另外,差點忘了一個重要內容,那就是netty的事件處理器,其實通過前面的層層調用我們可以發現,處理器類最開始在DubboProtocol被創建,沿著調用鏈一直傳遞到netty api。

DubboProtocol.requestHandler

這裏面一個重要的方法就是reply方法,這個方法的主要內容就是檢查參數類型,檢查方法存不存在,然後調用原始的Invoker。
實際上從socket接收到字節數組怎麽被解析為Invocation,這中間還有很長的調用鏈,通過代理模式進行層層封裝,這塊邏輯還不太懂,留著以後慢慢研究。

接下來,我們回過頭再來分析註冊的邏輯,也就是向註冊中心發送服務提供者信息。這部分的入口在

服務註冊

服務註冊部分的邏輯不是很復雜,主要還是通過url中的protocol參數值通過自適應機制找到對應的RegistryFactory類,然後獲取對應的Registry類。
以zookeeper為例,其實就是在zookeeper上創建對應的路徑。當然不僅僅是這麽簡單,其中還有重試,失敗回退等邏輯,這裏不再細說,目的就是知道大概的原理。

dubbo源碼閱讀之服務導出