1. 程式人生 > >Dubbo(三):深入理解Dubbo原始碼之如何將服務釋出到註冊中心

Dubbo(三):深入理解Dubbo原始碼之如何將服務釋出到註冊中心

一、前言

  前面有說到Dubbo的服務發現機制,也就是SPI,那既然Dubbo內部實現了更加強大的服務發現機制,現在我們就來一起看看Dubbo在發現服務後需要做什麼才能將服務註冊到註冊中心中。

二、Dubbo服務註冊簡介

  首先需要明白的是Dubbo是依賴於Spring容器的(至於為什麼在上篇部落格中有介紹),Dubbo服務註冊過程也是始於Spring容器釋出重新整理事件。而後Dubbo在接收到事件後,就會進行服務註冊,整個邏輯大致分為三個部分:

  1、檢查引數,組裝URL:服務消費方是通過URL拿到服務提供者的,所以我們需要為服務提供者配置好對應的URL。

  2、匯出服務到本地和遠端:這裡的本地指的是JVM,遠端指的是實現invoke,使得服務消費方能夠通過invoke呼叫到服務。

  3、向註冊中心註冊服務:能夠讓服務消費方知道服務提供方提供了那個服務。

三、接收Spring容器重新整理事件

  在簡介中我們提到Dubbo服務註冊是始於Spring容器釋出重新整理事件,那麼Dubbo是如何接收該事件的呢?

  在我們平常編寫provider的介面實現類時,都會打上@Service註解,從而這個標註這個類屬於ServiceBean。在ServiceBean中有這樣一個方法onApplicationEvent。該方法會在收到 Spring 上下文重新整理事件後執行服務註冊操作

 1 public void onApplicationEvent(ContextRefreshedEvent event) {
 2         //是否已匯出 && 是不是已被取消匯出
 3         if (!this.isExported() && !this.isUnexported()) {
 4             if (logger.isInfoEnabled()) {
 5                 logger.info("The service ready on spring started. service: " + this.getInterface());
 6             }
 7 
 8             this.export();
 9         }
10 
11     }
View Code

  注意這裡是2.7.3的Dubbo,接收Spring上下文重新整理事件已經不需要設定延遲匯出,而是在匯出的時候檢查配置再決定是否需要延時,所以只有兩個判斷。而在2.6.x版本的Dubbo存在著isDelay的判斷。這個是判斷服務是否延時匯出。這裡說個題外話2.6.x的版本是com.alibaba.dubbo的,而2.7.x是org.apache.dubbo的,而2.7.0也開始代表dubbo從Apache裡畢業了。

  在這裡就是Dubbo服務匯出到註冊中心過程的起點。需要我們在服務介面實現類上打上@Service。ServiceBean是Dubbo與Spring 框架進行整合的關鍵,可以看做是兩個框架之間的橋樑。具有同樣作用的類還有ReferenceBean。

