1. 程式人生 > >死磕Netty原始碼之服務端啟動原始碼解析

死磕Netty原始碼之服務端啟動原始碼解析

前言

本部落格講述的是Netty是如何繫結埠、啟動服務。啟動服務的過程中你將會了解到Netty各大核心元件

服務端啟動DEMO

先從一個簡單的服務端啟動DEMO開始,以下是一個標準的Netyy服務端程式碼

public final class NettyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new
NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override
public void initChannel(SocketChannel channel) { ChannelPipeline channelPipeline = channel.pipeline(); channelPipeline.addLast("decoder", new StringDecoder()); channelPipeline.addLast("encoder"
, new StringEncoder()); channelPipeline.addLast("handler", new ServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } 注:ServerBootstrap.childHandler()用於指定處理新連線資料的讀寫處理邏輯,同時ServerBootstrap還提供handler()用於指定在服務端啟動過程中的一些邏輯,通常情況下我們用不著這個方法

ServerHandler程式碼如下:

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channelActive");
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("channelRegistered");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println("handlerAdded");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("service receive msg:" + msg);
    }
}

當有新連線接入時,控制檯打印出

handlerAdded
channelRegistered
channelActive

但接收到新訊息時,控制檯打印出

service receive msg:xxx
channelReadComplete

本文主要分析服務端的啟動過程,而新連線接入 新訊息的讀取會在後續章節中說明

服務端啟動原始碼分析

ServerBootstrap是Netty為方便開發者使用而設計的一個啟動類,ServerBootstrap的核心程式碼入口在bind(),程式碼如下

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

通過埠號建立一個InetSocketAddress,然後繼續bind

public ChannelFuture bind(SocketAddress localAddress) {
    // 校驗引數
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    // Channel繫結邏輯
    return doBind(localAddress);
}

validate()驗證服務啟動需要的必要引數,然後呼叫doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
    //...
    final ChannelFuture regFuture = initAndRegister();
    //...
    final Channel channel = regFuture.channel();
    //...
    doBind0(regFuture, channel, localAddress, promise);
    //...
    return promise;
}

在doBind()中我們關注兩個核心方法,initAndRegister()以及doBind0()

initAndRegister

final ChannelFuture initAndRegister() {
    Channel channel = null;
    // 新建Channel
    channel = channelFactory.newChannel();
    // 初始化Channel
    init(channel);
    // 將這個Channel Register到某個物件
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}

新建Channel

Channel是Netty的核心概念之一,它是Netty網路通訊的主體由它負責同對端進行網路通訊、註冊和資料操作等功能。Channel的建立是由channelFactory.newChannel()完成的,ChannelFactory介面定義如下

public interface ChannelFactory<T extends Channel> {
    T newChannel();
}

接下來我們跟蹤此處的channelFactory是在何時被初始化,我們層層回溯最終發現是在這個函式中

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

在Demo程式呼叫.channel(NioServerSocketChannel.class)方法,所以channelFactory.newChannel()真正執行程式碼如下

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}

即在Netty服務端啟動的時候通過反射的方式來建立一個NioServerSocketChannel物件,最終建立Channel相當於呼叫預設建構函式new出一個 NioServerSocketChannel物件,接下來我們繼續跟進NioServerSocketChannel的預設建構函式

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        // 利用SelectorProvider產生一個ServerSocketChannel物件
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

通過newSocket(DEFAULT_SELECTOR_PROVIDER)建立一條server端channel,然後進入到以下方法

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

該方法主要完成兩個功能,首先是呼叫父類的構造方法然後初始化NioServerSocketChannelConfig屬性。我們繼續跟入super()

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    ch.configureBlocking(false);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    // 設定SelectionKey.OP_ACCEPT事件
    this.readInterestOp = readInterestOp;
    // 設定ServerSocketChannel為非阻塞的
    ch.configureBlocking(false);
}

這裡將前面通過provider.openServerSocketChannel()創建出來的ServerSocketChannel儲存到成員變數,然後呼叫將該channel為非阻塞模式,這是個標準的JDK NIO程式設計的玩法。這裡的readInterestOp即前面層層傳入的SelectionKey.OP_ACCEPT,接下來繼續跟進super(parent);(這裡的parent其實是null,由前面寫死傳入)

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

在AbstractChannel的構造方法中主要是初始化了id,unsafe,pipeline屬性

初始化Channel

在建立完Channel後,我們在init方法中對Channel進行初始化操作,程式碼如下

