Dubbo Provider啟動流程原始碼分析
阿新 • • 發佈:2019-02-19
簡單的官方demo:
provider的java程式碼:
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
context.start();
System.in.read(); // 按任意鍵退出
}
provider的spring配置:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd" >
<!-- 提供方應用資訊,用於計算依賴關係 -->
<dubbo:application name="demo-provider"/>
<!-- 使用multicast廣播註冊中心暴露服務地址 -->
<dubbo:registry address="multicast://224.5.6.7:1234"/>
<!-- 用dubbo協議在20880埠暴露服務 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- 和本地bean一樣實現服務 -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<!-- 宣告需要暴露的服務介面 -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
</beans>
通過配置檔案,以及spring的mvc初始化流程,我們可以假設下服務啟動流程:
1. spring配置檔案的載入以及初始化
2. bean的單例生成
3. dubbo管理所有服務實現單例物件
4. 向配置中心註冊服務資訊
spring自定義標籤
dubbo標籤初始化實現了Spring提供的NamespaceHandler介面,所以下面先看看DubboNamespaceHandler類:
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
// DubboBeanDefinitionParser定義瞭如何解析dubbo節點資訊
// DubboBeanDefinitionParser的第一個引數是beanclass
// 應用相關配置
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
// 模組相關配置
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
// 註冊中心相關配置
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
//
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
// 服務提供者配置
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
// 服務消費者配置
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
// 網路協議相關配置
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
// 服務bean配置
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
//
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
//
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
在DubboBeanDefinitionParser中的parse中,解析設定了大部分配置資訊以及服務資訊。
我們可以關注下其中beanclass的原始碼,因為這章主要分析的是provider,這裡從provider進行分析:
首先是beanName叫做ServiceBean的bean例項。
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
...
// 初始化bean的時候執行
public void afterPropertiesSet() throws Exception {
...// 初始化各種配置
// 釋出服務
if (!isDelay()) {
export();
}
}
}
釋出程式碼:
public synchronized void export() {
...
// 延遲匯出
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
匯出程式碼:
protected synchronized void doExport() {
...
doExportUrls();
}
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
// method => invoker => exporter
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
...
// 匯出服務
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
// 註冊中心可以是zk,consul等
// 註冊中心host
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
// 註冊中心port
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
// 獲取暴露範圍配置
String scope = url.getParameter(Constants.SCOPE_KEY);
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// 如果暴露本地服務
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 如果暴露遠端服務,走服務發現流程
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
// 註冊地址
URL monitorUrl = loadMonitor(registryURL);
// 動態代理,將class+method包裝位invoker,ref是服務的具體例項物件obj,invoker是個可執行物件
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// invoker=>exporter,最終在服務端儲存下來的是exporter,對服務的暴露和引用都是通過這個物件實現的,而這個物件的實現由協議決定
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
invoker 生成,動態代理的過程
定義程式碼:
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
@SPI("javassist") //使用javassist位元組碼技術生成物件
public interface ProxyFactory {
/**
* create proxy.
* 定義生成代理物件的方法
*
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* create invoker.
* getProxy呼叫的引數生成
*
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
// jdk 動態代理
public class JdkProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
}
// javassist動態代理
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper類不能正確處理帶$的類名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
private final T proxy; //在proxyFactory.getInvoker的時候被設定,即
private final Class<T> type;
private final URL url;
public Result invoke(Invocation invocation) throws RpcException {
try {
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// 呼叫的是jdkproxyfactory和javassistproxyfactory定義的方法
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}
實際函式呼叫:
// 泛型呼叫
public class ExtensionLoader<T> {
// 快取
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
// class->value
private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
// 獲取loader,優先快取
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
// 每個class綁定了一個value,獲取這個value,優先快取
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
}
}
}
}
return (T) instance;
}
}
exporter 的生成
抽象類
public abstract class AbstractProtocol implements Protocol {
// 一個協議對應著多個exporter
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
// 對應著一堆invoker
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();
}
以thrift作為交換資料協議為例
public class ThriftProtocol extends AbstractProtocol {
// thrift port
public static final int DEFAULT_PORT = 40880;
// 對應的資料交換方
// ip:port -> ExchangeServer
private final ConcurrentMap<String, ExchangeServer> serverMap =
new ConcurrentHashMap<String, ExchangeServer>();
// 服務釋出的函式呼叫
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 只能使用 thrift codec
URL url = invoker.getUrl().addParameter(Constants.CODEC_KEY, ThriftCodec.NAME);
// find server.
String key = url.getAddress();
//client 也可以暴露一個只有server可以呼叫的服務。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer && !serverMap.containsKey(key)) {
serverMap.put(key, getServer(url));
}
// export service.
key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 快取起來
exporterMap.put(key, exporter);
return exporter;
}
public void destroy() {
// 銷燬invoker
super.destroy();
// 移除消費ip,關閉server
for (String key : new ArrayList<String>(serverMap.keySet())) {
ExchangeServer server = serverMap.remove(key);
if (server != null) {
server.close(getServerShutdownTimeout());
} // ~ end of if ( server != null )
} // ~ end of loop serverMap
} // ~ end of method destroy
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
ThriftInvoker<T> invoker = new ThriftInvoker<T>(type, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
}