
Netty Source Code Reading: Creating the Server

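I used Netty in an earlier project, and now, with some spare time between interviews, I'm going through the Netty source code again, starting with how a server is created. Without further ado, let's go straight to the code.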

First, here is roughly what the overall server setup code looks like:

public void bind(int port) throws Exception {
    // bossGroup accepts incoming connections; workGroup handles I/O on the accepted channels
    EventLoopGroup workGroup = new NioEventLoopGroup();
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 24)
                .childHandler(new ChildChannelHandler());
        ChannelFuture f = b.bind(port).sync();
        f.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        System.out.println("server initChannel..");
        socketChannel.pipeline().addLast(new ServerHandler());
    }
}

OK, let's first see what happens inside new NioEventLoopGroup():

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (ThreadFactory) null);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, new Object[]{selectorProvider});
}
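After several layers of delegation, the no-argument constructor ends up in this last one, which calls the superclass constructor with three arguments: the number of threads (0 here), a threadFactory (null here), and a SelectorProvider. As the name suggests, SelectorProvider is the class that supplies Selectors. A look at its source confirms this: provider() returns the system-wide default provider, and the provider has factory methods for opening a Selector, a ServerSocketChannel and a SocketChannel. If this is unfamiliar, it is worth reviewing Java NIO first.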
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    if (loadProviderFromProperty())
                        return provider;
                    if (loadProviderAsService())
                        return provider;
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
    }
}

public abstract AbstractSelector openSelector()
        throws IOException;

public abstract ServerSocketChannel openServerSocketChannel()
        throws IOException;

public abstract SocketChannel openSocketChannel()
        throws IOException;
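To see what the provider hands out, here is a minimal plain-JDK sketch (no Netty involved; the class name SelectorProviderDemo is made up for illustration). It opens a Selector, a ServerSocketChannel and a SocketChannel through SelectorProvider.provider() and checks that all three report that same provider, which is the object NioEventLoopGroup passes down to every event loop. The JDK documents Selector.open() as equivalent to SelectorProvider.provider().openSelector().

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class SelectorProviderDemo {
    // Opens a Selector and both kinds of socket channel from the system-wide provider
    // and reports whether each of them was created by that same provider.
    static boolean allFromSameProvider() throws IOException {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel server = provider.openServerSocketChannel();
             SocketChannel client = provider.openSocketChannel()) {
            return selector.provider() == provider
                    && server.provider() == provider
                    && client.provider() == provider;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Selector, ServerSocketChannel and SocketChannel share one provider: "
                + allFromSameProvider());
    }
}
```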

OK. As mentioned, super invokes the superclass constructor. The parent class of NioEventLoopGroup is MultithreadEventLoopGroup, so let's look at its constructor:

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0?DEFAULT_EVENT_LOOP_THREADS:nThreads, threadFactory, args);
}
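So a thread count of 0 means "use the default": the io.netty.eventLoopThreads system property if it is set, otherwise twice the number of available processors, and never less than 1. The sketch below reproduces just that rule; DefaultThreadCount is a hypothetical name, and it reads the property with the JDK's Integer.getInteger instead of Netty's SystemPropertyUtil, which is a simplification.

```java
final class DefaultThreadCount {
    // Same formula as DEFAULT_EVENT_LOOP_THREADS, but using Integer.getInteger
    // rather than Netty's SystemPropertyUtil (a simplification for this sketch).
    static int defaultThreads(int availableProcessors) {
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", availableProcessors * 2));
    }

    // nThreads == 0 means "use the default", as in MultithreadEventLoopGroup's constructor.
    static int resolve(int nThreads, int availableProcessors) {
        return nThreads == 0 ? defaultThreads(availableProcessors) : nThreads;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("new NioEventLoopGroup() would use " + resolve(0, cpus) + " threads");
    }
}
```

With 4 processors and the property unset, resolve(0, 4) gives 8, while an explicit count such as 1 is passed through unchanged.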

MultithreadEventLoopGroup in turn calls its own superclass constructor. Moving up one more level, we reach MultithreadEventExecutorGroup, whose constructor looks like this:

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }

    children = new SingleThreadEventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
}
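Does this look familiar? Think back to Netty's threading model: this constructor is where Netty's reactor threads are created. children holds the worker event loops (declared as SingleThreadEventExecutor), and chooser decides which child gets the next channel. The choice is round-robin, not random: a bit mask is used when the number of children is a power of two, a modulo otherwise. Now let's look at SingleThreadEventExecutor itself.

Its methods (execute(), a task queue, runAllTasks() and so on) show that SingleThreadEventExecutor is the class that actually executes tasks. Things start to click, don't they?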
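The two chooser strategies created in the constructor can be sketched as follows. ChooserSketch and its Chooser interface are hypothetical names, and the sketch returns indices rather than EventExecutors; it only shows that a shared counter is incremented on every call and reduced into range with a mask (power-of-two sizes) or a modulo (any other size), which yields a round-robin order.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ChooserSketch {
    interface Chooser {
        int next(); // index of the child that gets the next channel
    }

    // True when exactly one bit is set (for val > 0).
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    static Chooser newChooser(int nChildren) {
        AtomicInteger index = new AtomicInteger();
        if (isPowerOfTwo(nChildren)) {
            int mask = nChildren - 1;
            // Cheap path: a bit mask instead of a modulo; stays in range even after the counter overflows.
            return () -> index.getAndIncrement() & mask;
        }
        // General path: modulo, with Math.abs guarding against a negative counter after overflow.
        return () -> Math.abs(index.getAndIncrement() % nChildren);
    }

    public static void main(String[] args) {
        Chooser four = newChooser(4);
        Chooser three = newChooser(3);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            sb.append(four.next()).append('/').append(three.next()).append(' ');
        }
        System.out.println(sb.toString().trim()); // 0/0 1/1 2/2 3/0 0/1 1/2
    }
}
```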

Next, let's follow how a client connection is handled.

Remember this line in the MultithreadEventExecutorGroup constructor?

children[i] = newChild(threadFactory, args);

Let's see what it does. NioEventLoopGroup implements newChild() like this:

@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
        super(parent, threadFactory, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }

    // ... other fields and methods omitted
}
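The key point of this constructor is that every NioEventLoop opens a Selector of its own and runs on a single thread. To make the shape of such a loop concrete, here is a minimal plain-JDK sketch; MiniEventLoop and its methods are hypothetical names, not Netty API. It mirrors only two ideas from NioEventLoop.run(): use the non-blocking selectNow() when tasks are already queued and block in select() otherwise, and call selector.wakeup() whenever a task is submitted so that a blocked select() returns. It leaves out the wakenUp bookkeeping, ioRatio and key dispatch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class MiniEventLoop implements Runnable {
    private final Selector selector; // one Selector per loop, like NioEventLoop
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    MiniEventLoop() throws IOException {
        this.selector = Selector.open();
    }

    // Callable from any thread: queue the task and wake the loop if it is blocked in select().
    void execute(Runnable task) {
        tasks.add(task);
        selector.wakeup();
    }

    void shutdown() {
        running = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Do not block if tasks are already waiting.
                if (tasks.isEmpty()) {
                    selector.select();
                } else {
                    selector.selectNow();
                }
                // A real loop would dispatch each ready key here (processSelectedKeys()).
                selector.selectedKeys().clear();
                // Then drain the task queue (runAllTasks()).
                Runnable task;
                while ((task = tasks.poll()) != null) {
                    task.run();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try {
                selector.close();
            } catch (IOException ignored) {
                // nothing useful to do while shutting down
            }
        }
    }
}
```

Because Selector.wakeup() also makes the next select() return immediately when no selection is in progress, a task submitted between the isEmpty() check and select() is not left waiting.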

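So this line creates a NioEventLoop, which opens its own Selector in the constructor; from here on, the I/O of the channels registered with it, client connections included, is driven by this class. Let's look at its run() method: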

protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();
                processSelectedKeys();
                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
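The ioRatio != 100 branch of run() is plain arithmetic: if processSelectedKeys() took ioTime, runAllTasks() gets a budget of ioTime * (100 - ioRatio) / ioRatio. The small sketch below (IoRatioSketch is a made-up name) just evaluates that expression. With NioEventLoop's default ioRatio of 50, tasks get as much time as I/O did; 80 gives them a quarter of it, and 20 four times as much.

```java
final class IoRatioSketch {
    // Budget handed to runAllTasks(timeout) when ioRatio != 100:
    // if I/O took ioTime, tasks may run for ioTime * (100 - ioRatio) / ioRatio.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // ioRatio == 100 takes the other branch: runAllTasks() with no time limit.
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // suppose processSelectedKeys() took 1 ms
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> task budget " + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```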

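The logic is roughly this: if tasks are already queued, the loop calls the non-blocking selectNow() so they are not held up; otherwise it blocks in select(oldWakenUp). It then handles the ready I/O events with processSelectedKeys() and runs queued tasks with runAllTasks(), where ioRatio controls how the time is split between the two. Let's look at processSelectedKeys():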

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys[i] = null;

        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            for (;;) {
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
                i++;
            }

            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}

Let's see what processSelectedKey does:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
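processSelectedKey() boils down to bit tests on readyOps. The sketch below (plain JDK; ReadyOpsSketch and the action labels are made up) reproduces only those tests, in the same order, and returns the name of the action the real method would take instead of calling unsafe. It ignores the early return when the channel closes during read. Note that OP_ACCEPT is routed to read() as well: for a server channel, "reading" means accepting connections.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsSketch {
    // Labels for what processSelectedKey() would do for a given readyOps value,
    // checked in the same order; the labels are made up for this sketch.
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // OP_ACCEPT goes down the same path as OP_READ; 0 is included as the JDK-bug workaround.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        return actions;
    }

    // The interest-set update done before finishConnect(): clear only the OP_CONNECT bit.
    static int withoutConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));                       // [read]
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // [read, forceFlush]
        System.out.println(actionsFor(0));                                            // [read]
    }
}
```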

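We can see that when readyOps contains OP_READ or OP_ACCEPT (or is 0, the JDK-bug workaround), unsafe.read() is called; for the server channel, the event is OP_ACCEPT. read() is abstract here, and the implementation used by NioServerSocketChannel is the NioMessageUnsafe subclass: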

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    final ChannelPipeline pipeline = pipeline();
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            for (;;) {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                // stop reading and remove op
                if (!config.isAutoRead()) {
                    break;
                }

                if (readBuf.size() >= maxMessagesPerRead) {
                    break;
                }
            }
        } catch (Throwable t) {
            exception = t;
        }
        setReadPending(false);
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            pipeline.fireChannelRead(readBuf.get(i));
        }

        readBuf.clear();
        pipeline.fireChannelReadComplete();
        if (exception != null) {
            if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                // ServerChannel should not be closed even on IOException because it can often continue
                // accepting incoming connections. (e.g. too many open files)
                closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
            }

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}

Next, the line int localRead = doReadMessages(readBuf);. In NioServerSocketChannel it looks like this:

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

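This is simply the logic that accepts a client connection: the SocketChannel returned by the JDK's accept() is wrapped in a NioSocketChannel and added to readBuf, and 1 is returned. If no connection is pending, accept() returns null, the method returns 0, and the loop in read() stops.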
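The accept path above can be imitated with plain JDK NIO. In this sketch (AcceptLoopSketch and its methods are hypothetical; nothing is wrapped in a NioSocketChannel), acceptOne() is shaped like doReadMessages(): one non-blocking accept() per call, returning 1 or 0. drain() is shaped like the loop in read(): it stops when nothing is pending or when maxPerRead, standing in for maxMessagesPerRead, connections have been taken.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

final class AcceptLoopSketch {
    // One non-blocking accept() per call: 1 if a connection was taken, 0 if none was pending.
    static int acceptOne(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    // Keep accepting until nothing is pending or maxPerRead connections were taken.
    static List<SocketChannel> drain(ServerSocketChannel server, int maxPerRead) throws IOException {
        List<SocketChannel> buf = new ArrayList<>();
        while (buf.size() < maxPerRead) {
            if (acceptOne(server, buf) == 0) {
                break; // nothing pending right now
            }
        }
        return buf;
    }

    // Listens on an OS-assigned loopback port in non-blocking mode.
    static ServerSocketChannel openServer() throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        return server;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = openServer()) {
            System.out.println("pending connections accepted: " + drain(server, 16).size()); // 0, no clients yet
        }
    }
}
```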

Now the line final ChannelPipeline pipeline = pipeline(); in read(). Following it, we find that pipeline() simply returns a field declared in AbstractChannel:

@Override
public ChannelPipeline pipeline() {
    return pipeline;
}
private final ChannelPipeline pipeline;
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
    static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();
    static {
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
    }

    private MessageSizeEstimator.Handle estimatorHandle;
    private final Channel parent;
    private final long hashCode = ThreadLocalRandom.current().nextLong();
    private final Unsafe unsafe;
    private final ChannelPipeline pipeline;
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
    private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    private volatile EventLoop eventLoop;
    private volatile boolean registered;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;

    // ... remaining members omitted
}

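The pipeline field is final, so each Channel gets exactly one pipeline, created together with the channel and never replaced. More generally, a Channel bundles a lot of per-channel state (its parent, unsafe, pipeline, close future and so on), and much of it is final. Seeing this gives a better feel for how Netty is designed.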

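With the pipeline in hand, read() calls pipeline.fireChannelRead(readBuf.get(i)) for each accepted channel. The pipeline hands the message to its head context, and the context-level fireChannelRead() looks like this: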

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}

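next.invokeChannelRead(msg) then calls the channelRead method of the next inbound handler, directly if we are already on that handler's event loop, or through a task submitted to it otherwise. That is why channelRead gets called once Netty has accepted a connection: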

private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
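The "call inline if already on the right thread, otherwise hand over" decision in fireChannelRead() can be sketched with a single-thread JDK executor standing in for the context's EventExecutor. InEventLoopSketch, invoke() and demo() are hypothetical names; inEventLoop() here simply compares the current thread with the executor's worker thread, which is an assumption of this sketch rather than Netty's implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class InEventLoopSketch {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    // A single worker thread plays the role of the context's EventExecutor.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        loopThread.set(t);
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    // Same decision as fireChannelRead(): run directly on the loop thread, otherwise submit a task.
    void invoke(Runnable handlerCall) {
        if (inEventLoop()) {
            handlerCall.run();
        } else {
            executor.execute(handlerCall);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    // True if a call from a foreign thread was handed over and a nested call
    // made on the loop thread ran inline.
    static boolean demo() throws InterruptedException {
        InEventLoopSketch loop = new InEventLoopSketch();
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean nestedRanInline = new AtomicBoolean(false);
        try {
            boolean callerOnLoop = loop.inEventLoop(); // false: this is the caller's thread
            loop.invoke(() -> {
                AtomicBoolean ran = new AtomicBoolean(false);
                loop.invoke(() -> ran.set(true)); // already on the loop thread, so it runs right here
                nestedRanInline.set(ran.get());
                done.countDown();
            });
            return !callerOnLoop && done.await(5, TimeUnit.SECONDS) && nestedRanInline.get();
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("inline-or-hand-off works: " + demo());
    }
}
```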

If, like me, you have always wondered how this callback chain works, there is one more thing to find. Let's trace this line:

final AbstractChannelHandlerContext next = findContextInbound();

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

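This reveals how ChannelHandlerContext is designed: the contexts form a doubly linked list, each holding next and prev references. In other words, Netty manages the handler contexts along the processing path with a doubly linked list, and findContextInbound() just walks forward along next until it reaches a context whose inbound flag is set. Doesn't that remind you of LinkedList?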

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {

    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    private final boolean inbound;
    private final boolean outbound;
    private final AbstractChannel channel;
    private final DefaultChannelPipeline pipeline;
    private final String name;
    private boolean removed;
    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    final EventExecutor executor;
    private ChannelFuture succeededFuture;

    // Lazily instantiated tasks used to trigger events to a handler with different executor.
    // These needs to be volatile as otherwise an other Thread may see an half initialized instance.
    // See the JMM for more details
    private volatile Runnable invokeChannelReadCompleteTask;
    private volatile Runnable invokeReadTask;
    private volatile Runnable invokeChannelWritableStateChangedTask;
    private volatile Runnable invokeFlushTask;

    // ... remaining members omitted
}
    private volatile Runnable invokeFlushTask;

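OK, back to channelRead. On the server channel, the inbound handler reached here is ServerBootstrapAcceptor (an inner class of ServerBootstrap), and its channelRead method adds the childHandler to the new child channel, applies the child options and attributes, and registers the channel with childGroup, that is, with the Selector of one of the worker event loops. With that, the whole connection-setup path is clear.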

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}