1. 程式人生 > >Netty-在-Dubbo-中如何應用

Netty-在-Dubbo-中如何應用

前言

眾所周知,國內知名框架 Dubbo 底層使用的是 Netty 作為網路通訊,那麼內部到底是如何使用的呢?今天我們就來一探究竟。

1. dubbo 的 Consumer 消費者如何使用 Netty

注意:此次程式碼使用了從 github 上 clone 的 dubbo 原始碼中的 dubbo-demo 例子。

程式碼如下:

System.setProperty("java.net.preferIPv4Stack", "true");

ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});

context.start();

// @1

DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

int a = 0;

while (true) {

try {

Thread.sleep(1000);

System.err.println( ++ a + " ");

String hello = demoService.sayHello("world"); // call remote method

System.out.println(hello); // get result

} catch (Throwable throwable) {

throwable.printStackTrace();

}

}

當代碼執行到 @1 的時候,會呼叫 Spring 容器的 getBean 方法,而 dubbo 擴充套件了 FactoryBean,所以,會呼叫 getObject 方法,該方法會建立代理物件。

這個過程中會呼叫 DubboProtocol 例項的 getClients(URL url) 方法,當這個給定的 URL 的 client 沒有初始化則建立,然後放入快取,程式碼如下:

這個 initClient 方法就是建立 Netty 的 client 的。

最終呼叫的就是抽象父類 AbstractClient 的構造方法,構造方法中包含了建立 Socket 客戶端,連線客戶端等行為。

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {

doOpen();

connect();

}

doOpent 方法用來建立 Netty 的 bootstrap :

protected void doOpen() throws Throwable {

NettyHelper.setNettyLoggerFactory();

bootstrap = new ClientBootstrap(channelFactory);

bootstrap.setOption("keepAlive", true);

bootstrap.setOption("tcpNoDelay", true);

bootstrap.setOption("connectTimeoutMillis", getTimeout());

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

public ChannelPipeline getPipeline() {

NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);

ChannelPipeline pipeline = Channels.pipeline();

pipeline.addLast("decoder", adapter.getDecoder());

pipeline.addLast("encoder", adapter.getEncoder());

pipeline.addLast("handler", nettyHandler);

return pipeline;

}

});

}

connect 方法用來連線提供者:

protected void doConnect() throws Throwable {

long start = System.currentTimeMillis();

ChannelFuture future = bootstrap.connect(getConnectAddress());

boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

if (ret && future.isSuccess()) {

Channel newChannel = future.getChannel();

newChannel.setInterestOps(Channel.OP_READ_WRITE);

}

}

上面的程式碼中,呼叫了 bootstrap 的 connect 方法,熟悉的 Netty 連線操作。當然這裡使用的是  jboss 的 netty3,稍微有點區別。當連線成功後,註冊寫事件,準備開始向提供者傳遞資料。

當 main 方法中呼叫 demoService.sayHello(“world”) 的時候,最終會呼叫 HeaderExchangeChannel 的 request 方法,通過 channel 進行請求。

public ResponseFuture request(Object request, int timeout) throws RemotingException {

Request req = new Request();

req.setVersion("2.0.0");

req.setTwoWay(true);

req.setData(request);

DefaultFuture future = new DefaultFuture(channel, req, timeout);

channel.send(req);

return future;

}

send 方法中最後呼叫 jboss  Netty 中繼承了  NioSocketChannel 的 NioClientSocketChannel 的 write 方法。完成了一次資料的傳輸。

2. dubbo 的 Provider 提供者如何使用 Netty

Provider demo 程式碼:

System.setProperty("java.net.preferIPv4Stack", "true");

ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});

context.start();

System.in.read(); // press any key to exit

Provider 作為被訪問方,肯定是一個 Server 模式的 Socket。如何啟動的呢?

當 Spring 容器啟動的時候,會呼叫一些擴充套件類的初始化方法,比如繼承了 InitializingBean,ApplicationContextAware,ApplicationListener 。

而 dubbo 建立了 ServiceBean 繼承了一個監聽器。Spring 會呼叫他的 onApplicationEvent 方法,該類有一個 export 方法,用於開啟 ServerSocket 。

然後執行了 DubboProtocol 的 createServer 方法,然後建立了一個 NettyServer 物件。NettyServer 物件的 構造方法同樣是  doOpen 方法和。程式碼如下:

protected void doOpen() throws Throwable {

NettyHelper.setNettyLoggerFactory();

ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));

ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));

ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));

bootstrap = new ServerBootstrap(channelFactory);

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);

channels = nettyHandler.getChannels();

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

public ChannelPipeline getPipeline() {

NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);

ChannelPipeline pipeline = Channels.pipeline();

pipeline.addLast("decoder", adapter.getDecoder());

pipeline.addLast("encoder", adapter.getEncoder());

pipeline.addLast("handler", nettyHandler);

return pipeline;

}

});

channel = bootstrap.bind(getBindAddress());

}

該方法中,看到了熟悉的 boss 執行緒,worker 執行緒,和 ServerBootstrap,在添加了編解碼 handler  之後,新增一個 NettyHandler,最後呼叫 bind 方法,完成繫結埠的工作。和我們使用 Netty 是一摸一樣。

3. 總結

可以看到,dubbo 使用 Netty 還是挺簡單的,消費者使用 NettyClient,提供者使用 NettyServer,Provider  啟動的時候,會開啟埠監聽,使用我們平時啟動 Netty 一樣的方式。

而 Client 在 Spring getBean 的時候,會建立 Client,當呼叫遠端方法的時候,將資料通過 dubbo 協議編碼傳送到 NettyServer,然後 NettServer 收到資料後解碼,並呼叫本地方法,並返回資料,完成一次完美的 RPC 呼叫。

如果想學習Java工程化、高效能及分散式、深入淺出。效能調優、Spring,MyBatis,Netty原始碼分析的朋友可以加我的Java高階架構進階群:180705916,群裡有阿里大牛直播講解技術,以及Java大型網際網路技術的視訊免費分享給大家