1. 程式人生 > >高效能網路通訊框架Netty-Netty客戶端底層與Java NIO對應關係

高效能網路通訊框架Netty-Netty客戶端底層與Java NIO對應關係

5.1 Netty客戶端底層與Java NIO對應關係

在講解Netty客戶端程式時候我們提到指定NioSocketChannel用於建立客戶端NIO套接字通道的例項,下面我們來看NioSocketChannel是如何建立一個Java NIO裡面的SocketChannel的。

首先我們來看NioSocketChannel的建構函式:

    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

其中DEFAULT_SELECTOR_PROVIDER定義如下:

    private static
final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

然後繼續看

//這裡的provider為DEFAULT_SELECTOR_PROVIDER
    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

其中newSocket程式碼如下:

    private static SocketChannel newSocket(SelectorProvider provider)
{ try { return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } }

所以NioSocketChannel內部是管理一個客戶端的SocketChannel的,這個SocketChannel就是講Java NIO時候的SocketChannel,也就是建立NioSocketChannel例項物件時候相當於執行了Java NIO中:

SocketChannel socketChannel = SocketChannel.open();

另外在NioSocketChannel的父類AbstractNioChannel的建構函式裡面預設會記錄隊op_read事件感興趣,這個後面當連結完成後會使用到:


    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

另外在NioSocketChannel的父類AbstractNioChannel的建構函式裡面設定了該套接字為非阻塞的

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
         ...
        }
    }

下面我們看Netty裡面是哪裡建立的NioSocketChannel例項,哪裡註冊到選擇器的。
下面我們看下Bootstrap的connect操作程式碼:

    public ChannelFuture connect(InetAddress inetHost, int inetPort) {
        return connect(new InetSocketAddress(inetHost, inetPort));
    }

類似Java NIO傳遞了一個InetSocketAddress物件用來記錄服務端ip和埠:

    public ChannelFuture connect(SocketAddress remoteAddress) {
       ...
        return doResolveAndConnect(remoteAddress, config.localAddress());
    }

下面我們看下doResolveAndConnect的程式碼:

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        //(1)
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
             //(2)
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } 
            ...
       }
}

首先我們來看程式碼(1)initAndRegister:

     final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //(1.1)
            channel = channelFactory.newChannel();
            //(1.2)
            init(channel);
        } catch (Throwable t) {
            ...
        }
        //(1.3)
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
}

其中(1.1)作用就是建立一個NioSocketChannel的例項,程式碼(1.2)是具體設定內部套接字的選項的。

程式碼(1.3)則是具體註冊客戶端套接字到選擇器的,其首先會呼叫NioEventLoop的register方法,最後呼叫NioSocketChannelUnsafe的register方法:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                   ...
                }
            }
 }

其中 register0內部呼叫doRegister,其程式碼如下:

 protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                //註冊客戶端socket到當前eventloop的selector上
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
               ...
            }
        }
 }

到這裡程式碼(1)initAndRegister的流程講解完畢了,下面我們來看程式碼(2)的


  public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
          ...
            try {
                ...

                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                  。。。
                }
            } catch (Throwable t) {
               ...
            }
  }

其中doConnect程式碼如下:

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
       ...
        boolean success = false;
        try {
            //2.1
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            //2.2
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

其中2.1具體呼叫客戶端套接字的connect方法,等價於Java NIO裡面的。
程式碼2.2 由於connect 方法是非同步的,所以類似JavaNIO呼叫connect方法進行判斷,如果當前沒有完成連結則設定對op_connect感興趣。

最後一個點就是何處進行的從選擇器獲取就緒的事件的,具體是在該客戶端套接關聯的NioEventLoop裡面的做的,每個NioEventLoop裡面有一個執行緒用來迴圈從選擇器裡面獲取就緒的事件,然後進行處理:

 protected void run() {
        for (;;) {
            try {
                ...
               select(wakenUp.getAndSet(false));
                ...
                processSelectedKeys();
                ...
            } catch (Throwable t) {
                handleLoopException(t);
            }
            ...
        }
    }

其中select程式碼如下:

 private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            ...
            for (;;) {
                ...
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

              ...
        } catch (CancelledKeyException e) {
            ...
        }
 }

可知會從選擇器選取就緒的事件,其中processSelectedKeys程式碼如下:

 private void processSelectedKeys() {
        ...
     processSelectedKeysPlain(selector.selectedKeys());
        ...
    }

可知會獲取已經就緒的事件集合,然後交給processSelectedKeysPlain處理,後者迴圈呼叫processSelectedKey具體處理每個事件,程式碼如下:

 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
       ...
        try {
            //(3)如果是op_connect事件
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //3.1
                unsafe.finishConnect();
            }
            //4
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            //5
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

程式碼(3)如果當前事件key為op_connect則去掉op_connect,然後呼叫NioSocketChannel的doFinishConnect:

    protected void doFinishConnect() throws Exception {
        if (!javaChannel().finishConnect()) {
            throw new Error();
        }
    }

可知是呼叫了客戶端套接字的finishConnect方法,最後會呼叫NioSocketChannel的doBeginRead方法設定對op_read事件感興趣:

    protected void doBeginRead() throws Exception {
       ...
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

這裡interestOps為op_read,上面在講解NioSocketChannel的建構函式時候提到過。

程式碼(5)如果當前是op_accept事件說明是伺服器監聽套接字獲取到了一個連結套接字,如果是op_read,則說明可以讀取客戶端發來的資料了,如果是後者則會啟用管線裡面的所有handler的channelRead方法,這裡會啟用我們自定義的NettyClientHandler的channelRead讀取客戶端發來的資料,然後在向客戶端寫入資料。

5.2 總結

本節講解了Netty客戶端底層如何使用Java NIO進行實現的,可見與我們前面講解的Java NIO設計的客戶端程式碼步驟是一致的,只是netty對其進行了封裝,方便了我們使用,瞭解了這些對深入研究netty原始碼提供了一個骨架指導。

想了解JDK NIO和更多Netty基礎的可以單擊我
更多關於分散式系統中服務降級策略的知識可以單擊 單擊我
想系統學dubbo的單擊我
想學併發的童鞋可以 單擊我

image.png


加多

加多

高階 Java 攻城獅 at 阿里巴巴加多,目前就職於阿里巴巴,熱衷併發程式設計、ClassLoader,Spring等開源框架,分散式RPC框架dubbo,springcloud等;愛好音樂,運動。微信公眾號:技術原始積累。知識星球賬號:技術原始積累