1. 程式人生 > >Dubbo Provider啟動流程原始碼分析

Dubbo Provider啟動流程原始碼分析

簡單的官方demo:

provider的java程式碼:

// Demo entry point: boots the Spring context that exports the Dubbo services,
// keeps the provider alive until a key is pressed, then shuts the context down.
public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
        try {
            context.start();

            System.in.read(); // press any key to exit
        } finally {
            // Close the context explicitly so DisposableBean beans (ServiceBean is one)
            // receive their destroy callback instead of being dropped when main returns.
            context.close();
        }
    }

provider的spring配置:

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
    http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"
>
<!-- Provider application info, used to compute dependencies --> <dubbo:application name="demo-provider"/> <!-- Use a multicast registry to expose the service address --> <dubbo:registry address="multicast://224.5.6.7:1234"/> <!-- Expose the service over the dubbo protocol on port 20880 --> <dubbo:protocol name="dubbo" port="20880"/> <!-- Implement the service the same way as a local bean -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/> <!-- Declare the service interface to be exposed --> <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/> </beans>

通過配置檔案,以及spring的IoC容器初始化流程,我們可以推測服務的啟動流程如下:
1. spring配置檔案的載入以及初始化
2. bean的單例生成
3. dubbo管理所有服務實現單例物件
4. 向註冊中心註冊服務資訊

spring自定義標籤
dubbo自定義標籤的初始化是通過Spring提供的NamespaceHandler介面實現的(DubboNamespaceHandler繼承了NamespaceHandlerSupport),所以下面先看看DubboNamespaceHandler類:

/**
 * Namespace handler for the dubbo XML namespace: maps every dubbo element name
 * to a DubboBeanDefinitionParser that knows which bean class to build for it.
 */
public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    // Runs once when this class is loaded, before any parser is registered.
    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    /**
     * Registers one parser per supported dubbo element.
     * DubboBeanDefinitionParser defines how a dubbo node is parsed; its first
     * constructor argument is the bean class created for that node.
     */
    public void init() {
        // application-level configuration
        registerParser("application", ApplicationConfig.class, true);
        // module configuration
        registerParser("module", ModuleConfig.class, true);
        // registry configuration
        registerParser("registry", RegistryConfig.class, true);
        // monitor configuration
        registerParser("monitor", MonitorConfig.class, true);
        // service provider configuration
        registerParser("provider", ProviderConfig.class, true);
        // service consumer configuration
        registerParser("consumer", ConsumerConfig.class, true);
        // network protocol configuration
        registerParser("protocol", ProtocolConfig.class, true);
        // exported service bean
        registerParser("service", ServiceBean.class, true);
        // referenced (consumed) service bean; the only element registered with false
        registerParser("reference", ReferenceBean.class, false);
        // annotation support bean
        registerParser("annotation", AnnotationBean.class, true);
    }

    /**
     * Binds elementName to a new DubboBeanDefinitionParser for beanClass.
     *
     * @param elementName local name of the dubbo XML element
     * @param beanClass   bean class handed to the parser
     * @param required    forwarded unchanged as the parser's second constructor
     *                    argument (NOTE(review): presumably the parser's "required"
     *                    flag - confirm against DubboBeanDefinitionParser)
     */
    private void registerParser(String elementName, Class<?> beanClass, boolean required) {
        registerBeanDefinitionParser(elementName, new DubboBeanDefinitionParser(beanClass, required));
    }

}

在DubboBeanDefinitionParser的parse方法中,解析並設定了大部分配置資訊以及服務資訊。
我們可以關注下其中beanclass的原始碼,因為這章主要分析的是provider,這裡從provider進行分析:
首先是beanClass為ServiceBean的bean例項(對應上面註冊的"service"標籤)。

// Bean class registered for the dubbo "service" element; it hooks into the Spring
// bean lifecycle (InitializingBean, DisposableBean, ...) on top of ServiceConfig.
// Abridged excerpt: "..." marks code left out by the article.
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
    ...
    // Runs while the bean is being initialized
    public void afterPropertiesSet() throws Exception {
        ...// initialize the various configs
        // publish the service right away unless isDelay() says to wait
        if (!isDelay()) {
            export();
        }
    }
}

釋出程式碼:

    // Publishes the service: either immediately, or after `delay` milliseconds
    // when a positive delay is configured. Abridged excerpt ("..." is elided code).
    public synchronized void export() {
        ...
        // delayed export: schedule doExport() on the delay executor
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            // no (positive) delay configured: export on the calling thread
            doExport();
        }
    }