四、檢查配置引數以及URL裝配

  1、檢查配置

    在這一階段Dubbo需要檢查使用者的配置是否合理,或者為使用者補充預設配置。就是從重新整理事件開始,進入export()方法,原始碼解析如下:

 1 public void export() {
 2         super.export();
 3         this.publishExportEvent();
 4     }
 5 
 6 //進入到ServiceConfig.class中的export。
 7 
 8 public synchronized void export() {
 9         //檢查並且更新配置
10         this.checkAndUpdateSubConfigs();
11         //是否需要匯出
12         if (this.shouldExport()) {
13             //是否需要延時
14             if (this.shouldDelay()) {
15                 DELAY_EXPORT_EXECUTOR.schedule(this::doExport, (long)this.getDelay(), TimeUnit.MILLISECONDS);
16             } else {
17                 //立刻匯出
18                 this.doExport();
19             }
20 
21         }
22     }
23 
24 //進入checkAndUpdateSubConfigs。
25 
26 public void checkAndUpdateSubConfigs() {
27         //檢查配置項包括provider是否存在,匯出埠是否可用,註冊中心是否可以連線等等
28         this.completeCompoundConfigs();
29         this.startConfigCenter();
30         this.checkDefault();
31         this.checkProtocol();
32         this.checkApplication();
33         if (!this.isOnlyInJvm()) {
34             this.checkRegistry();
35         }
36         //檢查介面內部方法是否不為空
37         this.refresh();
38         this.checkMetadataReport();
39         if (StringUtils.isEmpty(this.interfaceName)) {
40             throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
41         } else {
42             if (this.ref instanceof GenericService) {
43                 this.interfaceClass = GenericService.class;
44                 if (StringUtils.isEmpty(this.generic)) {
45                     this.generic = Boolean.TRUE.toString();
46                 }
47             } else {
48                 try {
49                     this.interfaceClass = Class.forName(this.interfaceName, true, Thread.currentThread().getContextClassLoader());
50                 } catch (ClassNotFoundException var5) {
51                     throw new IllegalStateException(var5.getMessage(), var5);
52                 }
53 
54                 this.checkInterfaceAndMethods(this.interfaceClass, this.methods);
55                 this.checkRef();
56                 this.generic = Boolean.FALSE.toString();
57             }
58             //是否需要匯出服務或者只是在本地執行測試
59             Class stubClass;
60             if (this.local != null) {
61                 if ("true".equals(this.local)) {
62                     this.local = this.interfaceName + "Local";
63                 }
64 
65                 try {
66                     stubClass = ClassUtils.forNameWithThreadContextClassLoader(this.local);
67                 } catch (ClassNotFoundException var4) {
68                     throw new IllegalStateException(var4.getMessage(), var4);
69                 }
70 
71                 if (!this.interfaceClass.isAssignableFrom(stubClass)) {
72                     throw new IllegalStateException("The local implementation class " + stubClass.getName() + " not implement interface " + this.interfaceName);
73                 }
74             }
75 
76             if (this.stub != null) {
77                 if ("true".equals(this.stub)) {
78                     this.stub = this.interfaceName + "Stub";
79                 }
80 
81                 try {
82                     stubClass = ClassUtils.forNameWithThreadContextClassLoader(this.stub);
83                 } catch (ClassNotFoundException var3) {
84                     throw new IllegalStateException(var3.getMessage(), var3);
85                 }
86 
87                 if (!this.interfaceClass.isAssignableFrom(stubClass)) {
88                     throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + this.interfaceName);
89                 }
90             }
91 
92             this.checkStubAndLocal(this.interfaceClass);
93             this.checkMock(this.interfaceClass);
94         }
95     }
View Code

    上面的原始碼分析可看出。export方法主要檢查的配置項有@Service標籤的類是否屬性合法。服務提供者是否存在,是否有對應的Application啟動,埠是否能連線,是否有對應的註冊中心等等一些配置,在檢查完這些配置後Dubbo會識別我們此次啟動服務是想在本地啟動進行一些除錯,還是將服務暴露給別人。不想暴露出去可以進行配置

