Dubbo Analysis: The Transport Layer
Preface
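The previous article, "Dubbo Analysis: The Serialize Layer" (Dubbo分析之Serialize層), covered the lowest layer, serialization/deserialization. This article moves one layer up from Serialize to the transport layer (network transport). This layer delegates the underlying communication to existing open-source frameworks (e.g. netty, mina, grizzly). The previous article introduced it briefly; this one looks at it in more depth.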
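Analysis of the Transporter class
Dubbo gives the communication frameworks a unified bind and connect interface, which makes them easier to manage and extend. It is defined in the Transporter interface: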
    @SPI("netty")
    public interface Transporter {

        @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
        Server bind(URL url, ChannelHandler handler) throws RemotingException;

        @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
        Client connect(URL url, ChannelHandler handler) throws RemotingException;
    }
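It provides the bind and connect methods, which correspond to the server side and the client side respectively. The available implementation classes are shown in the figure below:
Taking netty, the default framework, as an example, the code is as follows: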
    public class NettyTransporter implements Transporter {

        public static final String NAME = "netty";

        @Override
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }

        @Override
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    }
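The server side is encapsulated in NettyServer and the client side in NettyClient. The url parameter carries the information from the XML configuration (including the exposed interface, the protocol, the serialization method, the communication framework, and so on). listener is a handler: after decoding, the data is handed to it for the subsequent business processing. Each of the communication frameworks above has a corresponding Transporter: NettyTransporter, NettyTransporter (netty4), MinaTransporter and GrizzlyTransporter. Which Transporter is actually used is decided through the getTransporter method of the Transporters class: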
    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
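Unlike the way a concrete serialization class is obtained, this does not read a transporter parameter from the url and then load the concrete transporter class directly. Instead, it generates a dynamic (adaptive) transporter, and that dynamic transporter loads the concrete class.
The reason is that the server and the client can each be configured to use a different communication framework, so obtaining one fixed Transporter up front cannot meet this requirement. The dynamic code is generated by the createAdaptiveExtensionClassCode method of ExtensionLoader; its source is not listed here. Below is the extension class that is generated by default: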
    package com.alibaba.dubbo.remoting;

    import com.alibaba.dubbo.common.extension.ExtensionLoader;

    public class Transporter$Adaptive implements com.alibaba.dubbo.remoting.Transporter {

        public com.alibaba.dubbo.remoting.Server bind(
                com.alibaba.dubbo.common.URL arg0,
                com.alibaba.dubbo.remoting.ChannelHandler arg1)
                throws com.alibaba.dubbo.remoting.RemotingException {
            if (arg0 == null) {
                throw new IllegalArgumentException("url == null");
            }
            com.alibaba.dubbo.common.URL url = arg0;
            String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
            if (extName == null) {
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url("
                                + url.toString() + ") use keys([server, transporter])");
            }
            com.alibaba.dubbo.remoting.Transporter extension =
                    (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader
                            .getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class)
                            .getExtension(extName);
            return extension.bind(arg0, arg1);
        }

        public com.alibaba.dubbo.remoting.Client connect(
                com.alibaba.dubbo.common.URL arg0,
                com.alibaba.dubbo.remoting.ChannelHandler arg1)
                throws com.alibaba.dubbo.remoting.RemotingException {
            if (arg0 == null) {
                throw new IllegalArgumentException("url == null");
            }
            com.alibaba.dubbo.common.URL url = arg0;
            String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
            if (extName == null) {
                throw new IllegalStateException(
                        "Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url("
                                + url.toString() + ") use keys([client, transporter])");
            }
            com.alibaba.dubbo.remoting.Transporter extension =
                    (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader
                            .getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class)
                            .getExtension(extName);
            return extension.connect(arg0, arg1);
        }
    }
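As the generated code shows, the server side can select its extension class through two parameters, transporter and server, and the value of server overrides the value of transporter. The client works the same way with client and transporter. In the end, both bind() and connect() obtain the concrete transporter class through ExtensionLoader's getExtension method. As with the serialize layer, the available transporters are defined in the file META-INF/dubbo/internal/com.alibaba.dubbo.remoting.Transporter: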
    netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter
    netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter
    mina=com.alibaba.dubbo.remoting.transport.mina.MinaTransporter
    grizzly=com.alibaba.dubbo.remoting.transport.grizzly.GrizzlyTransporter
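The parameter precedence used by the generated adaptive class can be tried out in isolation. The following is a minimal sketch, not Dubbo code: the class TransporterNameResolver and its helper getParameter are hypothetical, a plain Map stands in for Dubbo's URL, and the helper is assumed to fall back to the default when the key is missing or empty.

```java
import java.util.HashMap;
import java.util.Map;

public class TransporterNameResolver {

    // Hypothetical stand-in for URL.getParameter(key, defaultValue):
    // assumed to return the default when the key is absent or empty.
    static String getParameter(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    // Same nesting as the generated bind(): "server" wins over "transporter",
    // and "netty" is used when neither is set.
    static String resolveServerTransporter(Map<String, String> params) {
        return getParameter(params, "server", getParameter(params, "transporter", "netty"));
    }

    // Same nesting as the generated connect(): "client" wins over "transporter".
    static String resolveClientTransporter(Map<String, String> params) {
        return getParameter(params, "client", getParameter(params, "transporter", "netty"));
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("transporter", "mina");
        params.put("server", "netty4");
        // The server side picks up the more specific "server" value,
        // while the client side falls back to "transporter".
        System.out.println(resolveServerTransporter(params));
        System.out.println(resolveClientTransporter(params));
    }
}
```

With these settings the server side resolves to netty4 and the client side to mina, which is how one url can ask for different frameworks on each side.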
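Server and Client analysis
1. Server side
When a concrete Server class is instantiated, the parent class constructor is called first. It initializes the parameters and, in the same step, performs the bind that starts the server. The constructor of the parent class AbstractServer is as follows: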
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress()
                        + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind "
                    + getClass().getSimpleName() + " on " + getLocalAddress()
                    + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY,
                Integer.toString(url.getPort()));
    }
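It mainly reads the startup parameters from the url: ip, port, accepts (the number of connections that can be accepted; 0 means unlimited, and the default is 0), idleTimeout, and so on. It then calls doOpen, which binds the port and starts the service through the concrete communication framework. Taking Netty, the default, as an example, doOpen() looks like this: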
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker,
                getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
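The above is a conventional Netty server startup, which needs the codec and the nettyHandler to be specified. The codec was covered in the previous article and is not described again here; the focus is on nettyHandler. On the server side, once the data has been decoded it is handed to NettyHandler. NettyHandler extends Netty's SimpleChannelHandler and overrides channelConnected, channelDisconnected, messageReceived, writeRequested and exceptionCaught, which correspond to the usual operations: connection established, connection closed, message received, message sent, and exception handling. Part of the source: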
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            if (channel != null) {
                channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
            }
            handler.connected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()));
            handler.disconnected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.writeRequested(ctx, e);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.sent(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.caught(channel, e.getCause());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
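Netty's native channel is wrapped into Dubbo's NettyChannel, and the NettyChannel is stored in channelMap, a static field inside NettyChannel. Every method here first calls getOrAddChannel to add the channel, and in the finally block checks whether the channel has been closed; if it has, it is removed from channelMap. In between, the corresponding method on handler is called. This handler is the NettyServer passed in at instantiation; NettyServer is itself a ChannelHandler. The ChannelHandler interface looks like this: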
    public interface ChannelHandler {

        void connected(Channel channel) throws RemotingException;

        void disconnected(Channel channel) throws RemotingException;

        void sent(Channel channel, Object message) throws RemotingException;

        void received(Channel channel, Object message) throws RemotingException;

        void caught(Channel channel, Throwable exception) throws RemotingException;
    }
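To make the wrap, delegate, clean-up shape of the NettyHandler callbacks easier to see, here is a minimal sketch of a channel cache. It is not Dubbo's implementation: RawChannel, WrappedChannel and ChannelCacheSketch are hypothetical stand-ins for the Netty channel, NettyChannel and its static channelMap, and the exact conditions in the real getOrAddChannel may differ.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

public class ChannelCacheSketch {

    /** Hypothetical stand-in for a raw Netty channel. */
    static final class RawChannel {
        volatile boolean connected = true;
    }

    /** Hypothetical stand-in for the framework-level wrapper (NettyChannel's role). */
    static final class WrappedChannel {
        final RawChannel raw;
        WrappedChannel(RawChannel raw) { this.raw = raw; }
    }

    // Static cache shared by all handlers, keyed by the raw channel.
    private static final ConcurrentMap<RawChannel, WrappedChannel> CHANNEL_MAP =
            new ConcurrentHashMap<>();

    /** Returns the cached wrapper, creating and caching one if the channel is live. */
    static WrappedChannel getOrAddChannel(RawChannel raw) {
        if (raw == null) {
            return null;
        }
        WrappedChannel cached = CHANNEL_MAP.get(raw);
        if (cached == null) {
            WrappedChannel created = new WrappedChannel(raw);
            if (raw.connected) {
                // putIfAbsent returns the previous value if another thread won the race.
                cached = CHANNEL_MAP.putIfAbsent(raw, created);
            }
            if (cached == null) {
                cached = created;
            }
        }
        return cached;
    }

    /** Drops the cache entry once the raw channel is no longer connected. */
    static void removeChannelIfDisconnected(RawChannel raw) {
        if (raw != null && !raw.connected) {
            CHANNEL_MAP.remove(raw);
        }
    }

    static int cachedCount() {
        return CHANNEL_MAP.size();
    }

    /** Same shape as each callback: wrap, delegate, then clean up in finally. */
    static void onEvent(RawChannel raw, Consumer<WrappedChannel> callback) {
        WrappedChannel channel = getOrAddChannel(raw);
        try {
            callback.accept(channel);
        } finally {
            removeChannelIfDisconnected(raw);
        }
    }

    public static void main(String[] args) {
        RawChannel raw = new RawChannel();
        onEvent(raw, c -> System.out.println("connected, cached=" + cachedCount()));
        raw.connected = false; // simulate the peer closing the connection
        onEvent(raw, c -> System.out.println("disconnected event delivered"));
        System.out.println("after cleanup, cached=" + cachedCount());
    }
}
```

Doing the removal in finally means the cache entry is released even when the delegate throws, so a closed channel does not stay in the static map.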
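The concrete server class can also do some processing of its own, for example checking on connected whether accepts has been exceeded and rejecting the connection if so. When it is done, it hands over to the ChannelHandler that was passed in when the Server was instantiated. That handler is created in HeaderExchanger: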
    public class HeaderExchanger implements Exchanger {

        public static final String NAME = "header";

        @Override
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(
                    Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }

        @Override
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(
                    Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    }
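Here the concrete ChannelHandler is DecodeHandler. Note that this decode is not the same as Netty's own decode: Netty's decoding has already run before NettyHandler executes. The subsequent operations are handled in the Exchange layer and are not covered in this article.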
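The nesting of handlers described in this section (a server that enforces accepts, then a decorating handler, then the inner handler) is a plain decorator chain. The sketch below illustrates only that structure. All names in it are hypothetical, the Handler interface is reduced to two callbacks, and the byte-to-text conversion is an invented placeholder rather than what Dubbo's DecodeHandler actually does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class HandlerChainSketch {

    /** Hypothetical, reduced handler interface with two callbacks. */
    interface Handler {
        void connected(String channel);
        void received(String channel, Object message);
    }

    /** Innermost handler: records what reaches the end of the chain. */
    static final class RecordingHandler implements Handler {
        final List<String> events = new ArrayList<>();
        @Override public void connected(String channel) { events.add("connected:" + channel); }
        @Override public void received(String channel, Object message) { events.add("received:" + message); }
    }

    /** Decorator: transforms the message, then delegates (placeholder transformation). */
    static final class DecodingHandler implements Handler {
        private final Handler delegate;
        DecodingHandler(Handler delegate) { this.delegate = delegate; }
        @Override public void connected(String channel) { delegate.connected(channel); }
        @Override public void received(String channel, Object message) {
            Object decoded = message instanceof byte[]
                    ? new String((byte[]) message, StandardCharsets.UTF_8)
                    : message;
            delegate.received(channel, decoded);
        }
    }

    /** Outer handler in the server's role: enforces the accepts limit, then delegates. */
    static final class AcceptLimitingHandler implements Handler {
        private final Handler delegate;
        private final int accepts; // 0 means unlimited, as in the article
        private final AtomicInteger connections = new AtomicInteger();
        final List<String> rejected = new ArrayList<>();

        AcceptLimitingHandler(Handler delegate, int accepts) {
            this.delegate = delegate;
            this.accepts = accepts;
        }

        @Override public void connected(String channel) {
            if (accepts > 0 && connections.get() >= accepts) {
                rejected.add(channel); // a real server would close the channel here
                return;
            }
            connections.incrementAndGet();
            delegate.connected(channel);
        }

        @Override public void received(String channel, Object message) {
            delegate.received(channel, message);
        }
    }

    public static void main(String[] args) {
        RecordingHandler business = new RecordingHandler();
        AcceptLimitingHandler chain = new AcceptLimitingHandler(new DecodingHandler(business), 1);
        chain.connected("client-a");
        chain.connected("client-b"); // over the limit, never reaches the inner handlers
        chain.received("client-a", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(business.events);
        System.out.println(chain.rejected);
    }
}
```

Because each layer only knows the handler it wraps, the transport layer can stay unaware of what the exchange layer does with a message.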
2. Client side
Again, look at the parent class, AbstractClient. Its constructor is as follows:
    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);

        send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

        shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

        // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
        reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

        try {
            doOpen();
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }
        try {
            // connect.
            connect();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress());
            }
        } catch (RemotingException t) {
            if (url.getParameter(Constants.CHECK_KEY, true)) {
                close();
                throw t;
            } else {
                logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress()
                        + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
            }
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }

        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
        ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    }
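The client needs to provide a reconnection mechanism, so the parameters initialized here all relate to reconnection. send_reconnect indicates whether to reconnect when a send finds that the connection is already broken. reconnect_warning_period indicates how often a reconnection warning is reported. shutdown_timeout is the timeout applied when the server stays unreachable. Next, doOpen() is called; again taking Netty as an example: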
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }
This is conventional Netty client code; it sets up the same NettyHandler, decoder and encoder as the server side. The connect method deserves a closer look:
    protected void connect() throws RemotingException {
        connectLock.lock();
        try {
            if (isConnected()) {
                return;
            }
            initConnectStatusCheckCommand();
            doConnect();
            if (!isConnected()) {
                throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
                        + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
                        + " using dubbo version " + Version.getVersion()
                        + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Successed connect to server " + getRemoteAddress()
                            + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
                            + " using dubbo version " + Version.getVersion()
                            + ", channel is " + this.getChannel());
                }
            }
            reconnect_count.set(0);
            reconnect_error_log_flag.set(false);
        } catch (RemotingException e) {
            throw e;
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
                    + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
                    + " using dubbo version " + Version.getVersion()
                    + ", cause: " + e.getMessage(), e);
        } finally {
            connectLock.unlock();
        }
    }
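It first checks whether a connection already exists and returns immediately if so. It then initializes the connection status checker, which periodically checks whether the channel is connected and reconnects when the connection is broken. The code is as follows: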
    private synchronized void initConnectStatusCheckCommand() {
        //reconnect=false to close reconnect
        int reconnect = getReconnectParam(getUrl());
        if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
            Runnable connectStatusCheckCommand = new Runnable() {
                @Override
                public void run() {
                    try {
                        if (!isConnected()) {
                            connect();
                        } else {
                            lastConnectedTime = System.currentTimeMillis();
                        }
                    } catch (Throwable t) {
                        String errorMsg = "client reconnect to " + getUrl().getAddress()
                                + " find error . url: " + getUrl();
                        // wait registry sync provider list
                        if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                            if (!reconnect_error_log_flag.get()) {
                                reconnect_error_log_flag.set(true);
                                logger.error(errorMsg, t);
                                return;
                            }
                        }
                        if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                            logger.warn(errorMsg, t);
                        }
                    }
                }
            };
            reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(
                    connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
        }
    }
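A Runnable is created to check the connection; if the connection is broken, it calls connect. The periodic scheduling is handled by a ScheduledThreadPoolExecutor. After this initialization, the concrete Client's doConnect is called, which is again conventional framework code and is not listed here. The rest, concerning NettyChannel, is similar to the server side and is not described further.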
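The periodic status check can be reproduced with the JDK scheduler alone. The following is a minimal sketch under stated assumptions, not Dubbo's client: ReconnectSketch is a hypothetical class, its connect() is a fake that fails a configurable number of times before succeeding, and awaitConnected exists only to make the demo observable.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReconnectSketch {
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final AtomicInteger attempts = new AtomicInteger();
    private final int failuresBeforeSuccess;
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "reconnect-check");
                t.setDaemon(true); // do not keep the JVM alive for the checker
                return t;
            });
    private ScheduledFuture<?> future;

    ReconnectSketch(int failuresBeforeSuccess) {
        this.failuresBeforeSuccess = failuresBeforeSuccess;
    }

    /** Fake connect: fails the first N attempts, then succeeds (demo assumption). */
    void connect() {
        if (attempts.incrementAndGet() <= failuresBeforeSuccess) {
            throw new IllegalStateException("connect failed");
        }
        connected.set(true);
    }

    /** Starts the periodic check once; a non-positive period disables it. */
    synchronized void startStatusCheck(long periodMillis) {
        if (periodMillis > 0 && (future == null || future.isCancelled())) {
            Runnable check = () -> {
                try {
                    if (!connected.get()) {
                        connect();
                    }
                } catch (Throwable t) {
                    // Swallow the failure: an exception escaping the task would
                    // cancel all further runs of scheduleWithFixedDelay.
                }
            };
            future = scheduler.scheduleWithFixedDelay(
                    check, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Polls until connected or the timeout elapses; returns the final state. */
    boolean awaitConnected(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!connected.get() && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return connected.get();
    }

    int attempts() { return attempts.get(); }

    void shutdown() { scheduler.shutdownNow(); }

    public static void main(String[] args) {
        ReconnectSketch client = new ReconnectSketch(2);
        client.startStatusCheck(10);
        System.out.println("connected=" + client.awaitConnected(2000)
                + ", attempts=" + client.attempts());
        client.shutdown();
    }
}
```

Catching Throwable inside the task matters: scheduleWithFixedDelay suppresses all subsequent executions once a run throws, so an uncaught connect failure would silently end the retries.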
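Summary
This article focused on the transport layer of the Dubbo architecture, centered on the Transporter, Client, Server and ChannelHandler classes. The subsequent processing takes place in the exchange (information exchange) layer.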