Dubbo原始碼分析(五)服務暴露的具體流程(下)
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); //首先嚐試從快取中獲取 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { //從export中拿到之前的url 即為dubbo協議的url //建立 Invoker 為委託類物件 final Invoker<?> invokerDelegete = new InvokerDelegete<T>( originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); //寫入快取 bounds.put(key, exporter); } } } return exporter; } 複製程式碼
如上程式碼,它先嚐試從快取中獲取,如果沒有則呼叫 protocol.export
去暴露。
在這裡的 protocol
物件其實是一個自適應擴充套件類物件 Protocol$Adaptive
,我們呼叫它的 export
方法,它會根據協議名稱獲取對應的擴充套件實現類,在這裡它是 DubboProtocol
。
不知諸位是否還有印象,我們在第二章節已經說過。通過 ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
這句程式碼獲取到的其實是Wrapper包裝類的物件, ProtocolListenerWrapper
1、服務暴露監聽
ProtocolListenerWrapper.export
方法主要是獲取服務暴露監聽器,在服務暴露和取消服務暴露時可以獲得通知。
public class ProtocolListenerWrapper implements Protocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if ("registry".equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } //獲取ExporterListener型別的擴充套件點載入器 ExtensionLoader<ExporterListener> extensionLoader = ExtensionLoader.getExtensionLoader(ExporterListener.class); //獲取監聽器 List<ExporterListener> activateExtension = extensionLoader. getActivateExtension(invoker.getUrl(), "exporter.listener"); //呼叫ProtocolFilterWrapper.export繼續暴露 Exporter<T> export = protocol.export(invoker); List<ExporterListener> exporterListeners = Collections.unmodifiableList(activateExtension); //迴圈監聽器 通知方法。返回ListenerExporterWrapper物件 ListenerExporterWrapper<T> listenerExporterWrapper = new ListenerExporterWrapper<>(export, exporterListeners); return listenerExporterWrapper; } } 複製程式碼
比如,我們可以建立一個自定義的監聽器。
public class MyExporterListener1 implements ExporterListener { public void exported(Exporter<?> exporter) throws RpcException { System.out.println("111111111111111-------服務暴露"); } public void unexported(Exporter<?> exporter) { System.out.println("111111111111111-------取消服務暴露"); } } 複製程式碼
然後建立擴充套件點配置檔案,檔名稱為: org.apache.dubbo.rpc.ExporterListener
內容為: listener1=org.apache.dubbo.demo.provider.MyExporterListener1
然後在Dubbo配置檔案中,這樣定義: <dubbo:provider listener="listener1" />
那麼,當服務暴露完成後,你將會獲得通知。
2、構建呼叫鏈
上一步在 ProtocolListenerWrapper.export
方法中,返回之前還呼叫了 ProtocolFilterWrapper.export
。它主要是為了建立包含各種Filter的呼叫鏈。
public class ProtocolFilterWrapper implements Protocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } //建立Filter 過濾鏈的 Invoker Invoker<T> tInvoker = buildInvokerChain(invoker, "service.filter","provider"); //呼叫DubboProtocol繼續暴露 Exporter<T> export = protocol.export(tInvoker); //返回 return export; } } 複製程式碼
這裡的重點是buildInvokerChain方法,它來建立呼叫鏈攔截器。每次遠端方法執行,該攔截都會被執行,在Dubbo中已知的Filter有
org.apache.dubbo.rpc.filter.EchoFilter org.apache.dubbo.rpc.filter.GenericFilter org.apache.dubbo.rpc.filter.GenericImplFilter org.apache.dubbo.rpc.filter.TokenFilter org.apache.dubbo.rpc.filter.AccessLogFilter org.apache.dubbo.rpc.filter.CountFilter org.apache.dubbo.rpc.filter.ActiveLimitFilter org.apache.dubbo.rpc.filter.ClassLoaderFilter org.apache.dubbo.rpc.filter.ContextFilter org.apache.dubbo.rpc.filter.ConsumerContextFilter org.apache.dubbo.rpc.filter.ExceptionFilter org.apache.dubbo.rpc.filter.ExecuteLimitFilter org.apache.dubbo.rpc.filter.DeprecatedFilter 複製程式碼
此時的invoker經過各種Filter的包裝,就變成了下面這個樣子:

當然了,我們也可以自定義Filter。比如像下面這樣:
public class MyFilter1 implements Filter { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { System.out.println("呼叫之前:"+invoker.getUrl().toFullString()); Result result = invoker.invoke(invocation); System.out.println("呼叫之後:"+invoker.getUrl().toFullString()); return result; } } 複製程式碼
然後建立擴充套件點配置檔案,檔名稱為: resources\META-INF\dubbo\com.alibaba.dubbo.rpc.Filter
內容為: myfilter1=org.apache.dubbo.demo.provider.MyFilter1
然後在Dubbo配置檔案中,這樣定義: <dubbo:provider filter="myfilter1"/>
需要注意的是,這樣配置之後,myfilter1會在預設的Filter之後。如果你希望在預設的Filter前面,那麼你可以這樣配置 <dubbo:provider filter="myfilter1,default"/>
3、DubboProtocol
經過上面各種的搞來搞去,終於可以真正的暴露服務了。呼叫 DubboProtocol.export
,我們重點兩部分:建立DubboExporter和啟動伺服器。
public class DubboProtocol extends AbstractProtocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); //服務標識 //例如:com.viewscenes.netsupervisor.service.InfoUserService:20880 String key = serviceKey(url); //建立 DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //將 <key, exporter> 鍵值對放入快取中 exporterMap.put(key, exporter); //省略無關程式碼... // 啟動通訊伺服器 openServer(url); //優化序列化 optimizeSerialization(url); return exporter; } } 複製程式碼
3.1、建立DubboExporter
事實上,建立DubboExporter的過程非常簡單,就是呼叫建構函式賦值而已。
public class DubboExporter<T> extends AbstractExporter<T> { public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) { super(invoker); this.key = key; this.exporterMap = exporterMap; } } 複製程式碼
3.2、啟動伺服器
private void openServer(URL url) { //獲取IP:埠 ,並將它作為伺服器例項的key String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { //先從快取中獲取 ExchangeServer server = serverMap.get(key); if (server == null) { //建立伺服器例項 serverMap.put(key, createServer(url)); } else { //重置伺服器 server.reset(url); } } } 複製程式碼
如上程式碼,Dubbo先從快取中獲取已啟動的伺服器例項,未命中的話就去建立。如果已經存在伺服器例項,就根據url的內容重置伺服器。我們重點分析建立的過程。
private ExchangeServer createServer(URL url) { //伺服器關閉時 傳送readonly事件 url = url.addParameterIfAbsent("channel.readonly.sent","true"); //設定心跳檢測 url = url.addParameterIfAbsent("heartbeat", "60000"); //獲取伺服器引數 預設為netty String str = url.getParameter("server","netty"); //通過 SPI 檢測是否存在 server 引數所代表的 Transporter 拓展,不存在則丟擲異常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)){ throw new RpcException("Unsupported server type: " + str + ", url: " + url); } //設定伺服器編解碼器為dubbo url = url.addParameter("codec", "dubbo"); ExchangeServer server; try { //建立ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader. getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; } 複製程式碼
上面的程式碼主要分為兩部分:設定預設引數和建立伺服器例項。設定引數沒什麼好說的,下面呼叫到 HeaderExchanger.bind
方法,它只是設定封裝Handler處理器。
public class HeaderExchanger implements Exchanger { public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //封裝Handler處理器 HeaderExchangeHandler headerExchangeHandler = new HeaderExchangeHandler(handler); DecodeHandler decodeHandler = new DecodeHandler(headerExchangeHandler); //建立伺服器 Server bind = Transporters.bind(url, decodeHandler); //封裝為HeaderExchangeServer物件返回 HeaderExchangeServer server = new HeaderExchangeServer(bind); return server; } } 複製程式碼
我們只需關注 Transporters.bind
,它負責啟動伺服器。
public class Transporters { public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //獲取自適應 Transporter 例項 Transporter adaptiveExtension = ExtensionLoader. getExtensionLoader(Transporter.class).getAdaptiveExtension(); //呼叫NettyServer.bind return adaptiveExtension.bind(url, handler); } } 複製程式碼
如上程式碼,它首先獲取自適應 Transporter 例項,即 TransporterAdaptive
。然後根據傳入的url引數來載入哪個Transporter,在Dubbo中預設是NettyTransporter。需要注意的是,根據Dubbo版本的不同,有可能使用Netty的版本也不一樣。
比如,筆者在Dubbo2.7快照版本中(還未發行),看到的Netty配置檔案是這樣,說明它預設使用的就是Netty4:
netty4=org.apache.dubbo.remoting.transport.netty4.NettyTransporter netty= org.apache.dubbo.remoting.transport.netty4.NettyTransporter 複製程式碼
在Dubbo2.6版本中,看到的Netty配置檔案是這樣,說明你只要不指定Netty4,那就使用Netty3
netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter 複製程式碼
不過這些都無傷大雅,我們以Netty3接著看....
public class NettyTransporter implements Transporter { public Server bind(URL url, ChannelHandler listener){ //建立 NettyServer return new NettyServer(url, listener); } } public class NettyServer extends AbstractServer implements Server { public NettyServer(URL url, ChannelHandler handler) { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, "DubboServerHandler"))); } } 複製程式碼
我們看到, 在 NettyTransporter.bind
方法裡,它呼叫的是 NettyServer
建構函式,緊接著又呼叫父類的建構函式。
public abstract class AbstractServer extends AbstractEndpoint implements Server { public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); //獲取 ip 和埠 String bindIp = getUrl().getParameter("bind.ip", getUrl().getHost()); int bindPort = getUrl().getParameter("bind.port", getUrl().getPort()); if (url.getParameter("anyhost", false) || NetUtils.isInvalidLocalHost(bindIp)) { // 設定 ip 為 0.0.0.0 bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); this.accepts = url.getParameter("accepts", 0); this.idleTimeout = url.getParameter("idle.timeout", 600000); try { //呼叫子類方法 開啟伺服器 doOpen(); } } } 複製程式碼
如上程式碼,在父類的建構函式裡面主要是設定了一些引數,無需多說。接著我們再看子類的doOpen實現。
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); // 建立 boss 和 worker 執行緒池 // 設定執行緒的名稱 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory( boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); //建立 ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // 設定 PipelineFactory bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // 繫結到指定的 ip 和埠上 channel = bootstrap.bind(getBindAddress()); } 複製程式碼
以上方法就通過Netty啟動了通訊伺服器。熟悉Netty的朋友對這段程式碼一定不陌生,如果想了解更多,我們需要關注一下它的處理器。
處理器
ChannelHandler是Netty中的核心元件之一。在這裡,Dubbo使用 NettyHandler
作為訊息處理器。它繼承自 SimpleChannelHandler
,這說明Netty接收到的事件都會由此類來處理。比如: 客戶端連線、客戶端斷開連線、資料讀取、網路異常...
我們重點來看資料讀取方法。
@Sharable public class NettyHandler extends SimpleChannelHandler { public NettyHandler(URL url, ChannelHandler handler) { this.url = url; this.handler = handler; } //接收到訊息 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } } 複製程式碼
當Netty的Selector輪詢到資料讀取事件後,將呼叫 messageReceived
方法。在這裡,它呼叫的是 handler.received
,由建構函式可得知,此處的 handler
物件其實是 NettyServer
物件的例項。
中間它會經過 AllChannelHandler
,在這裡會線上程池中分配一個執行緒去處理。
public class AllChannelHandler extends WrappedChannelHandler { public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } } 複製程式碼
ChannelEventRunnable實現Runnable介面,我們看它的run方法。其實也很簡單,就是根據事件狀態,繼續往下呼叫。
public class ChannelEventRunnable implements Runnable { public void run() { switch (state) { case CONNECTED: try { handler.connected(channel); } break; case DISCONNECTED: try { handler.disconnected(channel); } break; case SENT: try { handler.sent(channel, message); } break; case RECEIVED: try { handler.received(channel, message); } break; case CAUGHT: try { handler.caught(channel, exception); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } } 複製程式碼
再深入的過程我想不必再深究了,無非是業務邏輯處理。不過還有另外一個問題,這個執行緒池是什麼樣的?大小多少呢? 通過跟蹤,我們發現它是在其父類中被初始化的。它也是通過ExtensionLoader載入的
public class WrappedChannelHandler implements ChannelHandlerDelegate { protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url; public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; ExtensionLoader<ThreadPool> extensionLoader = ExtensionLoader.getExtensionLoader(ThreadPool.class); ThreadPool adaptiveExtension = extensionLoader.getAdaptiveExtension(); executor = (ExecutorService) adaptiveExtension.getExecutor(url); } } 複製程式碼
然後我們看 ThreadPool
介面標註了預設實現 @SPI("fixed")
,它是一個固定數量的執行緒池。
public class FixedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { //設定執行緒池引數 String name = url.getParameter("threadname", "Dubbo"); int threads = url.getParameter("threads", 200); int queues = url.getParameter("queues",0); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } 複製程式碼
由此我們可以回答上面的問題了,Dubbo中的執行緒池是固定執行緒數量大小為200的執行緒池。如果執行緒池滿了怎麼辦?我們再看下它的拒絕策略。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); dumpJStack(); throw new RejectedExecutionException(msg); } 複製程式碼
學到了嗎?
- 列印錯誤資訊
- 匯出執行緒棧資訊
- 丟擲異常
到此,關於服務暴露的過程就分析完了。整個過程比較複雜,大家在分析的過程中耐心一些。並且多寫 Demo 進行斷點除錯,以便能夠更好的理解程式碼邏輯。
二、服務註冊
服務註冊就是把已經暴露的服務資訊註冊到第三方平臺,以供消費者使用。我們把目光回到 RegistryProtocol.export
方法,我們以zookeeper註冊中心為例。
1、建立註冊中心
首先,需要根據配置檔案的資訊獲取到註冊中心的url,比如以zookeeper為例: zookeeper://192.168.139.131:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo_producer1&client=zkclient&dubbo=2.6.2......
我們直接來到 ZookeeperRegistry
,這裡的重點是呼叫 connect
方法建立Zookeeper 客戶端。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { //省略部分程式碼... //建立zookeeper客戶端 zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { //重新連線事件 recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); } 複製程式碼
在這裡有一點需要注意,Dubbo官網說,連線zookeeper預設使用zkclient。
從 2.2.0
版本開始預設為 zkclient 實現,以提升 zookeeper 客戶端的健狀性。
但從程式碼上看,它預設使用的是curator客戶端。 @SPI("curator")
這一點比較費解,所以如果想使用zkclient,要在配置檔案中指定: <dubbo:registry address="zookeeper://192.168.139.131:2181?client=zkclient"/>
然後我們接著往下繼續看,最終呼叫zkclient的方法完成zookeeper客戶端的建立。
public ZkclientZookeeperClient(URL url) { //非同步呼叫ZkClient建立客戶端 client = new ZkClientWrapper(url.getBackupAddress(), 30000); //監聽zookeeper狀態 client.addListener(new IZkStateListener() { @Override public void handleStateChanged(KeeperState state) throws Exception { ZkclientZookeeperClient.this.state = state; if (state == KeeperState.Disconnected) { stateChanged(StateListener.DISCONNECTED); } else if (state == KeeperState.SyncConnected) { stateChanged(StateListener.CONNECTED); } } @Override public void handleNewSession() throws Exception { stateChanged(StateListener.RECONNECTED); } }); client.start(); } 複製程式碼
2、建立節點
建立節點很簡單,就是將服務配置資料寫入到 Zookeeper 的某個路徑的節點下。
protected void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } 複製程式碼
我們看下zookeeper中已經建立好的節點資訊:

三、總結
至此,Dubbo中的服務暴露全過程我們已經分析完了。由於篇幅問題,筆者將它們分為了上下兩篇。字數比較多,邏輯也較為複雜,如果文章有不妥錯誤之處,希望大家提出寶貴意見。
我們再回憶一下整個流程:
- 通過Spring介面呼叫初始化方法
- 配置資訊檢查以及預設值設定
- 建立服務類ref Invoker
- 服務暴露監聽、構建呼叫鏈
- 本地暴露
- 遠端暴露
- 啟動Netty通訊伺服器,監聽埠
- 連線zookeeper建立節點,將已暴露的服務資訊寫入註冊中心