服務註冊的幾個步驟
對於RPC框架的服務註冊,一般包含了如下的流程:
- 載入服務提供者,可能是通過xml配置的,也可能是通過掃描註解的
- 例項化服務提供者,並以服務介面作為key,實現類作為value儲存到一個map容器中
- 開啟網路監聽
- 將服務提供者的地址路徑(ip:port/服務名?引數等)註冊到註冊中心
- 當網路監聽接收到請求時,根據請求過來的服務名及引數等,從容器中獲取到服務提供者實現,通過消費端呼叫時傳送的方法名稱反射呼叫服務提供者的相關方法
Dubbo原始碼分析
Dubbo與Spring的整合
在實際的開發過程中,Dubbo大部分情況都是與Spring的生態進行整合使用的,所以在真正進入Dubbo的服務註冊之前,我們需要先了解Dubbo是怎麼將自己的環境嵌入到Spring生態中的。
在Spring中使用Dubbo的方式有兩種,一種是通過XML配置檔案,一種是通過註解的方式,由於當下Spring Boot盛行,所以這裡會比較深入的分析Dubbo在Spring Boot中的整合。不過其實兩種方式最終的都是將Dubbo的相關元件注入到Spring 的容器中
在Spring 中提供了一種NamespaceHandler的機制,用於對Spring標籤的擴充套件,所以在Spring使用xml的方式時,Dubbo中會提供一個名為DubboNamespaceHandler的處理器,用於解析spring 的xml中的各種dubbo標籤,並注入到容器中,這裡不再深入。DubboNamespaceHandler原始碼如下:
public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
// 將xml中的相關標籤注入到spring中
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
/**
* Override {@link NamespaceHandlerSupport#parse(Element, ParserContext)} method
*
* @param element {@link Element}
* @param parserContext {@link ParserContext}
* @return
* @since 2.7.5
*/
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
BeanDefinitionRegistry registry = parserContext.getRegistry();
registerAnnotationConfigProcessors(registry);
/**
* @since 2.7.8
* issue : https://github.com/apache/dubbo/issues/6275
*/
registerCommonBeans(registry);
BeanDefinition beanDefinition = super.parse(element, parserContext);
setSource(beanDefinition);
return beanDefinition;
}
/**
* Register the processors for the Spring Annotation-Driven features
*
* @param registry {@link BeanDefinitionRegistry}
* @see AnnotationConfigUtils
* @since 2.7.5
*/
private void registerAnnotationConfigProcessors(BeanDefinitionRegistry registry) {
AnnotationConfigUtils.registerAnnotationConfigProcessors(registry);
}
}
接下來重點來看在Spring Boot中的整合。
在Dubbo與Spring的整合,有兩個入口可以讓我們進入到Dubbo主見初始化的,第一種就是通過@EnableDubbo註解上的DubboComponentScan註解,它是一個@Import註解,Spring會通過引入一個DubboComponentScanRegistrar註冊器在其registerBeanDefinitions方法上注入一個ServiceAnnotationBeanPostProcessor後置處理器。基於Spring的後置處理器原理,我們可以知道,它將會在bean例項化完成,初始化之前和之後各自產生回撥,具體稍後再敘。注入方法如下:
private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);
builder.addConstructorArgValue(packagesToScan);
builder.setRole(2);
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);
}
另一個入口就是Spring Boot的自動裝配,在Dubbo中存在一個DubboAutoConfiguration類,該類通過Spring Boot中的自動裝配原理註冊到IOC容器中,同時該類是一個配置類,在該類中同時也裝配了一個ServiceAnnotationBeanPostProcessor
的bean,方法如下:
@ConditionalOnProperty(prefix = DUBBO_SCAN_PREFIX, name = BASE_PACKAGES_PROPERTY_NAME)
@ConditionalOnBean(name = BASE_PACKAGES_PROPERTY_RESOLVER_BEAN_NAME)
@Bean
public ServiceAnnotationBeanPostProcessor serviceAnnotationBeanPostProcessor(
@Qualifier(BASE_PACKAGES_PROPERTY_RESOLVER_BEAN_NAME) PropertyResolver propertyResolver) {
Set<String> packagesToScan = propertyResolver.getProperty(BASE_PACKAGES_PROPERTY_NAME, Set.class, emptySet());
return new ServiceAnnotationBeanPostProcessor(packagesToScan);
}
接下來就進入到ServiceAnnotationBeanPostProcessor一探究竟。
首先進入構造方法
public ServiceAnnotationBeanPostProcessor(Set<String> packagesToScan) {
super(packagesToScan);
}
這裡將傳入的packagesToScan往父類進行傳遞,由於它繼承了ServiceClassPostProcessor,現在進入ServiceClassPostProcessor類的構造方法:
public ServiceClassPostProcessor(Set<String> packagesToScan) {
this.packagesToScan = packagesToScan;
}
ServiceClassPostProcessor只是將傳入的掃包路徑賦值給packagesToScan
根據BeanPostProcessor的特性,現在進入到postProcessBeanDefinitionRegistry方法
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
// @since 2.7.5
registerBeans(registry, DubboBootstrapApplicationListener.class);
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
registerServiceBeans(resolvedPackagesToScan, registry);
} else {
if (logger.isWarnEnabled()) {
logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
}
}
}
該方法主要做了以下幾件事:
- 註冊了一個DubboBootstrapApplicationListener監聽,具體作用稍後再敘
- 呼叫resolvePackagesToScan方法解析所有包名的路徑。可能包名中存在一Placeholders的特殊定義
- 呼叫registerServiceBeans方法進行註冊
具體怎麼解析包路徑不在本次討論範圍,所有就先不深入了,現在直接進入到registerServiceBeans方法中
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
DubboClassPathBeanDefinitionScanner scanner =
new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
scanner.setBeanNameGenerator(beanNameGenerator);
// refactor @since 2.7.7
serviceAnnotationTypes.forEach(annotationType -> {
scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
});
for (String packageToScan : packagesToScan) {
// Registers @Service Bean first
scanner.scan(packageToScan);
// Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
registerServiceBean(beanDefinitionHolder, registry, scanner);
}
if (logger.isInfoEnabled()) {
logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
beanDefinitionHolders +
" } were scanned under package[" + packageToScan + "]");
}
} else {
if (logger.isWarnEnabled()) {
logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
+ packageToScan + "]");
}
}
}
}
該方法有主要做了以下幾件事
構建了一個DubboClassPathBeanDefinitionScanner物件,該物件繼承自Spring的ClassPathBeanDefinitionScanner。在Spring中,ClassPathBeanDefinitionScanner是一個掃描程式,主要用來掃描Classpath下符合條件的物件,然後將物件注入到給定的registry中
定義一個為Bean生成名稱的BeanNameGenerator,這裡生成的是AnnotationBeanNameGenerator這個策略
將bean名稱策略set到scanner中
新增過濾Filter,這裡遍歷serviceAnnotationTypes,獲取到所有的過濾條件,這裡是基於註解的攔截,到serviceAnnotationTypes賦值的地方,可以看到。初始化了以下三種註解作為攔截
private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(
// @since 2.7.7 Add the @DubboService , the issue : https://github.com/apache/dubbo/issues/6007
DubboService.class,
// @since 2.7.0 the substitute @com.alibaba.dubbo.config.annotation.Service
Service.class,
// @since 2.7.3 Add the compatibility for legacy Dubbo's @Service , the issue : https://github.com/apache/dubbo/issues/4330
com.alibaba.dubbo.config.annotation.Service.class
);
遍歷解析後的掃描的包,呼叫scanner.scan(packageToScan)註冊所有標註了@Service的bean注入到ioc容器中
呼叫findServiceBeanDefinitionHolders查詢所有標註了@Service的Class封裝成BeanDefinitionHolders,不管是否被@ComponentScan掃描
如果beanDefinitionHolders存在元素,遍歷beanDefinitionHolders,呼叫registerServiceBean註冊
將標註了@Service註解的bean注入到ioc容器不屬於本次討論內容,這裡也不做詳細說明
下面進入到findServiceBeanDefinitionHolders方法,瞭解一下該方法都返回了那些型別的BeanDefinitionHolder
private Set<BeanDefinitionHolder> findServiceBeanDefinitionHolders(
ClassPathBeanDefinitionScanner scanner, String packageToScan, BeanDefinitionRegistry registry,
BeanNameGenerator beanNameGenerator) {
Set<BeanDefinition> beanDefinitions = scanner.findCandidateComponents(packageToScan);
Set<BeanDefinitionHolder> beanDefinitionHolders = new LinkedHashSet<>(beanDefinitions.size());
for (BeanDefinition beanDefinition : beanDefinitions) {
String beanName = beanNameGenerator.generateBeanName(beanDefinition, registry);
BeanDefinitionHolder beanDefinitionHolder = new BeanDefinitionHolder(beanDefinition, beanName);
beanDefinitionHolders.add(beanDefinitionHolder);
}
return beanDefinitionHolders;
}
- 首先掃描傳入的packageToScan包下的所有的符合在scanner中定義的過濾註解的.class檔案,封裝成BeanDefinition
- 遍歷掃描到的beanDefinitions,通過名稱策略,為Bean生成名稱,同時用Bean和名稱構建成BeanDefinitionHolder,加入到beanDefinitionHolders中,返回該集合
beanDefinitionHolders的獲取到這裡就已經完成了,接下來進入到 registerServiceBean方法中,看看具體的註冊流程
private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
DubboClassPathBeanDefinitionScanner scanner) {
// 通過beanDefinitionHolder中的BeanDefinition中儲存的全類名通過Class.forName載入成class物件
Class<?> beanClass = resolveClass(beanDefinitionHolder);
Annotation service = findServiceAnnotation(beanClass);
/**
* The {@link AnnotationAttributes} of @Service annotation
*/
AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);
Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);
String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();
AbstractBeanDefinition serviceBeanDefinition =
buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);
// ServiceBean Bean name
String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);
if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
registry.registerBeanDefinition(beanName, serviceBeanDefinition);
if (logger.isInfoEnabled()) {
logger.info("The BeanDefinition[" + serviceBeanDefinition +
"] of ServiceBean has been registered with name : " + beanName);
}
} else {
if (logger.isWarnEnabled()) {
logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
"] of ServiceBean[ bean name : " + beanName +
"] was be found , Did @DubboComponentScan scan to same package in many times?");
}
}
}
- 獲取掃描到的類的位元組碼的class物件
- 獲取 beanClass上的註解,會跟初始化時的serviceAnnotationTypes屬性中的註解進行匹配,返回匹配到的註解
- 獲取匹配到的註解service上的所有屬性及其屬性值serviceAnnotationAttributes
- 獲取掃描到的.class物件實現的介面的class物件interfaceClass
- 獲取組裝BeanDefinitionHolder是為Bean生成的名稱
- 將所有引數傳入到buildServiceBeanDefinition方法中,構建一個AbstractBeanDefinition的物件,這裡我們暫時是不知道AbstractBeanDefinition儲存的是哪個Bean的定義
- 通過generateServiceBeanName方法構建一個ServiceBean的名稱。
- 檢查是否存在重複名稱的bean,如果不存在,則直接注入AbstractBeanDefinition的定義到ioc容器中
現在我們先來探索一下在buildServiceBeanDefinition中構建的是一個什麼Bean的定義,由於方法比較長,這裡就不貼程式碼了,該方法的大概的流程就是建立了一個ServiceBean的BeanDefinition。然後組裝前面解析到的註解的引數和獲取到的實現類的介面等為ServiceBean的屬性進行賦值,然後最後返回一個ServiceBean的BeanDefinition。
然後再來看看ServiceBean的命名規則是怎麼樣的
private String generateServiceBeanName(AnnotationAttributes serviceAnnotationAttributes, Class<?> interfaceClass) {
ServiceBeanNameBuilder builder = create(interfaceClass, environment)
.group(serviceAnnotationAttributes.getString("group"))
.version(serviceAnnotationAttributes.getString("version"));
return builder.build();
}
可以看到,ServiceBean的命名規則是通過介面的全類名以及group,version等一起來保證唯一名稱的,或許是長這樣的ServiceBean:com.bobo.dubbo.api.HelloService:2.0.1這一系列操作下來,就是為了構建一個ServiceBean。而我們在DubboNamespaceHandler的方式中,也可以看到,最終也注入了一個ServiceBean。那麼ServiceBean到底有何神奇之處呢?馬上揭曉!進入到ServiceBean,看到其繼承了ServiceConfig,同時實現了InitializingBean,DisposableBean,ApplicationContextAware,BeanNameAware,ApplicationEventPublisherAware等介面。而ServiceConfig又繼承了AbstractConfig類,它是,比如Service,Refrence,application,Monitor等配置類的父類,我們進入AbstractConfig類,發現它存在一個@PostConstruct註解標註的方法,它會在spring的bean初始化完成之後執行,我們進入該方法
@PostConstruct
public void addIntoConfigManager() {
ApplicationModel.getConfigManager().addConfig(this);
}
進入到ApplicationModel.getConfigManager()方法
public static ConfigManager getConfigManager() {
return (ConfigManager) LOADER.getExtension(ConfigManager.NAME);
}
看到這裡,上一篇的SPI知識就排上了用場了,這是一個用來獲取FrameworkExt介面的擴充套件實現的擴充套件點,同時ConfigManager.NAME指定了需要獲取的擴充套件點的名稱為config。所以就到FrameworkExt的實現類中查詢一個名為config的擴充套件點實現即可得到。其實這裡得到的就是ConfigManager本身。
所以在addIntoConfigManager方法中,實際上是將當前的bean儲存到了ConfigManager的物件中,最終儲存到了ConfigManager的configsCache中。ConfigManager主要是用來管理Dubbo中的所有繼承了AbstractConfig的配置
而在ServiceBean中,我們並沒有看到有任何任何有價值的東西,到這裡看起來似乎前路已斷,不知道怎麼樣入手了。
此時突然想起來我們在進行掃包的一系列操作之前,貌似註冊了一個監聽器,是不是可以從監聽器入手呢?
進入到前面註冊的DubboBootstrapApplicationListener
監聽器中
public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
implements Ordered {
/**
* The bean name of {@link DubboBootstrapApplicationListener}
*
* @since 2.7.6
*/
public static final String BEAN_NAME = "dubboBootstrapApplicationListener";
private final DubboBootstrap dubboBootstrap;
public DubboBootstrapApplicationListener() {
this.dubboBootstrap = DubboBootstrap.getInstance();
}
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
dubboBootstrap.start();
}
private void onContextClosedEvent(ContextClosedEvent event) {
dubboBootstrap.stop();
}
@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
}
}
可以看到,該監聽器監聽了容器的容器的重新整理和關閉,我們前面的操作已經將ServiceBean注入到了ioc容器中,根據ioc的容器初始化的幾個週期,可以知道在Refreshd容器時,我們所有的服務提供者對應的ServiceBean已經全部裝載到了容器中。
繼續往下,當產生ContextRefreshedEvent事件時,呼叫了onContextRefreshedEvent方法,該方法中呼叫dubboBootstrap.start();
到這裡,跟Spring相關的東西已經走完了,下面做一個總結
- 通過Spring Boot的自動裝配或@EnableDubbo註解自動注入一個ServiceAnnotationBeanPostProcessor傳入需要掃描的包的路徑
- 註冊了一個``DubboBootstrapApplicationListener`監聽
- 根據BeanPostProcessor的特性,呼叫postProcessBeanDefinitionRegistry方法,根據傳入的掃包路徑進行掃描,然後將所有的標註了@Service註解的bean注入到ioc容器中
- 繼續掃描包,獲得標註了Service/DubboService等註解的所有BeanDefinitionHolders
- 遍歷BeanDefinitionHolders,解析出每個BeanDefinition中的介面,標註的註解,及註解上定義的引數等。
- 通過解析出來的一系列資訊生成一個ServiceBean.然後將ServiceBean注入到ioc容器中。
- 同時在ServiceBean的父類AbstractConfig中,會存在一個標註了@PostConstruct註解的方法,它會在bean初始化完成之後,將當前bean儲存到一個ConfigManager物件中,它dubbo環境中是一個單例的存在。
- 在Spring進行Refresh容器時,會觸發一個事件,呼叫dubboBootstrap.start();方法,啟動
接下來就真正的進入到Dubbo的服務釋出,註冊的世界,一探究竟吧
Dubbo的服務註冊與釋出
在進入start()方法之前,首先需要看看dubboBootstrap的初始化過程,它是一個單例的物件,直接進入DubboBootstrap的構造方法
private DubboBootstrap() {
configManager = ApplicationModel.getConfigManager();
environment = ApplicationModel.getEnvironment();
DubboShutdownHook.getDubboShutdownHook().register();
ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
@Override
public void callback() throws Throwable {
DubboBootstrap.this.destroy();
}
});
}
可以看到,初始化時構建了configManager和environment,其中configManager主要用於管理Dubbo中的所有配置。Environment展示先不關注
根據以上的分析,我們現在進入到start()方法
public DubboBootstrap start() {
// 已經啟動過後,就不用再次啟動了
if (started.compareAndSet(false, true)) {
ready.set(false);
// 初始化方法,就是檢查一些配置,啟動配置中心等等
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 1. export Dubbo Services
// 真正執行釋出服務的方法
exportServices();
// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
//3. Register the local ServiceInstance if required
registerServiceInstance();
}
referServices();
if (asyncExportingFutures.size() > 0) {
new Thread(() -> {
try {
this.awaitFinish();
} catch (Exception e) {
logger.warn(NAME + " exportAsync occurred an exception.");
}
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}).start();
} else {
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
}
return this;
}
呼叫了exportServices方法進行了服務的釋出和註冊,呼叫referServices方法進行服務的發現,服務發現將留到下一篇去,今天只對服務的註冊進行探索。
進入到exportServices方法
private void exportServices() {
configManager.getServices().forEach(sc -> {
// TODO, compatible with ServiceConfig.export()
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
asyncExportingFutures.add(future);
} else {
sc.export();
exportedServices.add(sc);
}
});
}
遍歷我們在Spring Boot環節時新增到configManager的所有ServiceConfig,將當前的物件傳入到ServiceConfig中,同步或非同步呼叫ServiceConfig的export方法。
進入到export方法
public synchronized void export() {
if (!shouldExport()) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
checkAndUpdateSubConfigs();
//init serviceMetadata
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport();
}
exported();
}
該方法算是Dubbo服務釋出的入口流程方法了。
- 判斷是否應該釋出本服務
- 如果DubboBootstrap物件為null,初始化一個DubboBootstrap物件
- 檢查是否更新存根配置
- 初始化ServiceMetadata,將注入Bean時初始化的一些引數儲存到serviceMetadata中
- 延時或同步呼叫doExport
進入doExport
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}
做了一系列判斷,標識等初始化之後,再呼叫doExportUrls方法
private void doExportUrls() {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// TODO, uncomment this line once service key is unified
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
- 獲取到一個ServiceRepository,根據前面分析的經驗,可以看出這裡得到的就是一個ServiceRepository物件。
- 根據服務的介面名稱和位元組碼封裝一個ServiceDescriptor儲存到repository的services中。並返回ServiceDescriptor
- 呼叫registerProvider方法,將唯一服務名,服務的Provider,serviceDescriptor,當前物件,ServiceMetadata等傳入方法。
解析註冊中心的URLregistryURLs,這裡返回的是如下的URL
registry://192.168.100.127:2181/org.apache.dubbo.registry.RegistryService?application=spring-cloud-alibaba-boot-dubbo-provider&default=true&dubbo=2.0.2&pid=48053&preferred=true&qos.enable=false®istry=zookeeper&release=2.7.7&timeout=10000×tamp=1598411022992
- 遍歷protocols,根據遍歷到的協議拼接成不同的pathKey,呼叫registerService進行註冊,儲存服務源資訊
- 根據不同的協議,呼叫doExportUrlsFor1Protocol方法進行註冊
進入registerProvider方法,該方法會將傳入的物件構建成一個ProviderModel物件。儲存到相應的集合中,同時在ProviderModel物件初始化時,會呼叫將該介面的所有方法遍歷,構建成一個ProviderMethodModel儲存到methods中。
然後進入到doExportUrlsFor1Protocol,方法過長,這裡就不貼程式碼了。其主要完成了以下功能
- 根據初始化ServiceBean時傳入的各個引數,封裝成一個map
- 獲取當前伺服器的host
- 獲取當前服務需要監聽的port
- 根據封裝的引數,協議,host,port構建一個URL
- 釋出一個本地服務-injvm
- 獲取到配置的註冊中心的URL,可以存在多個註冊中心,這就是Dubbo對多註冊中心的支援
- 添加註冊中心的URL的引數
- 生成monitor的URL
- 再次封裝釋出服務的URL的引數
- 通過Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));將當前服務的url作為引數新增到註冊中心的url上,然後使用registryURL和當前服務介面位元組碼,服務實現構建一個invoker,這是一個屬於註冊中心的invoker;
- 使用Invoker和當前的ServiceConfig構建一個 DelegateProviderMetaDataInvoker物件
- 呼叫Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);進行服務的釋出
- 將返回的exporter新增到exporters中
接下來看看是怎麼獲取到Invoker的,根據我們的SPI的知識,在沒有引數中沒有指定擴充套件點時,會使用預設@SPI註解上預設指定的擴充套件點,由於在ProxyFactory類上的註解為@SPI("javassist"),所以可以知道這裡獲取到的擴充套件點為JavassistProxyFactory的物件,在進入JavassistProxyFactory的getInvoker()方法之前,根據我們學習SPI的知識,或許該擴充套件點存在一些包裝,這裡就不詳細說明了,主要講服務釋出的主要流程。進入該類的getInvoker()方法。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
根據傳入的服務介面的class物件,動態生成的一個包裝器,該包裝器繼承了Wrapper了,重寫了invokeMethod()方法。重寫方法如下
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
com.wangx.spring.cloud.alibaba.provider.HelloServiceImpl w;
try {
w = ((com.wangx.spring.cloud.alibaba.provider.HelloServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("hello".equals($2) && $3.length == 1) {
return ($w) w.hello((java.lang.String) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.wangx.spring.cloud.alibaba.provider.HelloServiceImpl.");
}
所以,當AbstractProxyInvoker的doInvoke方法被呼叫的時候,會直接執行被傳入服務提供者的具體方法。這樣做的好處就是在服務啟動時就將方法呼叫準備好,在被遠端呼叫時,直接通過引用呼叫,而不需要通過反射呼叫。提高效能。回到doExportUrlsFor1Protocol方法,根據返回的invoker和當前物件包裝一個DelegateProviderMetaDataInvoker物件,接下來呼叫Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);方法。
現在先來確定PROCTOL的具體實現是什麼,PROCTOL是一個自適應的擴充套件點,它會生成一個Proctol&Adaptie的類,該類實現了Protocol介面,重寫了Protocol的export和refer()方法。這裡只討論生成的export方法,如下:
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
該方法會根據傳入的invoker物件中的 protocol作為副檔名,獲取Protocol的擴充套件實現,因為我上一步我們傳入的是registryURl,根據註解的協議可以知道,通過自適應擴充套件物件獲取到的擴充套件實現為RegistryProtocol的物件,進入該類的export方法。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
- 呼叫getRegistryUrl(originInvoker);方法獲取註冊中心的真正的URL,這裡會將registry替換成我們配置的zookeeper,如果配置的是nacos,則返回nacos
- 呼叫getProviderUrl方法獲取服務釋出的url,稍後會將該服務釋出到註冊中心上
- 呼叫doLocalExport(originInvoker, providerUrl);釋出一個本地服務
- 呼叫getRegistry(originInvoker);獲取真正配置的註冊中心的Registry.
- 將服務註冊到註冊中心
進入doLocalExport(originInvoker, providerUrl);方法
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
首先獲取服務的的key,格式如下:
dubbo://192.168.100.127:20880/com.wangx.dubbo.api.HelloService?anyhost=true&application=spring-cloud-alibaba-boot-dubbo-provider&bind.ip=192.168.100.127&bind.port=20880&deprecated=false&dubbo=2.0.2&generic=false&interface=com.wangx.dubbo.api.HelloService&methods=hello&pid=58949&qos.enable=false&release=2.7.7&revision=2.0.1&side=provider×tamp=1598493906319&version=2.0.1
包含了服務的協議,埠,服務名稱,應用名稱,版本等資訊。將入傳入的Invoker和providerUrl構建一個InvokerDelegate型別的物件。並將服務提供者URL賦值為InvokerDelegate物件的url屬性,將該物件傳入到protocol.export(invokerDelegate)方法中。根據SPI的原理,這裡會依賴注入一個Procotol$Adaptive的自適應擴充套件類,此時傳入的是Dubbo協議的URL,所以這裡實際會執行的是DubboProtocol中的方法。進入DubboProtocol類中的export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
- 根據URL獲取一個service的key,構建一個DubboExporter物件,該物件包含了傳入的key,invoker,exporterMap等,在invoker中有嵌套了最開始生成的能夠實際執行服務提供者方法的代理物件。
- 將DubboExporter的物件儲存到exporterMap集合中。
- 呼叫openServer方法釋出服務
- 序列化url
進入openServer(url)
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
剛方法主要雙重檢查是否存在當前地址的ProtocolServer,不存在建立一個儲存到serverMap中。進入createServer方法該方法主要是呼叫Exchangers.bind(url, requestHandler);方法對地址和埠進行監聽,然後轉入一個requestHandler物件,當接收到請求時,呼叫requestHandler物件的reply方法進行處理
這裡不再深入到netty網路部分,直接來看看接收到請求的時候,是怎麼處理的,進入到reply方法
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
將接收到的message轉成Invocation物件,這是rpc在傳輸過程中對於呼叫端的一些引數的封裝,包含了服務名,埠,方法名,方法引數等,通過inv獲取到invoker,進入getInvoker該方法主要通過inv封裝成一個serviceKey,然後通過該serviceKey從exporterMap容器中獲取到對應的DubboExporter,然後從該DubboExporter中獲取到初始化時傳入的invoker回到reply方法,會呼叫返回的invoker.invoker方法,該invoker中最底層封裝了一個最原始的AbstractProxyInvoker的invoker,所以最終會呼叫到該型別的物件的invoker,如下:
public Result invoke(Invocation invocation) throws RpcException {
try {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (InvocationTargetException e) {
if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
然後這裡會呼叫一個doInvoker()方法,這是一個模板方法,正好對應前面建立的如下程式碼
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
所以最終呼叫wrapper.invokerMethod方法,然後呼叫對應服務的對應的方法。
到這裡服務本地監聽和當請求過來時的處理就大致聊完了,接下來重新回到RegistryProtocol類的export方法
進入到register(registryUrl, registeredProviderUrl);方法
private void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
這裡的registryUrl經過轉換,已經變成了我們配置的zokeeper協議的url,所以這裡我們將會獲得一個ZookeeperRegistry的物件。將registeredProviderUrl傳入到register方法中,在進入到ZookeeperRegistry中時,發現並沒有register方法,那麼只可能存在於它的父類中,在它的父類FailbackRegistry中,我們找到了這個方法,進入
public void register(URL url) {
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
該發放做了一系列操作之後,實際呼叫了doRegister()方法,該方法是一個抽象方法,由子類實現,這裡就是有zookeeperRegistry進行實現的。
進入到ZookeeperRegistry中的doRegister()方法
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
發現就是呼叫zkClient在註冊中心上建立一些節點,至此,我們的整個服務釋出和註冊功能都已經完成了
接下來總結一下在Dubbo的整個階段,有哪些主要的過程
- ServiceBean註冊到IOC容器時會將自身物件儲存到ConfigManager的物件中
- 在Dubbo呼叫DubboBoostrap.start()中的釋出方法時,遍歷ConfigManager物件中儲存的ServiceBean,開始了服務註冊之旅
- 呼叫ServiceBean的父類的export()方法,進行真正的服務註冊,該物件中會儲存一個ref屬性,這是一個服務真正的提供者
- 獲取配置的註冊中心,因為可能存在多個註冊中心,需要遍歷多個,這就是多註冊中心的實現,拿到註冊中心的url
- 構建服務提供這的URL並封裝各項引數,根據引數和註冊中心的url接服務提供者實現生成一個Invoker,該invoker中會儲存一個代理物件,該物件那個的invokerMethod()方法將會直接呼叫服務提供者的對應的方法
- 將該invoker進行一些列封裝,然後儲存到一個map集合中,使用netty開啟一個服務提供者的埠監聽,並將一個requestHandler進行繫結,當接收到請求時,呼叫該物件的reply方法,該方法最終會執行上一步驟生成的invokerMethod()方法,這就形成了一個釋出和呼叫的閉環
- 在釋出完成本地服務,開啟了埠的監聽之後,需要將服務提供者的URL註冊到註冊中心,根據註冊中心配置的協議,這裡獲取到了Zookeeper的註冊中心,然後將服務提供者的URL通過zkClint在註冊中心上建立一個node,表示註冊完成
搞定~