1. 程式人生 > >Netty框架學習之路(六)—— 引導

Netty框架學習之路(六)—— 引導

前言

之前的博文介紹了Channel、EventLoop及由其衍生出來的相關概念。但這些都是靜態的概念,為了能使Netty程式執行起來,必須要有一個全域性的元件將上述內容串聯起來,這便就是本文所講內容——引導。

引導類

引導類的層次結構如下:
這裡寫圖片描述
主要包括一個抽象父類(AbstractBootstrap)和兩個具體的子類(Bootstrap、ServerBootstrap)。從類名可以看出,Bootstrap主要用於客戶端的引導,只需要一個單獨的、沒有父Channel的Channel來參與網路互動;ServerBootstrap用於服務端、父Channel接收客戶端連線,子Channel處理與客戶端的通訊。

本文以ServerBootstrap為例,介紹Netty如何發揮引導作用的。

ServerBootstrap

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler
(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeServerHandler()); } }); ChannelFuture f = b.bind
(port).sync();

這是之前Netty示例程式中的一段程式碼,實現的功能就是建立一個ServerBootstrap服務端引導類,初始化並繫結至指定埠,等待客戶端連線請求。

Netty服務端初始化過程在ServerBoostrap的bind(port)方法中,而bind 方法會觸發如下的呼叫鏈:

AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister

從方法名稱可以看出,initAndRegister方法包含了最核心的處理邏輯,具體程式碼如下:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //1,使用反射方式,呼叫預設構造方法,建立一個channel
            channel = channelFactory.newChannel();
            //2,初始化channel
            init(channel);
        } 
        ……
        //3,註冊channel
        ChannelFuture regFuture = config().group().register(channel);
        ……
     }

initAndRegister方法通過反射方式建立一個Channel,而Channel的型別是ServerBootstrap的channel()方法所設定的,此處型別為NioServerSocketChannel,之前的博文中提過,通過呼叫 SelectorProvider.openSocketChannel() 來開啟一個新的 Java NIO SocketChannel,同時將這個Channel與selector相關聯並關注OP_ACCEPT事件,同時又建立一個ChannelPipeline與之相關聯。

之後呼叫init方法對Channe進行初始化操作,具體程式碼如下:

        //此處省去很多跟主邏輯關係不大的程式碼
        ChannelPipeline p = channel.pipeline();

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

針對Channel初始化過程中建立的ChannelPipeline新增ChannelHandler,此處新增的ChannelInitializer是一個ChannelHandler的介面卡類,通過此匿名類設定回撥方法等待回撥。之前我們講過Netty採用了事件機制,當Channel成功註冊到一個eventLoop的Selector中, 並且將當前Channel作為attachment時,便產生了一個Inbound事件觸發了ChannelHandlerContext.fireChannelRegistered()方法,那Channel何時註冊到一個eventLoop的Selector呢?我們繼續往下看。

channel初始化完成後,便是註冊處理,之前關於Netty執行緒模型的博文中提到,channel會和一個EventLoop繫結:

    ChannelFuture regFuture = config().group().register(channel);

     public ChannelFuture register(Channel channel) {
        return next().register(channel);
     }

     public ChannelFuture register(Channel channel) {
         return register(new DefaultChannelPromise(channel, this));
     }
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

group()方法返回的是主EventLoopGroup,上述程式碼的過程就是將之前建立的NioServerSocketChannel註冊到主EventLoopGroup中的某一個EventLoop上。next()方法返回一個EventLoop,沿著呼叫鏈,最後進入到AbstractUnsafe的register0方法,程式碼如下:

        private void register0(ChannelPromise promise) {
            ……
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();

                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            ……
        }


        protected void doRegister() throws Exception {
            // 省略錯誤處理
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

此處AbstractNioChannel.doRegister 方法將之前建立的NioServerSocketChannel註冊到一個eventLoop的Selector中, 並且將當前 Channel 作為 attachment。

同時我們看到了前文涉及到的fireChannelRegistered方法,以此觸發Channel自身的初始化新增ChannelHandler的過程,具體過程之前的博文已經介紹過,不再贅述。

再回到主流程,當完成對Channel的初始化和註冊工作之後,便開始繫結操作,

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

可以看到此處是非同步呼叫Channel的bind方法實現非同步繫結,之前的博文介紹過,Channel的bind方法會產生一個出站事件,那我們進入到bind方法內部看一下,

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

這部分程式碼看起來好像很熟悉,的確如此,之前博文中有關於Inbound事件觸發InboundChannelHandler初始化,這裡便是Outbound事件觸發OutboundChannelHandler初始化,過程基本相同,不再贅述。

到這裡,Netty服務端程式初始化完畢,等待客戶端連線。當一個客戶端連線到服務端時, Java 底層的 NIOServerSocketChannel 會有產生一個 SelectionKey.OP_ACCEPT 就緒事件, 接著就會呼叫到 NioServerSocketChannel.doReadMessages方法,

    protected int doReadMessages(List<Object> buf) throws Exception {
        ……

        SocketChannel ch = SocketUtils.accept(javaChannel());

        buf.add(new NioSocketChannel(this, ch));

        ……
    }

在 doReadMessages 中,先獲取客戶端新連線的 SocketChannel,接著例項化一個 NioSocketChannel,接下來就經由 Netty 的 ChannelPipeline 機制,將讀取事件逐級傳送到各個 handler 中,於是就會觸發前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦。

總結

Netty的引導過程比較複雜,本文只是大概梳理了一下過程,希望能幫助讀者理解此過程。