
Dubbo Source Code Analysis: Service Export and Registration

Preparation

Dubbo version: 2.5.4

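Spring Custom Extensions

Dubbo exports services through Spring configuration and defines its own set of custom tags on top of Spring's extension mechanism. To support custom extensions, Spring provides two interfaces, NamespaceHandler and BeanDefinitionParser:

  • NamespaceHandler: registers a set of BeanDefinitionParser instances and delegates parsing to them

  • BeanDefinitionParser: parses the content of an individual element

  • By default, Spring loads the META-INF/spring.handlers file from each jar to find the matching NamespaceHandler (for Dubbo, it lives in dubbo-config-spring under the dubbo-config module).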
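The lookup described in the last bullet can be sketched in plain Java. This is only an illustration of the idea, not Spring's actual resolver: the class name SpringHandlersLookupSketch and the method resolveHandler are hypothetical, and the mapping line is what I assume Dubbo 2.5.x ships in its spring.handlers file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class SpringHandlersLookupSketch {

    // Assumed content of META-INF/spring.handlers. The colon in the key is
    // escaped because ':' is a key/value separator in the properties format.
    static final String SPRING_HANDLERS =
        "http\\://code.alibabatech.com/schema/dubbo="
        + "com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler\n";

    // Returns the handler class name mapped to a namespace URI, or null
    // when no handler is registered for that namespace.
    static String resolveHandler(String namespaceUri) {
        Properties mappings = new Properties();
        try {
            mappings.load(new StringReader(SPRING_HANDLERS));
        } catch (IOException e) {
            throw new IllegalStateException("Could not parse handler mappings", e);
        }
        return mappings.getProperty(namespaceUri);
    }

    public static void main(String[] args) {
        System.out.println(resolveHandler("http://code.alibabatech.com/schema/dubbo"));
    }
}
```

The namespace URI declared in the XML configuration is the key, so an XML file that uses the dubbo namespace ends up being parsed by the handler named on the right-hand side.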

[Figure: dubbo-publish-and-register-01.png]

Here is DubboNamespaceHandler:

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

	static {
		Version.checkDuplicate(DubboNamespaceHandler.class);
	}

	public void init() {
		registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
		registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
		registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
		registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
		registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
		registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
		registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
		registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
		registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
		registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
	}
}

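DubboBeanDefinitionParser parses the Dubbo custom tags into the corresponding bean objects. So that the provider exports and registers its services when Spring starts, and so that clients automatically subscribe to and discover services at startup, two beans were added, ServiceBean and ReferenceBean, which extend ServiceConfig and ReferenceConfig respectively.

ServiceBean implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, and BeanNameAware:

  • InitializingBean: afterPropertiesSet is called while the bean is being initialized
  • DisposableBean: when the bean is destroyed, the Spring container automatically calls destroy, for example to release resources
  • ApplicationContextAware: the Spring container injects the ApplicationContext while it initializes the bean
  • ApplicationListener: listens for ApplicationEvent events; the Spring container publishes an event once it has started
  • BeanNameAware: gives the bean its own bean id at initialization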

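Service Export Process

We start with a sequence diagram to help trace the flow.
Figure: Dubbo service export

ServiceBean is the entry point for service export. When the bean is initialized, its afterPropertiesSet method runs, and at the end of that method export is called to publish the service: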

// ServiceConfig
public synchronized void export() {
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && ! export.booleanValue()) {
        return;
    }
    if (delay != null && delay > 0) {
        Thread thread = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(delay);
                } catch (Throwable e) {
                }
                doExport();
            }
        });
        thread.setDaemon(true);
        thread.setName("DelayExportServiceThread");
        thread.start();
    } else {
        doExport();
    }
}
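The delay branch in export() can be isolated into a small standalone sketch. It is not Dubbo's code: DelayedExportSketch, export, and exportsWithin are hypothetical names, the delay is assumed to be in milliseconds as Thread.sleep implies, and the interrupt handling is my own choice rather than the empty catch in the original.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class DelayedExportSketch {

    // Runs doExport immediately when no positive delay is configured;
    // otherwise sleeps on a daemon thread first. Returns that thread, or
    // null when the export ran inline on the caller's thread.
    static Thread export(Integer delayMillis, Runnable doExport) {
        if (delayMillis == null || delayMillis <= 0) {
            doExport.run();
            return null;
        }
        final long delay = delayMillis;
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                // Unlike an empty catch, keep the interrupt flag set.
                Thread.currentThread().interrupt();
            }
            doExport.run(); // export even if the sleep was cut short
        });
        thread.setDaemon(true); // a pending export must not keep the JVM alive
        thread.setName("DelayExportServiceThread");
        thread.start();
        return thread;
    }

    // Demo helper: true if a delayed export completes within the timeout.
    static boolean exportsWithin(int delayMillis, long timeoutMillis) {
        CountDownLatch exported = new CountDownLatch(1);
        export(delayMillis, exported::countDown);
        try {
            return exported.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(exportsWithin(50, 2000));
    }
}
```

Because the thread is a daemon, a delayed export that has not fired yet will not prevent the JVM from exiting.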

doExport then calls the doExportUrls method:

private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true); // load the registry URLs (true = provider side)
    for (ProtocolConfig protocolConfig : protocols) {
        // export the service under this protocol
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

doExportUrlsFor1Protocol

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    /**
     * Only the core code is kept
     */
    String scope = url.getParameter(Constants.SCOPE_KEY);
    if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            // registryURLs
            // registry://...
            if (registryURLs != null && registryURLs.size() > 0
                    && url.getParameter("register", true)) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    // Obtain an Invoker object from proxyFactory
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    // Export and register the service
                    // protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
                    // protocol here is the Protocol$Adpative adapter object
                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                Exporter<?> exporter = protocol.export(invoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}
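The nested scope checks in doExportUrlsFor1Protocol are easier to read as two independent predicates. The sketch below restates them under the assumption that Constants.SCOPE_NONE, SCOPE_REMOTE, and SCOPE_LOCAL hold the strings "none", "remote", and "local"; ExportScopeSketch and its method names are hypothetical.

```java
class ExportScopeSketch {

    // Local (in-JVM) export happens unless scope is "none" or "remote".
    static boolean exportsLocally(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"remote".equalsIgnoreCase(scope);
    }

    // Remote export happens unless scope is "none" or "local".
    static boolean exportsRemotely(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"local".equalsIgnoreCase(scope);
    }

    public static void main(String[] args) {
        for (String scope : new String[] {null, "local", "remote", "none"}) {
            System.out.println(scope + " -> local=" + exportsLocally(scope)
                + ", remote=" + exportsRemotely(scope));
        }
    }
}
```

One consequence worth noting: when no scope is configured at all, both predicates are true, so the service is exported locally and remotely.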

The call to protocol.export(invoker) goes to the dynamically generated adapter class Protocol$Adpative:

public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        // Look up the extension with the given name
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}
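Stripped of the generated boilerplate, the adapter does one thing: it reads the protocol name from the URL, falls back to "dubbo", and delegates to the extension registered under that name. A minimal sketch of that dispatch follows, using plain strings instead of Dubbo's URL and Invoker types. AdaptiveDispatchSketch, its nested Protocol interface, and the map-based registry are all hypothetical stand-ins for ExtensionLoader.

```java
import java.util.HashMap;
import java.util.Map;

class AdaptiveDispatchSketch {

    // Simplified stand-in for com.alibaba.dubbo.rpc.Protocol.
    interface Protocol {
        String export(String url);
    }

    // Stand-in for the extensions ExtensionLoader would discover.
    static final Map<String, Protocol> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exported " + url);
        EXTENSIONS.put("registry", url -> "RegistryProtocol exported " + url);
    }

    // Uses the URL scheme as the extension name, defaulting to "dubbo".
    static String extensionName(String url) {
        int schemeEnd = url.indexOf("://");
        return schemeEnd > 0 ? url.substring(0, schemeEnd) : "dubbo";
    }

    // Picks the extension by name at call time and delegates to it.
    static String export(String url) {
        String name = extensionName(url);
        Protocol extension = EXTENSIONS.get(name);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        System.out.println(export("registry://127.0.0.1:2181/demo"));
        System.out.println(export("127.0.0.1:20880/demo"));
    }
}
```

The point of the pattern is that the caller holds a single adapter reference, while the concrete implementation is chosen per call from data carried in the URL.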

getExtension

public T getExtension(String name) {
    if (name != null && name.length() != 0) {
        if ("true".equals(name)) {
            return this.getDefaultExtension();
        } else {
            Holder<Object> holder = (Holder)this.cachedInstances.get(name);
            if (holder == null) {
                this.cachedInstances.putIfAbsent(name, new Holder());
                holder = (Holder)this.cachedInstances.get(name);
            }

            Object instance = holder.get();
            if (instance == null) {
                synchronized(holder) {
                    instance = holder.get();
                    if (instance == null) {
                        // Create the extension with the given name
                        instance = this.createExtension(name);
                        holder.set(instance);
                    }
                }
            }

            return instance;
        }
    } else {
        throw new IllegalArgumentException("Extension name == null");
    }
}
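getExtension combines a per-name Holder with double-checked locking so that each extension is created at most once, and lookups of different names do not block one another. The sketch below reproduces that pattern in isolation. HolderCacheSketch and creationsFor are hypothetical names, the factory function stands in for createExtension, and the volatile field in Holder is my assumption about what makes the unsynchronized first read safe.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class HolderCacheSketch<T> {

    // Mutable slot; volatile so the unsynchronized read sees a published value.
    static final class Holder<V> {
        private volatile V value;
        V get() { return value; }
        void set(V value) { this.value = value; }
    }

    private final ConcurrentMap<String, Holder<T>> cachedInstances = new ConcurrentHashMap<>();
    private final Function<String, T> factory; // stands in for createExtension(name)

    HolderCacheSketch(Function<String, T> factory) {
        this.factory = factory;
    }

    T getExtension(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Extension name == null");
        }
        Holder<T> holder = cachedInstances.get(name);
        if (holder == null) {
            // putIfAbsent keeps whichever holder was registered first
            cachedInstances.putIfAbsent(name, new Holder<>());
            holder = cachedInstances.get(name);
        }
        T instance = holder.get();
        if (instance == null) {
            synchronized (holder) { // lock only this name's holder
                instance = holder.get();
                if (instance == null) {
                    instance = factory.apply(name);
                    holder.set(instance);
                }
            }
        }
        return instance;
    }

    // Demo helper: number of factory calls made for repeated lookups of one name.
    static int creationsFor(int lookups) {
        AtomicInteger created = new AtomicInteger();
        HolderCacheSketch<Object> cache = new HolderCacheSketch<>(name -> {
            created.incrementAndGet();
            return new Object();
        });
        for (int i = 0; i < lookups; i++) {
            cache.getExtension("dubbo");
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(creationsFor(5));
    }
}
```

Synchronizing on the holder rather than on the whole loader means a slow createExtension for one name only delays callers asking for that same name.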

createExtension

private T createExtension(String name) {
    Class<?> clazz = (Class)this.getExtensionClasses().get(name);
    if (clazz == null) {
        throw this.findException(name);
    } else {
        try {
            T instance = EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = EXTENSION_INSTANCES.get(clazz);
            }
            // Inject dependencies into the instance
            this.injectExtension(instance);
            // cachedWrapperClasses is populated in loadFile
            Set<Class<?>> wrapperClasses = this.cachedWrapperClasses;
            Class wrapperClass;
            if (wrapperClasses != null && wrapperClasses.size() > 0) {
                // Create each wrapper via its constructor that takes the extension type, then inject its dependencies
                for(Iterator var5 = wrapperClasses.iterator(); var5.hasNext(); instance = this.injectExtension(wrapperClass.getConstructor(this.type).newInstance(instance))) {
                    wrapperClass = (Class)var5.next();
                }
            }

            return instance;
        } catch (Throwable var7) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " + this.type + ")  could not be instantiated: " + var7.getMessage(), var7);
        }
    }
}

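The main steps of the method above are:

  • Look up the class that corresponds to name

  • Create an instance of that class

  • Inject dependencies into the instance

  • For each wrapper class, create a wrapper through its constructor that takes the extension type, and inject dependencies into it

Taking Protocol as an example:

Under the resources path of dubbo-rpc-api, the file com.alibaba.dubbo.rpc.Protocol contains filter and listener entries: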

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper

Both ProtocolFilterWrapper and ProtocolListenerWrapper have a constructor that takes a Protocol as its parameter:

public ProtocolFilterWrapper(Protocol protocol){}
public ProtocolListenerWrapper(Protocol protocol){}

Iterating over cachedWrapperClasses therefore wraps DubboProtocol in both ProtocolFilterWrapper and ProtocolListenerWrapper.
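The wrapping loop in createExtension is a reflective decorator chain: each wrapper class is instantiated through its one-argument constructor with the current instance, and the result becomes the new instance. A toy version is sketched below. All names in it (WrapperChainSketch, FilterWrapper, ListenerWrapper, wrap, demo) are hypothetical, and it uses a LinkedHashSet so that the wrapping order is predictable, which I would not assume for the real cachedWrapperClasses set.

```java
import java.util.LinkedHashSet;
import java.util.Set;

class WrapperChainSketch {

    public interface Protocol {
        String export(String service);
    }

    public static class DubboProtocol implements Protocol {
        public String export(String service) {
            return "dubbo(" + service + ")";
        }
    }

    public static class FilterWrapper implements Protocol {
        private final Protocol protocol;
        public FilterWrapper(Protocol protocol) { this.protocol = protocol; }
        public String export(String service) {
            return "filter(" + protocol.export(service) + ")";
        }
    }

    public static class ListenerWrapper implements Protocol {
        private final Protocol protocol;
        public ListenerWrapper(Protocol protocol) { this.protocol = protocol; }
        public String export(String service) {
            return "listener(" + protocol.export(service) + ")";
        }
    }

    // Wraps the instance once per wrapper class, using the constructor
    // whose single parameter is the extension interface itself.
    static Protocol wrap(Protocol instance, Set<Class<? extends Protocol>> wrapperClasses) {
        try {
            for (Class<? extends Protocol> wrapperClass : wrapperClasses) {
                instance = wrapperClass.getConstructor(Protocol.class).newInstance(instance);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Wrapper could not be instantiated", e);
        }
    }

    // Builds the chain filter -> listener around DubboProtocol and calls it.
    static String demo() {
        Set<Class<? extends Protocol>> wrappers = new LinkedHashSet<>();
        wrappers.add(FilterWrapper.class);
        wrappers.add(ListenerWrapper.class);
        return wrap(new DubboProtocol(), wrappers).export("DemoService");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // listener(filter(dubbo(DemoService)))
    }
}
```

The wrapper added last ends up outermost, so callers always talk to the outermost wrapper and only reach the real protocol after passing through every layer.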

Because doExportUrlsFor1Protocol passes in registryURL, whose protocol is registry, the export method of Protocol$Adpative ultimately calls the export method of RegistryProtocol.

RegistryProtocol

public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
    // Perform the local export
    final RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker);
    final Registry registry = this.getRegistry(originInvoker);
    final URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker);
    registry.register(registedProviderUrl);
    final URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl);
    final RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl);
    this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }

        public void unexport() {
            try {
                exporter.unexport();
            } catch (Throwable var4) {
                RegistryProtocol.logger.warn(var4.getMessage(), var4);
            }

            try {
                registry.unregister(registedProviderUrl);
            } catch (Throwable var3) {
                RegistryProtocol.logger.warn(var3.getMessage(