1 <dubbo:provider export="false" />

  2、URL裝配

    在Dubbo中的URL一般包括以下欄位:protocol,host,port,path,parameters。在檢查配置後會進入到doExport中。

      protocol:就是URL最前面的欄位,表示的是協議,一般是:dubbo thrift http zk

      host.port:就是對應的IP地址和埠

      path:介面名稱

      parameters:引數鍵值對

 1 protected synchronized void doExport() {
 2         if (this.unexported) {
 3             throw new IllegalStateException("The service " + this.interfaceClass.getName() + " has already unexported!");
 4         } else if (!this.exported) {
 5             this.exported = true;
 6             if (StringUtils.isEmpty(this.path)) {
 7                 this.path = this.interfaceName;
 8             }
 9 
10             this.doExportUrls();
11         }
12     }
13 
14 //進入到doExportUrls
15 private void doExportUrls() {
16         //載入註冊中心連結
17         List<URL> registryURLs = this.loadRegistries(true);
18         //使用遍歷器遍歷protocols,並在每個協議下匯出服務
19         Iterator var2 = this.protocols.iterator();
20 
21         while(var2.hasNext()) {
22             ProtocolConfig protocolConfig = (ProtocolConfig)var2.next();
23             String pathKey = URL.buildKey((String)this.getContextPath(protocolConfig).map((p) -> {
24                 return p + "/" + this.path;
25             }).orElse(this.path), this.group, this.version);
26             ProviderModel providerModel = new ProviderModel(pathKey, this.ref, this.interfaceClass);
27             ApplicationModel.initProviderModel(pathKey, providerModel);
28             this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
29         }
30 
31     }
32 
33 //進入到載入註冊中心連結的方法
34 
35 protected List<URL> loadRegistries(boolean provider) {
36         List<URL> registryList = new ArrayList();
37         if (CollectionUtils.isNotEmpty(this.registries)) {
38             Iterator var3 = this.registries.iterator();
39             //迴圈的從註冊連結串列中拿取地址及配置
40             label47:
41             while(true) {
42                 RegistryConfig config;
43                 String address;
44                 do {
45                     if (!var3.hasNext()) {
46                         return registryList;
47                     }
48 
49                     config = (RegistryConfig)var3.next();
50                     address = config.getAddress();
51                     //address為空就預設為0.0.0.0
52                     if (StringUtils.isEmpty(address)) {
53                         address = "0.0.0.0";
54                     }
55                 } while("N/A".equalsIgnoreCase(address));
56 
57                 Map<String, String> map = new HashMap();
58                 // 新增 ApplicationConfig 中的欄位資訊到 map 中
59                 appendParameters(map, this.application);
60                 // 新增 RegistryConfig 欄位資訊到 map 中
61                 appendParameters(map, config);
62                 // 新增 path,protocol 等資訊到 map 中
63                 map.put("path", RegistryService.class.getName());
64                 appendRuntimeParameters(map);
65                 if (!map.containsKey("protocol")) {
66                     map.put("protocol", "dubbo");
67                 }
68                 // 解析得到 URL 列表,address 可能包含多個註冊中心 ip,
69                 // 因此解析得到的是一個 URL 列表
70                 List<URL> urls = UrlUtils.parseURLs(address, map);
71                 Iterator var8 = urls.iterator();
72 
73                 while(true) {
74                     URL url;
75                     do {
76                         if (!var8.hasNext()) {
77                             continue label47;
78                         }
79 
80                         url = (URL)var8.next();
81                         //// 將 URL 協議頭設定為 registry
82                         url = URLBuilder.from(url).addParameter("registry", url.getProtocol()).setProtocol("registry").build();
83                     // 通過判斷條件,決定是否新增 url 到 registryList 中,條件如下:
84                     // (服務提供者 && register = true 或 null) || (非服務提供者 && subscribe = true 或 null)
85                     } while((!provider || !url.getParameter("register", true)) && (provider || !url.getParameter("subscribe", true)));
86 
87                     //新增url到registryList中
88                     registryList.add(url);
89                 }
90             }
91         } else {
92             return registryList;
93         }
94     }
View Code

    loadRegistries方法主要包含如下的邏輯:

      1、構建引數對映集合,也就是 map

      2、構建註冊中心連結列表

      3、遍歷連結列表,並根據條件決定是否將其新增到 registryList 中

    實際上因為Dubbo現如今支援很多註冊中心,所以對於一些註冊中心的URL也要進行遍歷構建。這裡是生成註冊中心的URL。還未生成Dubbo服務的URL。比如說使用的是Zookeeper註冊中心,可能從loadRegistries中拿到的就是:

registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.7.3&pid=1528&qos.port=22222&registry=zookeeper&timestamp=1530743640901

    這種型別的URL,表示這是一個註冊協議,現在可以根據這個URL定位到註冊中心去了。服務介面是RegistryService,registry的型別為zookeeper。可是我們還未生成Dubbo服務提供方的URL所以接著看下面程式碼

    然後進行到doExportUrlsFor1Protocol(裝配Dubbo服務的URL並且實行釋出)

  1 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
  2         //首先是將一些資訊,比如版本、時間戳、方法名以及各種配置物件的欄位資訊放入到 map 中
  3         //map 中的內容將作為 URL 的查詢字串。構建好 map 後,緊接著是獲取上下文路徑、主機名以及埠號等資訊。
  4        //最後將 map 和主機名等資料傳給 URL 構造方法建立 URL 物件。需要注意的是,這裡出現的 URL 並非 java.net.URL,而是 com.alibaba.dubbo.common.URL。
  5         String name = protocolConfig.getName();
  6         // 如果協議名為空,或空串,則將協議名變數設定為 dubbo
  7         if (StringUtils.isEmpty(name)) {
  8             name = "dubbo";
  9         }
 10 
 11         Map<String, String> map = new HashMap();
 12         // 新增 side、版本、時間戳以及程序號等資訊到 map 中
 13         map.put("side", "provider");
 14         appendRuntimeParameters(map);
 15         // 通過反射將物件的欄位資訊新增到 map 中
 16         appendParameters(map, this.metrics);
 17         appendParameters(map, this.application);
 18         appendParameters(map, this.module);
 19         appendParameters(map, this.provider);
 20         appendParameters(map, protocolConfig);
 21         appendParameters(map, this);
 22         String scope;
 23         Iterator metadataReportService;
 24         // methods 為 MethodConfig 集合,MethodConfig 中儲存了 <dubbo:method> 標籤的配置資訊
 25         if (CollectionUtils.isNotEmpty(this.methods)) {
 26             Iterator var5 = this.methods.iterator();
 27             //檢測 <dubbo:method> 標籤中的配置資訊,並將相關配置新增到 map 中
 28             label166:
 29             while(true) {
 30                 MethodConfig method;
 31                 List arguments;
 32                 do {
 33                     if (!var5.hasNext()) {
 34                         break label166;
 35                     }
 36 
 37                     method = (MethodConfig)var5.next();
 38                     appendParameters(map, method, method.getName());
 39                     String retryKey = method.getName() + ".retry";
 40                     if (map.containsKey(retryKey)) {
 41                         scope = (String)map.remove(retryKey);
 42                         if ("false".equals(scope)) {
 43                             map.put(method.getName() + ".retries", "0");
 44                         }
 45                     }
 46 
 47                     arguments = method.getArguments();
 48                 } while(!CollectionUtils.isNotEmpty(arguments));
 49 
 50                 metadataReportService = arguments.iterator();
 51 
 52                 while(true) {
 53                     ArgumentConfig argument;
 54                     Method[] methods;
 55                     do {
 56                         do {
 57                             while(true) {
 58                                 if (!metadataReportService.hasNext()) {
 59                                     continue label166;
 60                                 }
 61 
 62                                 argument = (ArgumentConfig)metadataReportService.next();
 63                                 if (argument.getType() != null && argument.getType().length() > 0) {
 64                                     methods = this.interfaceClass.getMethods();
 65                                     break;
 66                                 }
 67 
 68                                 if (argument.getIndex() == -1) {
 69                                     throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
 70                                 }
 71 
 72                                 appendParameters(map, argument, method.getName() + "." + argument.getIndex());
 73                             }
 74                         } while(methods == null);
 75                     } while(methods.length <= 0);
 76 
 77                     for(int i = 0; i < methods.length; ++i) {
 78                         String methodName = methods[i].getName();
 79                         if (methodName.equals(method.getName())) {
 80                             Class<?>[] argtypes = methods[i].getParameterTypes();
 81                             if (argument.getIndex() != -1) {
 82                                 if (!argtypes[argument.getIndex()].getName().equals(argument.getType())) {
 83                                     throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
 84                                 }
 85 
 86                                 appendParameters(map, argument, method.getName() + "." + argument.getIndex());
 87                             } else {
 88                                 for(int j = 0; j < argtypes.length; ++j) {
 89                                     Class<?> argclazz = argtypes[j];
 90                                     if (argclazz.getName().equals(argument.getType())) {
 91                                         appendParameters(map, argument, method.getName() + "." + j);
 92                                         if (argument.getIndex() != -1 && argument.getIndex() != j) {
 93                                             throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
 94                                         }
 95                                     }
 96                                 }
 97                             }
 98                         }
 99                     }
100                 }
101             }
102         }
103 
104         String host;
105          // 檢測 generic 是否為 "true",並根據檢測結果向 map 中新增不同的資訊
106         if (ProtocolUtils.isGeneric(this.generic)) {
107             map.put("generic", this.generic);
108             map.put("methods", "*");
109         } else {
110             host = Version.getVersion(this.interfaceClass, this.version);
111             if (host != null && host.length() > 0) {
112                 map.put("revision", host);
113             }
114              // 為介面生成包裹類 Wrapper,Wrapper 中包含了介面的詳細資訊,比如介面方法名陣列,欄位資訊等
115             String[] methods = Wrapper.getWrapper(this.interfaceClass).getMethodNames();
116             if (methods.length == 0) {
117                 logger.warn("No method found in service interface " + this.interfaceClass.getName());
118                 map.put("methods", "*");
119             } else {
120                 // 將逗號作為分隔符連線方法名,並將連線後的字串放入 map 中
121                 map.put("methods", StringUtils.join(new HashSet(Arrays.asList(methods)), ","));
122             }
123         }
124         // 新增 token 到 map 中
125         if (!ConfigUtils.isEmpty(this.token)) {
126             if (ConfigUtils.isDefault(this.token)) {
127                 map.put("token", UUID.randomUUID().toString());
128             } else {
129                 map.put("token", this.token);
130             }
131         }
132         //獲取host和port
133         host = this.findConfigedHosts(protocolConfig, registryURLs, map);
134         Integer port = this.findConfigedPorts(protocolConfig, name, map);
135         // 獲取上下文路徑並且組裝URL
136         URL url = new URL(name, host, port, (String)this.getContextPath(protocolConfig).map((p) -> {
137             return p + "/" + this.path;
138         }).orElse(this.path), map);
139         if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
140             // 載入 ConfiguratorFactory,並生成 Configurator 例項,然後通過例項配置 url,使用了前面提到的SPI機制
141             url = ((ConfiguratorFactory)ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol())).getConfigurator(url).configure(url);
142         }
143         //下面邏輯主要分三步
144         // 如果 scope = none,則什麼都不做
145         // scope != remote,匯出到本地
146         // scope != local,匯出到遠端
147         scope = url.getParameter("scope");
148         if (!"none".equalsIgnoreCase(scope)) {
149             if (!"remote".equalsIgnoreCase(scope)) {
150                 this.exportLocal(url);
151             }
152 
153             if (!"local".equalsIgnoreCase(scope)) {
154                 if (!this.isOnlyInJvm() && logger.isInfoEnabled()) {
155                     logger.info("Export dubbo service " + this.interfaceClass.getName() + " to url " + url);
156                 }
157 
158                 if (CollectionUtils.isNotEmpty(registryURLs)) {
159                     metadataReportService = registryURLs.iterator();
160 
161                     while(metadataReportService.hasNext()) {
162                         URL registryURL = (URL)metadataReportService.next();
163                         if (!"injvm".equalsIgnoreCase(url.getProtocol())) {
164                             url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
165                             URL monitorUrl = this.loadMonitor(registryURL);
166                             if (monitorUrl != null) {
167                                 url = url.addParameterAndEncoded("monitor", monitorUrl.toFullString());
168                             }
169 
170                             if (logger.isInfoEnabled()) {
171                                 logger.info("Register dubbo service " + this.interfaceClass.getName() + " url " + url + " to registry " + registryURL);
172                             }
173 
174                             String proxy = url.getParameter("proxy");
175                             if (StringUtils.isNotEmpty(proxy)) {
176                                 registryURL = registryURL.addParameter("proxy", proxy);
177                             }
178                             // 為服務提供類(ref)生成 Invoker
179                             Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
180                             DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
181                             // 匯出服務,並生成 Exporter
182                             Exporter<?> exporter = protocol.export(wrapperInvoker);
183                             this.exporters.add(exporter);
184                         }
185                     }
186                 // 不存在註冊中心,僅匯出服務
187                 } else {
188                     Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, url);
189                     DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
190                     Exporter<?> exporter = protocol.export(wrapperInvoker);
191                     this.exporters.add(exporter);
192                 }
193 
194                 metadataReportService = null;
195                 MetadataReportService metadataReportService;
196                 if ((metadataReportService = this.getMetadataReportService()) != null) {
197                     metadataReportService.publishProvider(url);
198                 }
199             }
200         }
201 
202         this.urls.add(url);
203     }
View Code

    上面的原始碼前半段是進行URL裝配,這個URL就是Dubbo服務的URL,大致如下:

