1. 程式人生 > >看 Netty 在 Dubbo 中如何應用

看 Netty 在 Dubbo 中如何應用

ebe cpn RF dad result prot thread mil pro

技術分享圖片

目錄:

  1. dubbo 的 Consumer 消費者如何使用 Netty
  2. dubbo 的 Provider 提供者如何使用 Netty
  3. 總結

前言

眾所周知,國內知名框架 Dubbo 底層使用的是 Netty 作為網絡通信,那麽內部到底是如何使用的呢?今天我們就來一探究竟。

1. dubbo 的 Consumer 消費者如何使用 Netty

註意:此次代碼使用了從 github 上 clone 的 dubbo 源碼中的 dubbo-demo 例子。

代碼如下:

    System.setProperty("java.net.preferIPv4Stack", "true"
); ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"}); context.start(); // @1 DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy int a = 0
; while (true) { try { Thread.sleep(1000); System.err.println( ++ a + " "); String hello = demoService.sayHello("world"); // call remote method System.out.println(hello); // get result } catch (Throwable throwable) { throwable.printStackTrace
(); } }

當代碼執行到 @1 的時候,會調用 Spring 容器的 getBean 方法,而 dubbo 擴展了 FactoryBean,所以,會調用 getObject 方法,該方法會創建代理對象。

這個過程中會調用 DubboProtocol 實例的 getClients(URL url) 方法,當這個給定的 URL 的 client 沒有初始化則創建,然後放入緩存,代碼如下:

技術分享圖片

這個 initClient 方法就是創建 Netty 的 client 的。

技術分享圖片

最終調用的就是抽象父類 AbstractClient 的構造方法,構造方法中包含了創建 Socket 客戶端,連接客戶端等行為。

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    doOpen();
    connect();
}

doOpent 方法用來創建 Netty 的 bootstrap :

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ClientBootstrap(channelFactory);
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getTimeout());
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
}

connect 方法用來連接提供者:

protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
    if (ret && future.isSuccess()) {
        Channel newChannel = future.getChannel();
        newChannel.setInterestOps(Channel.OP_READ_WRITE);
    } 
}

上面的代碼中,調用了 bootstrap 的 connect 方法,熟悉的 Netty 連接操作。當然這裏使用的是 jboss 的 netty3,稍微有點區別。當連接成功後,註冊寫事件,準備開始向提供者傳遞數據。

當 main 方法中調用 demoService.sayHello("world") 的時候,最終會調用 HeaderExchangeChannel 的 request 方法,通過 channel 進行請求。

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    channel.send(req);
    return future;
}

send 方法中最後調用 jboss Netty 中繼承了 NioSocketChannel 的 NioClientSocketChannel 的 write 方法。完成了一次數據的傳輸。

2. dubbo 的 Provider 提供者如何使用 Netty

Provider demo 代碼:

System.setProperty("java.net.preferIPv4Stack", "true");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
context.start();
System.in.read(); // press any key to exit

Provider 作為被訪問方,肯定是一個 Server 模式的 Socket。如何啟動的呢?

當 Spring 容器啟動的時候,會調用一些擴展類的初始化方法,比如繼承了 InitializingBean,ApplicationContextAware,ApplicationListener 。而 dubbo 創建了 ServiceBean 繼承了一個監聽器。Spring 會調用他的 onApplicationEvent 方法,該類有一個 export 方法,用於打開 ServerSocket 。

然後執行了 DubboProtocol 的 createServer 方法,然後創建了一個 NettyServer 對象。NettyServer 對象的
構造方法同樣是 doOpen 方法和。代碼如下:

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    channel = bootstrap.bind(getBindAddress());
}

該方法中,看到了熟悉的 boss 線程,worker 線程,和 ServerBootstrap,在添加了編解碼 handler 之後,添加一個 NettyHandler,最後調用 bind 方法,完成綁定端口的工作。和我們使用 Netty 是一摸一樣。

3. 總結

可以看到,dubbo 使用 Netty 還是挺簡單的,消費者使用 NettyClient,提供者使用 NettyServer,Provider 啟動的時候,會開啟端口監聽,使用我們平時啟動 Netty 一樣的方式。而 Client 在 Spring getBean 的時候,會創建 Client,當調用遠程方法的時候,將數據通過 dubbo 協議編碼發送到 NettyServer,然後 NettServer 收到數據後解碼,並調用本地方法,並返回數據,完成一次完美的 RPC 調用。

好,關於 dubbo 如何使用 Netty 就簡短的介紹到這裏。

good luck!!!!

看 Netty 在 Dubbo 中如何應用