1. 程式人生 > >死磕Netty原始碼之新連線接入原始碼解析

死磕Netty原始碼之新連線接入原始碼解析

前言

本部落格主要是介紹Netty在新連線接入後的相關處理

新連線建立

新連線建立可以分為以下三個步驟

1.檢測到有新的連線
2.將新的連線註冊到Worker執行緒組
3.註冊新連線的讀事件

在Reactor執行緒模型詳解部落格中我們已經知道當服務端讀取到IO事件(新連線接入事件)後,會呼叫processSelectedKey方法對事件進行處理,此處以新連線接入事件為例它最後會呼叫底層的unsafe進行read操作

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe
().recvBufAllocHandle(); do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } } while (allocHandle.continueReading()); int size = readBuf.size(); for
(int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); pipeline.fireChannelReadComplete(); }

這裡有兩個主要的方法:

1.doReadMessages
2.pipeline.fireChannelRead

doReadMessages

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    try
{ if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { // ... } return 0; }

該方法主要作用是通過JDK底層的API獲取到SocketChannel,然後包裝成Netty自己的NioSocketChannel。NioSocketChannel與服務端啟動時建立的NioServerSocketChannel最主要的區別在於它們關注的事件不同,NioSocketChannel的構造方法如下

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

這裡我們看到一個SelectionKey.OP_READ,說明這個Channel關心讀事件而服務端的Channel關心ACCEPT事件。接下來呼叫父類AbstractNioChannel構造,後續過程與服務端啟動流程一致此處不再贅述

pipeline.fireChannelRead

接著來看pipeline.fireChannelRead(readBuf.get(i))方法,關於Pipeline我們將在下一篇部落格中詳細介紹。我們知道客戶端在啟動的過程中會往Pipeline中新增一個ServerBootstrapAcceptor(連線處理器的東西),所以到這裡服務端Channel對應的Pipeline的資料結構為:Hea⇋ServerBootstrapAcceptor⇋Tail,在呼叫pipeline.fireChannelRead時會依次觸發這三個節點上的channelRead方法,接下來我們重點關注ServerBootstrapAcceptor的channelRead方法,程式碼如下

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

首先獲取我們之前例項化的NioSocketChannel,然後將我們設定的chlidHandler新增到NioSocketChannel對應的Pipeline中(這裡的chlidHandler對應使用者通過.childHandler()設定的Handler),程式碼執行到這裡NioSocketChannel中Pipeline對應的資料結構為: head⇋ChannelInitializer⇋tail,接著設定NioSocketChannel對應的attr和option,然後進入到childGroup.register(child),這裡的childGroup就是WorkerGroup,接下來我們進入NioEventLoopGroup的register方法

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

這段程式碼和服務端啟動的時候像BossGroup註冊NioServerSocketChannel是類似的,通過next()方法獲取到NioEventLoop然後將Channel註冊到該NioEventLoop上(即將該Channel與NioEventLoop上的Selector進行繫結)。註冊的邏輯最終是交給unsafe物件完成的,我們繼續跟進unsafe的register方法,程式碼如下

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    //...
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            //...
        }
    }
}

由於這裡是在Boss執行緒中執行的IO操作所以不會是跟Worker執行緒是同一個執行緒,所以eventLoop.inEventLoop()返回false,最後會通過eventLoop.execute的方式去執行註冊任務。在Reactor執行緒模型中我們講到在呼叫execute的時候,如果是首次新增任務那這個NioEventLoop執行緒會被啟動,所以從此Worker執行緒開始執行,接下來看下具體的註冊邏輯

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        //...
    }
}

和服務端啟動過程一樣,先是呼叫doRegister()執行真正的註冊過程

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            //...

        }
    }
}

該Channel繫結到NioEventLoop對應的Selector上去,後續該Channel的事件輪詢、事件處理、非同步Task執行都由此執行緒負責,繫結完Reactor執行緒之後呼叫pipeline.invokeHandlerAddedIfNeeded()程式碼如下

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        callHandlerAddedForAllHandlers();
    }
}

往下跟callHandlerAddedForAllHandlers方法

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;

        this.pendingHandlerCallbackHead = null;
    }

    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

這裡有個物件叫pendingHandlerCallbackHead,我們發現它是在callHandlerCallbackLater方法中被初始化的

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;

    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}

當我們在Channel註冊到之前新增或刪除Handler時,此時沒有EventExecutor可執行HandlerAdd或HandlerRemove事件,所以Netty為此事件生成一個相應任務等註冊完成後在呼叫執行任務。新增或刪除任務可能有很多個,DefaultChannelPipeline使用一個連結串列儲存,連結串列頭部為先前的欄位pendingHandlerCallbackHead

接下來我們繼續分析task.execute方法, 它主要是完成NioSocketChannel對應的Pipeline的初始化

void execute() {
    // ...
    callHandlerAdded0(ctx);
    // ...
}

通過上面對pendingHandlerCallbackHead的分析,這裡肯定會呼叫ChannelInitializer的handlerAdded方法

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { 
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

ChannelInitializer的initChannel主要完成兩個功能以下兩個功能

首先呼叫initChannel((C) ctx.channel())進入使用者自定義的程式碼完成Pipeline的初始化

.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {

     }
 })

然後在finally中呼叫remove方法將ChannelInitializer刪除

private void remove(ChannelHandlerContext ctx) {
    try {
        ChannelPipeline pipeline = ctx.pipeline();
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
    } finally {
        initMap.remove(ctx);
    }
}

執行該方法前NioSocketChannel對應的Pipeline的資料結構為:head⇋ChannelInitializer⇋tail,執行該方法後ChannelInitializer被刪除,NioSocketChannel對應的Pipeline的資料結構為:head⇋自定義的HandlerContext⇋tail

到目前為止我們完成了新連線的註冊、pipeline的繫結,但是新連線註冊的時候的感興趣事件還是0還無法進行讀寫操作,新連線對讀事件的繫結是在pipeline.fireChannelActive方法中完成的,它最後會呼叫到AbstractNioChannel的doBeginRead

protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

前面register0()方法的時候向selector註冊的事件程式碼是0,而readInterestOp對應的事件程式碼是SelectionKey.OP_READ,所以本段程式碼的用處是將SelectionKey.OP_READ事件註冊到Selector中去,fireChannelActive的執行邏輯在服務端啟動過程中有詳細描述,至此已完成客戶端新連線接入的操作。下一篇部落格將介紹Pipeline相關的原始碼解析