匯出程式碼:

   // Performs the export; in this abridged excerpt only the final step is shown,
   // which delegates to doExportUrls(). "..." marks elided code.
   protected synchronized void doExport() {
        ...
        doExportUrls();
    }
    /**
     * Loads the registry URLs once and then exports this service separately
     * for every entry in {@code protocols}.
     */
    private void doExportUrls() {
        final List<URL> registries = loadRegistries(true);
        for (ProtocolConfig eachProtocol : protocols) {
            doExportUrlsFor1Protocol(eachProtocol, registries);
        }
    }

    // Exports the service for a single protocol. Pipeline: method => invoker => exporter.
    // Abridged excerpt: "..." is elided code, which is where locals such as
    // `map`, `name` and `path` come from.
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        ...
        // export the service
        // context path: taken from the protocol config, falling back to the provider config
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }

        // the registry can be zk, consul, etc.
        // host that goes into the service URL built below
        // NOTE(review): the original note called this the "registry host", but the value
        // is used as the host of this service's own URL - confirm in findConfigedHosts
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        // port that goes into the service URL built below (same caveat as for host)
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        // read the configured exposure scope
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // scope "none": nothing is exported at all
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            // export locally unless the scope is explicitly "remote"
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export remotely unless the scope is explicitly "local"; goes through the registries
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        // monitor URL for this registry
                        // NOTE(review): monitorUrl is not used further in this excerpt
                        URL monitorUrl = loadMonitor(registryURL);
                        // dynamic proxy: wraps class + method into an invoker; ref is the concrete
                        // service instance, and the invoker is an executable object. The service
                        // URL is embedded (encoded) in the registry URL under EXPORT_KEY.
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        // invoker => exporter: what the server finally keeps is the exporter; exposing and
                        // referencing the service both go through it, and its implementation depends on the protocol
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    // no registry (or register=false): export the service URL directly
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }
        // remember the URL that was built for this protocol
        this.urls.add(url);
    }

invoker 生成,動態代理的過程

定義程式碼:

// The proxyFactory on which getInvoker(...) is called during export: obtained from the
// ExtensionLoader for ProxyFactory via getAdaptiveExtension().
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

@SPI("javassist") // uses javassist bytecode generation to create objects
public interface ProxyFactory {

