dubbo系列四、dubbo啟動過程源碼解析
一、代碼準備
1、示例代碼
參考dubbo系列二、dubbo+zookeeper+dubboadmin分布式服務框架搭建(windows平臺)
2、簡單了解下spring自定義標簽
https://www.jianshu.com/p/16b72c10fca8
Spring自定義標簽總共可以分為以下幾個步驟
定義Bean 標簽解析生成接收配置的POJO。
定義schema文件,定義自定義標簽的attr屬性
定義解析類parser,遇到自定義標簽如何解析。
定義命名空間處理類namespaceSupport,遇到自定義的命名標簽,能夠路由到對應的解析類。
聲明schema,寫入spring.schema文件中
聲明自定義標簽的命名處理類namespaceHandler,寫入spring.handlers文件中
例如dubbo標簽:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> <!--dubbo應用程序命名--> <dubbo:application name="dubbo-demo-provider"/> <!--dubbo註冊地址--> <dubbo:registry address="zookeeper://192.168.1.100:2181"/> <!--dubbo協議地址--> <dubbo:protocol name="dubbo" port="20880"/> <!--接口聲明--> <dubbo:service interface="com.dubbo.demo.api.DemoRpcService" ref="demoRpcService"/> <bean id="demoRpcService" class="com.dubbo.demo.DemoRpcServiceImpl"/> </beans>
3、官網說明
官網:https://dubbo.incubator.apache.org/zh-cn/docs/dev/implementation.html
初始化過程細節
解析服務
基於 dubbo.jar 內的 META-INF/spring.handlers
配置,Spring 在遇到 dubbo 名稱空間時,會回調 DubboNamespaceHandler
。
所有 dubbo 的標簽,都統一用 DubboBeanDefinitionParser
進行解析,基於一對一屬性映射,將 XML 標簽解析為 Bean 對象。
在 ServiceConfig.export()
或 ReferenceConfig.get()
初始化時,將 Bean 對象轉換 URL 格式,所有 Bean 屬性轉成 URL 的參數。
然後將 URL 傳給 協議擴展點,基於擴展點的 擴展點自適應機制,根據 URL 的協議頭,進行不同協議的服務暴露或引用。
二、dubbo標簽解析
1、啟動服務,斷點跟蹤
public static void main(String[] args) throws IOException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:dubbo-provider.xml"); context.start(); // 阻塞當前進程,否則程序會直接停止 System.in.read(); }
spring啟動過程中,隨著Spring在初始化過程中,碰到dubbo命名的標簽,如(<dubbo:service>
,<dubbo:registry>
)等標簽,會由DubboNamespaceHandler
類處理,具體原理見鏈接Spring自定義標簽
DubboNamespaceHandler類源碼:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package com.alibaba.dubbo.config.spring.schema; 。。。import org.springframework.beans.factory.xml.NamespaceHandlerSupport; public class DubboNamespaceHandler extends NamespaceHandlerSupport { public DubboNamespaceHandler() { } public void init() {
// application標簽解析 <dubbo:application name="dubbo-demo-provider"/> this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); // module標簽解析
this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); // module標簽解析
this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
this.registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
// <dubbo:protocol name="dubbo" port="20880"/>
this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
// service標簽
// <dubbo:service interface="com.dubbo.demo.api.DemoRpcService" ref="demoRpcService"/>
// <bean id="demoRpcService" class="com.dubbo.demo.DemoRpcServiceImpl"/>
// </beans>
this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); this.registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } static { Version.checkDuplicate(DubboNamespaceHandler.class); } }
遇到不同的標簽,會由不同的Parser
處理,這裏重點看服務發布,這行代碼:
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
也就是說,當Spring容器處理完<dubbo:service>
標簽後,會在Spring容器中生成一個ServiceBean
,服務的發布也會在ServiceBean
中完成。不妨看一下ServiceBean
的定義:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware { }
2、啟動入口類
ServiceBean 同時也是service標簽解析之後的bean之一,繼承ServiceConfig
該Bean實現了很多接口,關於InitializingBean
,DisposableBean
,ApplicationContextAware
,BeanNameAware
,這些接口的使用介紹如下鏈接:
- InitializingBean&DisposableBean
- BeanNameAware& ApplicationContextAware
InitializingBean
的afterPropertiesSet
方法,在Spring容器加載完成,會接收到事件ContextRefreshedEvent
,調用ApplicationListener
的onApplicationEvent
方法。在
afterPropertiesSet
中,和onApplicationEvent
中,會調用export()
,在export()
中,會暴露dubbo服務,具體區別在於是否配置了delay
屬性,是否延遲暴露,如果delay
不為null
,或者不為-1
時,會在afterPropertiesSet
中調用export()
暴露dubbo服務,如果為null
,或者為-1
時,會在Spring容器初始化完成,接收到ContextRefreshedEvent
事件,調用onApplicationEvent
,暴露dubbo服務。
部分ServiceBean的代碼如下:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware { //Spring容器初始化完成,調用 public void onApplicationEvent(ContextRefreshedEvent event) { if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } //暴露服務 export(); } } }
在export()
,暴露服務過程中,如果發現有delay屬性
,則延遲delay時間
,暴露服務,如果沒有,則直接暴露服務。
public synchronized void export() { //忽略若幹行代碼 if (delay != null && delay > 0) { //當delay不為null,且大於0時,延遲delay時間,暴露服務 delayExportExecutor.schedule(new Runnable() { public void run() { //暴露服務 doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { //直接暴露服務 doExport(); } }
而在doExport()
中,驗證參數,按照不同的Protocol
,比如(dubbo
,injvm
)暴露服務,在不同的zookeeper
集群節點上註冊自己的服務。
protected synchronized void doExport() { //忽略10000行代碼 doExportUrls(); //忽略10000行代碼 } private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { //按照不同的Protocal暴露服務 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
// 獲取註冊中心地址 protected List<URL> loadRegistries(boolean provider) { checkRegistry(); List<URL> registryList = new ArrayList<URL>(); // protected List<RegistryConfig> registries;解析後的RegistryConfig中獲取地址列表 if (registries != null && !registries.isEmpty()) { for (RegistryConfig config : registries) { String address = config.getAddress(); if (address == null || address.length() == 0) { address = Constants.ANYHOST_VALUE; } // 如果地址為空,再次從配置文件中取 String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } // 如果地址不為空,拼接協議類型、版本信息 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); appendParameters(map, application); appendParameters(map, config); map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 默認dubbo協議 if (!map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } // 如果同一個標簽配置多個地址,則拆分 List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); url = url.setProtocol(Constants.REGISTRY_PROTOCOL); if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } // 返回格式化後的註冊地址 return registryList; }
3、服務暴露過程
這裏以dubbo
協議為例,看一下發布的過程,在發布過程中,會用一個變量map
保存URL的所有變量和value值,然後調用代理工程proxyFactory,獲取代理類,然後將invoker轉換成exporter,暴露服務,具體如下:
protocol://host:port/path?key=value&key=value
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //如果協議類型為null,則默認為dubbo協議 String name = protocolConfig.getName(); if (name == null || name.length() == 0) { name = "dubbo"; } //map是保存url中key-Value的值 Map<String, String> map = new HashMap<String, String>(); //URL中的side屬性,有兩個值,一個provider,一個consumer,暴露服務的時候為provider map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); //dubbo的版本號 url中的dubbo map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion()); //url中的timestamp map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); //url中的pid if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } //從其他參數中獲取參數 appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); //忽略若幹代碼 if (ProtocolUtils.isGeneric(generic)) { map.put("generic", generic); map.put("methods", Constants.ANY_VALUE); } else { //url中的revesion字段 String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } //拼接URL中的methods String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put("methods", Constants.ANY_VALUE); } else { map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } //token 臨牌校驗 if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put("token", UUID.randomUUID().toString()); } else { map.put("token", token); } } //injvm協議 if ("injvm".equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } //獲取上下文路徑 String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } //獲取主機名 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); //獲取端口 Integer port = this.findConfigedPorts(protocolConfig, name, map); //組裝URL,將map中的協議,版本號信息等 URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); //如果url使用的協議存在擴展,調用對應的擴展來修改原url if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); //配置為none不暴露 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { //如果不是remote,則暴露本地服務 exportLocal(url); } //如果配置不是local則暴露為遠程服務 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { //忽略日誌 如果註冊中心地址不為null if (registryURLs != null && registryURLs.size() > 0) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); //忽略不相幹的代碼 // 通過代理工廠將ref對象轉化成invoker對象 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //代理invoker對象 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 暴露服務 Exporter<?> exporter = protocol.export(wrapperInvoker); //一個服務可能有多個提供者,保存在一起 exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } } this.urls.add(url); }
將doExportUrlsFor1Protocol
代碼再簡化一下,如下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { Map map=builderUrl(); // 通過代理工廠將ref對象轉化成invoker對象 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); //代理invoker對象 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 暴露服務 Exporter<?> exporter = protocol.export(wrapperInvoker); //一個服務可能有多個提供者,保存在一起 exporters.add(exporter); }
拼接後的url:
dubbo://192.168.1.100:20880/com.dubbo.demo.api.DemoRpcService?anyhost=true&application=dubbo-demo-provider&bind.ip=192.168.1.100&bi
nd.port=20880&dubbo=2.6.0&generic=false&interface=com.dubbo.demo.api.DemoRpcService&methods=getUserName&pid=18740&side=provider×tamp=1538311737815
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
而在上面proxyFactory.getInvoker
中,很顯然是獲取到的是接口的代理類。
而在 protocol.export(wrapperInvoker)
中,將服務暴露出去。
代碼如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); //忽略若幹代碼 //打開服務 openServer(url); optimizeSerialization(url); return exporter; }
private void openServer(URL url) { String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); //是否server端 if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { //如果服務不存在,創建服務 serverMap.put(key, createServer(url)); } else { server.reset(url); } } }
private ExchangeServer createServer(URL url) { //忽略若幹代碼 ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } return server; }
而在headerExchanger
的bind
中,調用了Transporters.bind()
,一直調用到NettyServer
,綁定了端口和鏈接。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } //Transporters.bind public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { //忽略很多代碼 return getTransporter().bind(url, handler); } //上段代碼的getTransporter() public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
而在Transporter的定義中看到下面代碼:
@SPI("netty") public interface Transporter { @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler) throws RemotingException; @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException; }
所以這裏調用的是NettyTransporter
,這裏啟動了一個新的NettyServer
。
public class NettyTransporter implements Transporter { public static final String NAME = "netty4"; public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } }
在NettyServer的構造方法中,調用了父類的構造方法,調用了doOpen()
方法指定了端口
public class NettyServer extends AbstractServer implements Server { public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } } public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); //忽略很多代碼 doOpen(); //忽略很多代碼 } @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
到這裏dubbo服務就啟動了,但是有一點還是有疑惑,那麽,dubbo服務什麽時候註冊到註冊中心的?帶著疑惑看了一下官方文檔。
也就是說,在調用DubboProtocol
暴露服務之前,回去調用攔截器,當發現是regiester
,則去註冊中心註冊服務。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //如果是registerProtocol,則調用RegisterProtocol.export方法 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); }
而在RegisterProtocol.export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //根據SPI機制獲取具體的Registry實例,這裏獲取到的是ZookeeperRegistry final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); boolean register = registedProviderUrl.getParameter("register", true); if (register) { //在這裏註冊服務 register(registryUrl, registedProviderUrl); //忽略很多代碼 } //忽略很多代碼 } public void register(URL registryUrl, URL registedProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registedProviderUrl); }
而ZookeeperRegistry
繼承父類FailbackRegistry
,在父類的register
方法中,調用了 doRegister
,doRegister
中,創建了ZK節點,這樣就將自己的服務暴露到註冊中心zk上:
@Override public void register(URL url) { //忽略很多代碼 doRegister(url); //忽略很多代碼 } protected void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
這樣,整個dubbo服務就啟動了。再回頭看官方文檔上的說明,就很清楚了。
dubbo系列四、dubbo啟動過程源碼解析