void init(Channel channel) throws Exception {
    // 設定option
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    // 設定attr
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    // 設定handler到pipeline上
    ChannelPipeline p = channel.pipeline();
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        // 設定新接入channel的options
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        // 設定新接入channel的attrs
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // 這裡的handler()返回的就是.handler()所設定的值
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // p.addLast()向serverChannel的流水線處理器中加入了一個ServerBootstrapAcceptor
                    // 從名字上就可以看出來這是一個接入器,專門接受新請求,把新的請求扔給某個事件迴圈器
                    pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

以上程式碼主要完成如下功能:先呼叫options0()以及attrs0()獲取到伺服器啟動時設定的一些引數,然後將得到的options和attrs注入到channelConfig或者channel中;然後在當前Channel的pipeline中添加了一個ChannelInitializer,在ChannelInitializer中往pipeline中添加了一個handler,並通過NioEventLoop.execute()方法往pipeline中添加了一個ServerBootstrapAcceptor(請求接入器),此處的NioEventLoop.execute()方法為Netty Reactor執行緒執行的入口,關於Netty Reactor執行緒我們將在下一篇部落格中介紹。我們總結一下發現程式碼執行到這裡Netty並未真正啟動服務,只是初始化了一些基本的配置和屬性,以及在pipeline上加入了一個接入器用來專門接受新連線

完成Channel註冊

完成Channel註冊的程式碼如下

ChannelFuture regFuture = config().group().register(channel);

它呼叫到MultithreadEventLoopGroup中的register方法

@Override
public ChannelFuture register(Channel channel) {
    // 呼叫了NioEvenLoop物件中的register方法
    // EventLoopGroup extends SingleThreadEventLoop
    return next().register(channel);
}   

在next方法中返回一個EventLoop物件,每一個EventLoop都與一個selector繫結,在之前的程式碼中EventLoop中的Selector一直沒有任何Channel註冊,所以每次select操作都是空,但從這行程式碼開始這個selector中開始有Channel註冊

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

這裡可以看到register操作是委託給Channel中的Unsafe物件來執行的,這裡的Unsafe物件對上文稍有印象的同學應該能知道這個就是建立NioServerSocketChannel的時候建立的Unsafe物件,繼續跟進Unsafe物件的register方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // ...
    AbstractChannel.this.eventLoop = eventLoop;
    // ...
    register0(promise);
}

先將EventLoop事件迴圈器繫結到該NioServerSocketChannel上,然後呼叫register0()程式碼如下

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

這一段其實也很清晰,先呼叫doRegister()進行註冊

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            // ...
        }
    }
}

在這裡我們終於看到JDK底層的Channel註冊到Selector的過程,但是這裡的OPS為0即不關心任何事件,而我們期望OPS的值為SelectionKey.OP_ACCEPT,所以到了這裡程式碼還沒有結束。在執行完Channel註冊後接著執行了幾個pipeline相關的方法,我們後面詳細剖析pipeline的時候再講,然後我們回到doBind0方法中

doBind0

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

在dobind0()方法中通過EventLoop執行一個任務,接下來我們進入到channel.bind()方法

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

關於Pipeline相關的內容將在後續部落格中介紹,當前一個比較好的方式就是Debug單步進入。最後我們來到了如下區域

public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    unsafe.bind(localAddress, promise);
}

這裡的unsafe就是前面提到的AbstractUnsafe, 準確點應該是NioMessageUnsafe

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // ...
    boolean wasActive = isActive();
    // ...
    doBind(localAddress);

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

在doBind方法中完成繫結操作,程式碼如下

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

最終呼叫到了JDK裡面的bind方法真正進行了埠的繫結。按照正常流程我們前面已經分析到isActive()方法返回false,進入到 doBind()之後如果channel被激活了,就發起pipeline.fireChannelActive()呼叫。接著我們跟進pipeline.channelActive方法

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}

pipeline.channelActive會逐一呼叫pipeline中每一個節點的channelActive方法,並且在HeadContext中呼叫了readIfIsAutoRead

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

最終這個方法會呼叫到AbstractNioChannel的doBeginRead方法

protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

在最後一行中的readInterestOp即在上文中提到的SelectionKey.OP_ACCEPT,至此完成了Channel對ACCEPT事件的註冊過程

總結

到目前為止我們看到的程式碼相當於傳統NIO程式設計中的如下程式碼

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); => 建立NioServerSocketChannel
serverSocketChannel.configureBlocking(false); => AbstractNioChannel中ch.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888)); => NioServerSocketChannel.doBind()
Selector selector = Selector.open(); => NioEventLoop.openSelector()
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) => AbstractNioChannel.doBeginRead()

服務端啟動完成的主要功能為建立一個Channel,並且將Channel註冊到NioEventLoop的Selector上