Reading and Analyzing the Netty 4 Source Code: Server Startup

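This article is based on version 4.1.24.Final. We first write a small test example and then read the source code while stepping through the example in a debugger. The excerpt below does not show the definitions of port and EchoServerHandler.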
EventLoopGroup bossGroup=new NioEventLoopGroup(1);
        EventLoopGroup workerGroup=new NioEventLoopGroup();
        try {
            ServerBootstrap b=new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f=b.bind(port).sync();
            f.channel().closeFuture().sync();
        }finally{
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

First, look at the NioEventLoopGroup class.

[Figure: NioEventLoopGroup class diagram]

Its constructor eventually calls into MultithreadEventExecutorGroup:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // ... (omitted)
            } finally {
                // ... (omitted)
            }
        }
        chooser = chooserFactory.newChooser(children);
        // ... (omitted)
    }
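
The chooser created on the last visible line of this constructor is what later hands out one child per call in round-robin order. Here is a minimal sketch of that idea, assuming the children are plain strings instead of event loops. The class and method names are hypothetical, and Math.floorMod is swapped in for the modulo branch so the index stays non-negative; this is not Netty's own code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a hypothetical round-robin chooser, not Netty's implementation.
final class RoundRobinChooserSketch {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] children;

    RoundRobinChooserSketch(String... children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("at least one child is required");
        }
        this.children = children;
    }

    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    String next() {
        int i = idx.getAndIncrement();
        if (isPowerOfTwo(children.length)) {
            // Bit mask wrap-around; only valid when the length is a power of two.
            return children[i & (children.length - 1)];
        }
        // General case; floorMod stays non-negative even after the counter overflows.
        return children[Math.floorMod(i, children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch chooser = new RoundRobinChooserSketch("loop-0", "loop-1", "loop-2");
        for (int k = 0; k < 4; k++) {
            System.out.println(chooser.next()); // loop-0, loop-1, loop-2, loop-0
        }
    }
}
```

With a single child, as in the boss group of the example, every call returns that same child.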

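The default thread count is twice the number of available processors, unless the io.netty.eventLoopThreads system property overrides it. The key part is the newChild method: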

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

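newChild simply constructs a NioEventLoop instance. Let's look at that class:

[Figure: NioEventLoop class diagram]

A NioEventLoop can be viewed as a single thread that keeps taking tasks from its queue and running them. Here is its run method: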

protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            ........
        }
    }
The run method is an endless for loop. First, look at calculateStrategy:
 public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
If the queue holds no tasks, the SELECT strategy is returned; that is the case we focus on here. The select method runs next:
private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                if (Thread.interrupted()) {
                    selectCnt = 1;
                    break;
                }
                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    rebuildSelector();
                    selector = this.selector;
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
           ........
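        }
    }
select first computes selectDeadLineNanos as the current time plus delayNanos(currentTimeNanos), which is the time until the next scheduled task is due, or 1 second when nothing is scheduled. It then checks whether that deadline has already passed after rounding to milliseconds. If it has, select calls selectNow, a non-blocking check for ready events, and leaves the loop. If it has not, execution continues with: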
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
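This check covers one scenario: if a task was submitted while wakenUp was already true, the submitting thread had no chance to call Selector#wakeup, so the task queue has to be checked once more before blocking. After that, the thread blocks in selector.select(timeoutMillis) for up to timeoutMillis, waiting for events of interest. If an event arrives, a task lands in the queue, or any of the other listed conditions holds, the loop exits and select returns. The code that follows mainly works around the NIO epoll bug; we cover that in detail in a separate article.

Once select returns, run continues. ioRatio keeps its default value of 50 here, which means that handling ready I/O events and running queued tasks each get roughly half of the loop's time.

Back in the main thread: an EventLoopGroup manages these EventLoops. The first two lines of the example can therefore be read as one thread doing the boss's work and (number of processors * 2) threads doing the workers' work, that is, 1 boss and N workers cooperating on the job. In the rest of the article we want to find out what the boss and the workers each do, and why the split into boss and worker exists at all.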
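
The two pieces of arithmetic in the loop are easy to misread: the task time budget derived from ioRatio, and the rounding of the select timeout to whole milliseconds. Here is a small worked sketch. The class and method names are hypothetical; the formulas are copied from the run and select excerpts above.

```java
// Illustrative only: the two formulas from run() and select(), isolated for experimentation.
final class IoRatioSketch {
    // Time allowed for queued tasks after the loop spent ioTimeNanos on I/O.
    // ioRatio == 100 is handled by a separate branch in run(), so it is rejected here.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 for this formula");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    // Remaining time until the deadline, rounded to the nearest millisecond.
    static long selectTimeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        System.out.println(taskBudgetNanos(2_000_000L, 50));         // 2000000: equal share
        System.out.println(taskBudgetNanos(2_000_000L, 80));         // 500000: I/O gets 80%
        System.out.println(selectTimeoutMillis(1_000_000_000L, 0L)); // 1000
        System.out.println(selectTimeoutMillis(400_000L, 0L));       // 0: treated as expired
        System.out.println(selectTimeoutMillis(600_000L, 0L));       // 1
    }
}
```

The added 500000 nanoseconds is why a deadline less than half a millisecond away counts as already expired and leads to selectNow instead of a blocking select.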

Now for ServerBootstrap. The example uses its default constructor, so we move on to the next line and see what the group method does:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

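The parent class holds the bossGroup, and the childGroup field of the ServerBootstrap subclass is our workerGroup. The API follows the builder pattern, which pays off when there are many optional parameters.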
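
The chaining style works because every setter returns this. Here is a minimal sketch of that pattern, assuming groups are represented by plain strings. The class is hypothetical and only mirrors the null check and the set-once check of the group method shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a tiny fluent configuration object, not Netty's ServerBootstrap.
final class FluentBootstrapSketch {
    private String parentGroup;
    private String childGroup;
    private final Map<String, Object> options = new LinkedHashMap<>();

    FluentBootstrapSketch group(String parentGroup, String childGroup) {
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.parentGroup = parentGroup;
        this.childGroup = childGroup;
        return this; // returning this is what makes the calls chainable
    }

    FluentBootstrapSketch option(String name, Object value) {
        options.put(name, value);
        return this;
    }

    String describe() {
        return parentGroup + "/" + childGroup + " " + options;
    }

    public static void main(String[] args) {
        String summary = new FluentBootstrapSketch()
                .group("boss", "worker")
                .option("SO_BACKLOG", 128)
                .describe();
        System.out.println(summary); // boss/worker {SO_BACKLOG=128}
    }
}
```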

Next, the channel method wraps the NioServerSocketChannel class in a ChannelFactory, which later constructs the NioServerSocketChannel instance through reflection.
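
Creating instances from a Class object through reflection can be sketched with the JDK alone. The factory below is hypothetical and uses StringBuilder as a stand-in for the channel class. Looking up the no-argument constructor once and caching it is a choice made for this sketch, not a statement about how Netty does it.

```java
import java.lang.reflect.Constructor;

// Illustrative only: a hypothetical reflective factory, not Netty's ChannelFactory.
final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            // Requires a public no-argument constructor on the target class.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(clazz + " has no public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance", e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<CharSequence> factory =
                new ReflectiveFactorySketch<>(StringBuilder.class);
        CharSequence first = factory.newInstance();
        CharSequence second = factory.newInstance();
        // Each call builds a fresh object of the configured class.
        System.out.println(first.getClass().getSimpleName() + " " + (first != second));
    }
}
```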

Next, the childHandler of the ServerBootstrap is set. We label it ChannelHandler-ChannelInitializer-1.

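ChannelOption.SO_BACKLOG applies to the server-side listening socket. It caps the length of the queue that temporarily holds connections which have completed the three-way handshake but have not yet been accepted, for example because the server's request-handling threads are all busy.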

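ChannelOption.SO_KEEPALIVE controls whether TCP keep-alive is enabled. The mechanism only kicks in after both sides have established the connection (both are in the ESTABLISHED state) and the upper layer has sent no data for roughly two hours.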
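
At the JDK level the two options correspond roughly to the backlog argument of ServerSocketChannel.bind and to StandardSocketOptions.SO_KEEPALIVE. The sketch below uses only JDK classes and an ephemeral loopback port. The class and method names are hypothetical, and how Netty forwards these values internally is not shown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Illustrative only: plain JDK calls, no Netty involved.
final class SocketOptionsSketch {
    // Binds a listening channel with the given backlog hint and returns the chosen port.
    static int bindWithBacklog(int backlog) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port; the backlog is only a hint to the OS.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Enables keep-alive on an unconnected channel and reads the option back.
    static boolean enableKeepAlive() {
        try (SocketChannel channel = SocketChannel.open()) {
            channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
            return channel.getOption(StandardSocketOptions.SO_KEEPALIVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindWithBacklog(128));
        System.out.println("keep-alive enabled: " + enableKeepAlive());
    }
}
```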
Next, the bind method, which ends up in doBind:

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();//1
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

Start with initAndRegister (some code omitted):

final ChannelFuture initAndRegister() {
        Channel channel = null;
        channel = channelFactory.newChannel();
        init(channel);
        ....
        ChannelFuture regFuture = config().group().register(channel);
        ....
}

The first step creates the NioServerSocketChannel instance through channelFactory. Its class diagram:

[Figure: NioServerSocketChannel class diagram]

Its constructor:

public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
newSocket calls into the JDK to create a ServerSocketChannel instance. Next:
public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT); // the event of interest is accept
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
The super call eventually reaches AbstractChannel:
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

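Here parent is null. newUnsafe constructs a NioMessageUnsafe instance, whose main contribution is a read method for handling read events; we will see it later.

Next, the pipeline. It is an instance of DefaultChannelPipeline, which maintains a doubly linked list whose elements are of type AbstractChannelHandlerContext and which is used to process inbound and outbound channel events: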

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            setAddComplete();
        }
}
final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
}
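
The constructor above wires two sentinel nodes, head and tail, to each other, and later additions go in just before tail. Here is a minimal sketch of such a doubly linked list, assuming nodes carry only a name. The class is hypothetical, and the handler names in main are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a doubly linked list with head/tail sentinels, not DefaultChannelPipeline.
final class PipelineSketch {
    private static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    PipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Inserts the new node between the current last node and the tail sentinel.
    void addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
    }

    // Unlinks the first node with the given name; the sentinels are never removed.
    boolean remove(String name) {
        for (Node n = head.next; n != tail; n = n.next) {
            if (n.name.equals(name)) {
                n.prev.next = n.next;
                n.next.prev = n.prev;
                return true;
            }
        }
        return false;
    }

    List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            result.add(n.name);
        }
        return result;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch();
        System.out.println(pipeline.names()); // [head, tail]
        pipeline.addLast("initializer");
        System.out.println(pipeline.names()); // [head, initializer, tail]
        pipeline.remove("initializer");
        pipeline.addLast("acceptor");
        System.out.println(pipeline.names()); // [head, acceptor, tail]
    }
}
```

Because of the sentinels, addLast and remove never need a special case for an empty list.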

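TailContext is flagged as an inbound handler and HeadContext as an outbound handler (see the two boolean flags passed to super; HeadContext also implements ChannelInboundHandler). At this point the ChannelHandlerContext list maintained by the pipeline is:

HeadContext <-> TailContext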


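Back in the NioServerSocketChannel constructor, the next step builds the NioServerSocketChannelConfig instance. Its class diagram:

[Figure: NioServerSocketChannelConfig class diagram]

Moving on, we return to initAndRegister and look at the init method: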

void init(Channel channel) throws Exception {
        .........
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup; // i.e. workerGroup
        final ChannelHandler currentChildHandler = childHandler; // i.e. ChannelHandler-ChannelInitializer-1
        .........
        p.addLast(new ChannelInitializer<Channel>() { // labelled ChannelHandler-ChannelInitializer-2
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
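init adds a new DefaultChannelHandlerContext to the pipeline. After the addition, the list maintained by the pipeline is:

HeadContext <-> DefaultChannelHandlerContext (ChannelHandler-ChannelInitializer-2) <-> TailContext

The labels in parentheses are markers I added to tell apart the anonymous class instances created along the way. Back in initAndRegister, look at this line: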

ChannelFuture regFuture = config().group().register(channel);
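config().group() returns the bossGroup, and channel is the NioServerSocketChannel instance created earlier. In the register method, note the call to next(), which obtains an EventExecutor (in fact a NioEventLoop instance) through chooser.next(). Recall the array children = new EventExecutor[nThreads] from above: when the array length is a power of two, next() picks the element at index & (children.length - 1), and otherwise it uses a modulo. The boss group has only one thread here, so that one is always chosen. The call eventually reaches Unsafe.register: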
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           .......
            AbstractChannel.this.eventLoop = eventLoop; // the boss's NioEventLoop

            if (eventLoop.inEventLoop()) { // false here: the caller is main and the loop thread has not started yet
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ......
                }
            }
        }
In register, the main thread wraps register0 in a task, hands it to the boss's thread, and then returns; the process is asynchronous. Here is what register0 does:
private void register0(ChannelPromise promise) {
            try {
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;
                pipeline.invokeHandlerAddedIfNeeded();
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                .....
            }
        }
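
The hand-off used by register (run the work inline when already on the loop thread, otherwise enqueue it for the loop thread) can be sketched with a blocking queue and one lazily started thread. Everything below is hypothetical and greatly simplified: there is no selector, no shutdown, and a task that throws would end the loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: a toy single-threaded loop, not Netty's NioEventLoop.
final class SingleThreadLoopSketch {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private volatile Thread thread; // created lazily by the first execute()

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        tasks.add(task);
        startThreadIfNeeded();
    }

    private synchronized void startThreadIfNeeded() {
        if (thread == null) {
            Thread t = new Thread(this::runLoop, "loop-sketch");
            t.setDaemon(true); // let the JVM exit even though the loop never ends
            thread = t;
            t.start();
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                tasks.take().run(); // block until a task arrives, then run it
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Same shape as register(): inline on the loop thread, otherwise enqueue.
    void runOnLoop(Runnable work) {
        if (inEventLoop()) {
            work.run();
        } else {
            execute(work);
        }
    }

    // Returns "<thread that ran the work>:<did the nested call run on the loop thread?>".
    static String demo() {
        SingleThreadLoopSketch loop = new SingleThreadLoopSketch();
        CountDownLatch done = new CountDownLatch(1);
        String[] ranOn = new String[1];
        boolean[] nestedOnLoop = new boolean[1];
        loop.runOnLoop(() -> {
            ranOn[0] = Thread.currentThread().getName();
            loop.runOnLoop(() -> nestedOnLoop[0] = loop.inEventLoop());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn[0] + ":" + nestedOnLoop[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "loop-sketch:true"
    }
}
```

The first call comes from the main thread, so it takes the enqueue branch and starts the loop thread; the nested call is already on the loop thread and runs inline.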

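In register0, doRegister registers the underlying JDK ServerSocketChannel with the selector, which ties the current NioServerSocketChannel instance to that selector. Next, pipeline.invokeHandlerAddedIfNeeded runs, on the boss thread, the handlerAdded callbacks that were deferred because handlers had been added to the pipeline before the channel was registered. That eventually enters the following call: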

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

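This initChannel call reaches the initChannel method of the handler labelled "ChannelHandler-ChannelInitializer-2". That method submits a task to the boss thread which will add a ServerBootstrapAcceptor context to the pipeline (ServerBootstrapAcceptor is also an inbound handler). "ChannelHandler-ChannelInitializer-2" is then removed from the pipeline. Because the task that adds the ServerBootstrapAcceptor is still sitting in the queue, the list maintained by the pipeline at this moment is:

HeadContext <-> TailContext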

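Next, pipeline.fireChannelRegistered() eventually calls invokeChannelRegistered(findContextInbound()). The only handler flagged as inbound in the pipeline at this point is TailContext, so its channelRegistered method is invoked, and it does nothing. Then comes the actual bind, mainly in the doBind0 method, which eventually calls unsafe.bind: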

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {

            boolean wasActive = isActive();
            try {
                doBind(localAddress);
            } catch (Throwable t) {
               ......
            }
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }
            .....
        }
doBind calls the bind method provided by the JDK. The channel-active notification is then submitted as a task to the event loop thread, and that task ends up calling HeadContext.channelActive:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }

Note that by now the list maintained by the pipeline is:

HeadContext <-> ServerBootstrapAcceptor <-> TailContext

ctx.fireChannelActive finds the next inbound handler (walking from head towards tail) and invokes its channelActive; in practice nothing happens there yet. The next line, readIfIsAutoRead(), eventually reaches AbstractNioChannel.doBeginRead:

protected void doBeginRead() throws Exception {
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
        readPending = true;
        final int interestOps = selectionKey.interestOps(); // 0 at this point
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp); // readInterestOp is OP_ACCEPT
        }
    }

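doBeginRead simply registers interest in the accept event and waits for connections to arrive: the event of interest for the opened NioServerSocketChannel is accept.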
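
The register-first, enable-accept-later sequence can be reproduced with the JDK alone. The sketch below registers a channel with an empty interest set, binds it to an ephemeral loopback port, and only then turns on OP_ACCEPT with the same bit test as doBeginRead. The class name is hypothetical and the ordering is a simplification of the startup walked through in this article.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Illustrative only: plain JDK NIO, no Netty involved.
final class AcceptInterestSketch {
    // Returns the interest set before and after OP_ACCEPT is enabled.
    static int[] registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // selectable channels must be non-blocking

            // Step 1: register with an empty interest set.
            SelectionKey key = server.register(selector, 0);

            // Step 2: bind to any free loopback port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Step 3: switch on accept interest, using the same bit test as doBeginRead.
            int before = key.interestOps();
            int readInterestOp = SelectionKey.OP_ACCEPT;
            if ((before & readInterestOp) == 0) {
                key.interestOps(before | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept();
        System.out.println(ops[0] + " -> " + ops[1]); // 0 -> 16 (OP_ACCEPT is 1 << 4)
    }
}
```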

At this point the Netty server has finished starting up. To summarize:

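1. During startup it is mainly the boss thread that does the work: initialization, registration, and so on are each wrapped as tasks and put on the thread's queue. The thread does two things: it handles events of interest, and it runs the tasks in its queue.

2. Netty's NIO transport binds the NioServerSocketChannel to a selector, registers the events of interest on the channel, and then keeps polling in the boss thread for those events.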