dubbo://192.168.1.6:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.1.6&bind.port=20880&dubbo=2.7.3&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5744&qos.port=22222&side=provider&timestamp=1530746052546

    這個URL表示它是一個dubbo協議(DubboProtocol),地址是當前伺服器的ip,埠是要暴露的服務的埠號,可以從dubbo:protocol配置,服務介面為dubbo:service配置釋出的介面。

    後半段主要是判斷scope變數來決定是否將服務匯出遠端或者本地,匯出到本地實際上很簡單隻需要生成Invoker。當匯出到遠端就需要新增監視器還要生成invoker。監視器能讓Dubbo定時檢視註冊中心掛了沒。會丟擲指定異常,而invoker使得服務消費方能夠遠端呼叫到服務。並且還會進行註冊到註冊中心下面我們接著來看看服務的釋出。因為Invoker比較重要在消費者和提供者中都有,所以這個後面會單獨拿出來進行探討。

五、服務釋出本地與遠端

  1、服務釋出到本地

1 private void exportLocal(URL url) {
2         //進行本地URL的構建
3         URL local = URLBuilder.from(url).setProtocol("injvm").setHost("127.0.0.1").setPort(0).build();
4         //根據本地的URL來實現對應的Invoker
5         Exporter<?> exporter = protocol.export(PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, local));
6         this.exporters.add(exporter);
7         logger.info("Export dubbo service " + this.interfaceClass.getName() + " to local registry url : " + local);
8     }
View Code

    可見釋出到本地是重新構建了protocol,injvm就是代表在本地的JVM裡,host與port都統一預設127.0.0.1:0。

  2、服務釋出到遠端

 1 public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
 2         //獲取註冊中心的URL,比如:zookeeper://127.0.0.1:2181/......
 3         URL registryUrl = this.getRegistryUrl(originInvoker);
 4         //獲取所有服務提供者的URL,比如:dubbo://192.168.1.6:20880/.......
 5         URL providerUrl = this.getProviderUrl(originInvoker);
 6         //獲取訂閱URL,比如:provider://192.168.1.6:20880/......
 7         URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(providerUrl);
 8         //建立監聽器
 9         RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