    /**
     * create proxy.
     * Declares the method that creates a proxy object.
     *
     * @param invoker the invoker the created proxy is built around
     * @return proxy
     * @throws RpcException declared by the interface; conditions are implementation-specific
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * create invoker.
     * Produces the invoker, i.e. the kind of object getProxy takes as its argument.
     *
     * @param <T>   service type
     * @param proxy the object that invocations are dispatched to
     * @param type  the service interface class
     * @param url   the URL associated with the invoker
     * @return invoker
     * @throws RpcException declared by the interface; conditions are implementation-specific
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

// Proxy factory based on JDK dynamic proxies and plain reflection.
public class JdkProxyFactory extends AbstractProxyFactory {

    /**
     * Creates a JDK dynamic proxy implementing {@code interfaces}; every call on it
     * is handled by an InvokerInvocationHandler wrapping {@code invoker}.
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        InvokerInvocationHandler handler = new InvokerInvocationHandler(invoker);
        Object jdkProxy = Proxy.newProxyInstance(contextLoader, interfaces, handler);
        return (T) jdkProxy;
    }

    /**
     * Wraps {@code proxy} in an invoker that dispatches each call reflectively.
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T target, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // look the method up on the target's runtime class, then call it
                return target.getClass()
                        .getMethod(methodName, parameterTypes)
                        .invoke(target, arguments);
            }
        };
    }

}

// Proxy factory based on javassist-generated classes (Proxy / Wrapper helpers).
public class JavassistProxyFactory extends AbstractProxyFactory {

    /**
     * Obtains a generated proxy for {@code interfaces} and instantiates it with an
     * InvokerInvocationHandler wrapping {@code invoker}.
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        InvokerInvocationHandler handler = new InvokerInvocationHandler(invoker);
        return (T) Proxy.getProxy(interfaces).newInstance(handler);
    }

    /**
     * Wraps {@code proxy} in an invoker that dispatches calls through a Wrapper
     * instead of reflection.
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot correctly handle class names containing '$'
        // so fall back to the interface type when the runtime class name has one
        Class<?> wrappedClass = proxy.getClass();
        if (wrappedClass.getName().indexOf('$') >= 0) {
            wrappedClass = type;
        }
        final Wrapper wrapper = Wrapper.getWrapper(wrappedClass);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T target, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(target, methodName, parameterTypes, arguments);
            }
        };
    }

}

// Base invoker that forwards an Invocation to a target object through doInvoke().
// Abridged excerpt: the constructor and accessors such as getUrl() are not shown.
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

    private final T proxy; // set when proxyFactory.getInvoker is called, i.e. the object passed in as its first argument (the service instance `ref` during export)

    // type argument passed to getInvoker (the service interface class during export)
    private final Class<T> type;

    // URL passed to getInvoker
    private final URL url;

    // Runs the invocation against the target and wraps the outcome in an RpcResult.
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            // the invoked method itself threw: hand its exception back inside the result instead of throwing
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            // anything else is reported as an RpcException, keeping the cause
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    // Implemented by the anonymous subclasses created in JdkProxyFactory and JavassistProxyFactory
    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;

}

實際函式呼叫:

// Generic loader: one ExtensionLoader instance per extension type.
// Abridged excerpt: the constructor, createAdaptiveExtension() and the
// createAdaptiveInstanceError field are not shown.
public class ExtensionLoader<T> {
    // caches
    // extension type -> its loader
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
    // class -> instance (not used within this excerpt)
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
    // holder for this loader's cached adaptive instance
    private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();

    // Returns the loader for the given type, preferring the cached one
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            // putIfAbsent followed by a fresh get: callers racing here all end up with the instance that was stored
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }
    // Each loader has one adaptive instance bound to it; returns it, preferring the cached one
    // NOTE(review): in this excerpt, if createAdaptiveInstanceError is non-null and nothing
    // is cached, the method falls through and returns null.
    public T getAdaptiveExtension() {
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError == null) {
                synchronized (cachedAdaptiveInstance) {
                    // check again under the lock before creating
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                    }
                }
            }
        }
        return (T) instance;
    }
}

exporter 的生成

抽象類

// Base class for protocols; this excerpt shows only the state shared by subclasses.
public abstract class AbstractProtocol implements Protocol {

    // one protocol holds many exporters, stored by String key
    protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

    // the invokers tracked by this protocol
    protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();
}

對應的實現類
以thrift作為交換資料協議為例

/**
 * Protocol implementation that forces the thrift codec onto every exported URL.
 * Keeps one ExchangeServer per address and one exporter per service key.
 */
public class ThriftProtocol extends AbstractProtocol {

    // thrift port
    public static final int DEFAULT_PORT = 40880;

    // the data-exchange servers opened by this protocol
    // ip:port -> ExchangeServer
    private final ConcurrentMap<String, ExchangeServer> serverMap =
            new ConcurrentHashMap<String, ExchangeServer>();

    /**
     * Publishes {@code invoker}: makes sure a server exists for the URL's address
     * (when the URL is server-side) and stores a new exporter under the service key.
     *
     * @param invoker the invoker to publish
     * @return the exporter stored in exporterMap for this invoker
     * @throws RpcException declared for callers; not thrown directly by this method
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // only the thrift codec may be used
        final URL thriftUrl = invoker.getUrl().addParameter(Constants.CODEC_KEY, ThriftCodec.NAME);

        // find server.
        final String address = thriftUrl.getAddress();
        // a client may also expose a service that only the server can call,
        // in which case no server is opened here
        if (thriftUrl.getParameter(Constants.IS_SERVER_KEY, true)) {
            if (!serverMap.containsKey(address)) {
                serverMap.put(address, getServer(thriftUrl));
            }
        }

        // export service.
        final String exporterKey = serviceKey(thriftUrl);
        final DubboExporter<T> exported = new DubboExporter<T>(invoker, exporterKey, exporterMap);
        // cache it
        exporterMap.put(exporterKey, exported);

        return exported;
    }

    /**
     * Runs the parent's destroy() and then removes and closes every known server.
     */
    public void destroy() {
        // parent cleanup first (destroys the invokers, per the original note)
        super.destroy();
        // walk a snapshot of the keys so entries can be removed while looping
        for (String address : new ArrayList<String>(serverMap.keySet())) {
            final ExchangeServer removed = serverMap.remove(address);
            if (removed == null) {
                continue;
            }
            removed.close(getServerShutdownTimeout());
        }
    }

    /**
     * Creates a ThriftInvoker for {@code type} at {@code url} and records it in
     * {@code invokers}.
     */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        final ThriftInvoker<T> created = new ThriftInvoker<T>(type, url, getClients(url), invokers);
        invokers.add(created);
        return created;
    }
}