10         //向訂閱中心推送監聽器
11         this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
12         providerUrl = this.overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
13         //匯出服務
14         RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker, providerUrl);
15         Registry registry = this.getRegistry(originInvoker);
16         //獲取已註冊的服務提供者的URL,比如dubbo://192.168.1.6:20880/.......
17         URL registeredProviderUrl = this.getRegisteredProviderUrl(providerUrl, registryUrl);
18         // 向服務提供者與消費者登錄檔中註冊服務提供者
19         ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
20         // 獲取 register 引數
21         boolean register = registeredProviderUrl.getParameter("register", true);
22         // 根據 register 的值決定是否註冊服務
23         if (register) {
24             this.register(registryUrl, registeredProviderUrl);
25             providerInvokerWrapper.setReg(true);
26         }
27         // 向註冊中心進行訂閱 override 資料
28         registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
29         exporter.setRegisterUrl(registeredProviderUrl);
30         exporter.setSubscribeUrl(overrideSubscribeUrl);
31         // 建立並返回 DestroyableExporter
32         return new RegistryProtocol.DestroyableExporter(exporter);
33     }
View Code

    上面的原始碼主要是根據前面生成的URL進行服務的釋出和註冊(註冊在下一節展開原始碼)。當執行到doLocalExport也就是釋出本地服務到遠端時候會呼叫 DubboProtocol 的 export 方法大致會經歷下面一些步驟來匯出服務

    • 從Invoker獲取providerUrl,構建serviceKey(group/service:version:port),構建DubboExporter並以serviceKey為key放入本地map快取
    • 處理url攜帶的本地存根和callback回撥
    • 根據url開啟伺服器埠,暴露本地服務。先以url.getAddress為key查詢本地快取serverMap獲取ExchangeServer,如果不存在,則通過createServer建立。
    • createServer方法,設定心跳時間,判斷url中的傳輸方式(key=server,對應Transporter服務)是否支援,設定codec=dubbo,最後根據url和ExchangeHandler物件繫結server返回,這裡的ExchangeHandler非常重要,它就是消費方呼叫時,底層通訊層回撥的Handler,從而獲取包含實際Service實現的Invoker執行器,它是定義在DubboProtocol類中的ExchangeHandlerAdapter內部類。
    • 返回DubboExporter物件

     到這裡大致的服務釋出圖如下:

 

 六、服務註冊

  服務註冊操作對於 Dubbo 來說不是必需的,通過服務直連的方式就可以繞過註冊中心。但通常我們不會這麼做,直連方式不利於服務治理,僅推薦在測試服務時使用。對於 Dubbo 來說,註冊中心雖不是必需,但卻是必要的。原始碼如下:

 1 public void register(URL url) {
 2     super.register(url);
 3     failedRegistered.remove(url);
 4     failedUnregistered.remove(url);
 5     try {
 6         // 模板方法,由子類實現
 7         doRegister(url);
 8     } catch (Exception e) {
 9         Throwable t = e;
10 
11         // 獲取 check 引數,若 check = true 將會直接丟擲異常
12         boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
13                 && url.getParameter(Constants.CHECK_KEY, true)
14                 && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
15         boolean skipFailback = t instanceof SkipFailbackWrapperException;
16         if (check || skipFailback) {
17             if (skipFailback) {
18                 t = t.getCause();
19             }
20             throw new IllegalStateException("Failed to register");
21         } else {
22             logger.error("Failed to register");
23         }
24 
25         // 記錄註冊失敗的連結
26         failedRegistered.add(url);
27     }
28 }
29 
30 //進入doRegister方法
31 
32 protected void doRegister(URL url) {
33     try {
34         // 通過 Zookeeper 客戶端建立節點,節點路徑由 toUrlPath 方法生成,路徑格式如下:
35         //   /${group}/${serviceInterface}/providers/${url}
36         // 比如
37         //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
38         zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
39     } catch (Throwable e) {
40         throw new RpcException("Failed to register...");
41     }
42 }
43 
44 //進入create方法
45 
46 public void create(String path, boolean ephemeral) {
47     if (!ephemeral) {
48         // 如果要建立的節點型別非臨時節點,那麼這裡要檢測節點是否存在
49         if (checkExists(path)) {
50             return;
51         }
52     }
53     int i = path.lastIndexOf('/');
54     if (i > 0) {
55         // 遞迴建立上一級路徑
56         create(path.substring(0, i), false);
57     }
58     
59     // 根據 ephemeral 的值建立臨時或持久節點
60     if (ephemeral) {
61         createEphemeral(path);
62     } else {
63         createPersistent(path);
64     }
65 }
66 
67 //進入createEphemeral
68 
69 public void createEphemeral(String path) {
70     try {
71         // 通過 Curator 框架建立節點
72         client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
73     } catch (NodeExistsException e) {
74     } catch (Exception e) {
75         throw new IllegalStateException(e.getMessage(), e);
76     }
77 }
View Code

  根據上面的方法,可以將當前服務對應的配置資訊(儲存在URL中的)註冊到註冊中心/dubbo/org.apache.dubbo.demo.DemoService/providers/ 。裡面直接使用了Curator進行建立節點(Curator是Netflix公司開源的一套zookeeper客戶端框架)

七、總結

  到這裡Dubbo的服務註冊流程終於是解釋完。核心在於Dubbo使用規定好的URL+SPI進行尋找和發現服務,通過URL定位註冊中心,再通過將服務的URL釋出到註冊中心從而使得消費者可以知道服務的有哪些,裡面可以看見對於URL這種複雜的物件並且需要經常更改的,通常採用建造者模式。而2.7.3版本的Dubbo原始碼也使用了Java8以後的新特性Lambda表示式來構建隱式函式。而一整套流程下來可以在ZooInspector這個zk視覺化客戶端看見我們建立的節點,前提是註冊中心為zk。

&n