Netty新連線接入與NioSocketChannel分析
原文連結: ofollow,noindex">wangwei.one/posts/netty…
前面的一些章節,我們分析了Netty的三大元件 ——Channel 、 EventLoop 、Pipeline ,對Netty的工作原理有了深入的瞭解。在此基礎上,我們來分析一下當Netty服務端啟動後,Netty是如何處理新連線接入的。
本文內容主要分為以下四部分:
- 新連線檢測
- NioSocketChannel建立
- NioSocketChannel初始化與註冊
- NioSocketChannel註冊READ興趣集
新連線檢測
前面,我們在講 EventLoop的啟動過程原始碼分析 時,解讀過下面這段程式碼:
public final class NioEventLoop extends SingleThreadEventLoop { ... private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ... try { ... if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 讀取read事件 unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } ... } ... } 複製程式碼
我們還是以服務端 NioServerSocketChannel 為例,它繫結的unsafe例項為 NioMessageUnsafe 。上面的 unsafe.read()
介面,會向下呼叫到 NioMessageUnsafe.read() 介面,如下:
public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... private final class NioMessageUnsafe extends AbstractNioUnsafe { // 用於儲存新建立的 NioSocketChannel 的集合 private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { // 確保在當前執行緒與EventLoop中的一致 assert eventLoop().inEventLoop(); // 獲取 NioServerSocketChannel config配置 final ChannelConfig config = config(); // 獲取 NioServerSocketChannel 繫結的 pipeline final ChannelPipeline pipeline = pipeline(); // 獲取RecvByteBuf 分配器 Handle // 當channel在接收資料時,allocHandle 會用於分配ByteBuf來儲存資料 // 關於allocHandle後面再去做詳細介紹 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); // 重置已累積的所有計數器,併為下一個讀取迴圈讀取多少訊息/位元組資料提供建議 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { // 呼叫後面的 doReadMessages 介面,讀取到message則返回1 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } // 對當前read迴圈所讀取到的message數量計數+1 allocHandle.incMessagesRead(localRead); // 判斷是否繼續讀取message } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 呼叫pipeline傳播ChannelRead事件 pipeline.fireChannelRead(readBuf.get(i)); } // 清空readBuf readBuf.clear(); allocHandle.readComplete(); // 呼叫pipeline傳播 ChannelReadComplete 事件 pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } } ... } 複製程式碼
對於 doReadMessages(...)
的分析:
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { ... // 讀取訊息 @Override protected int doReadMessages(List<Object> buf) throws Exception { // 獲取 SocketChannel SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { // 使用SocketChannel建立NioSocketChannel,將其存入buf list中 // 關於NioSocketChannel的建立請看後面的分析 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; } ... } 複製程式碼
對於 continueReading()
介面的分析,至於結果為什麼返回false,後面會單獨分析:
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator { private volatile int maxMessagesPerRead; private volatile boolean respectMaybeMoreData = true; ... public abstract class MaxMessageHandle implements ExtendedHandle { private ChannelConfig config; // 每次讀取最大的訊息數 private int maxMessagePerRead; private int totalMessages; private int totalBytesRead; private int attemptedBytesRead; private int lastBytesRead; private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData; private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { @Override public boolean get() { return attemptedBytesRead == lastBytesRead; } }; ... // 判斷是否繼續讀取message @Override public boolean continueReading() { return continueReading(defaultMaybeMoreSupplier); } // 判斷是否繼續讀取message @Override public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { // 預設情況下 config.isAutoRead() 為true // respectMaybeMoreData 預設為 true // maybeMoreDataSupplier.get() 為false // totalMessages第一次迴圈則為1 // maxMessagePerRead為16 // 結果返回false return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0; } ... } ... } 複製程式碼
NioSocketChannel建立
上面分析新連線接入,提到了 NioSocketChannel 的建立,我們這裡來詳細分析一下,NioSocketChannel的建立過程與此前我們分析 NioServerSocketChannel建立 大體類似。
構造器
先來看看 NioSocketChannel 的建構函式:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { ... public NioSocketChannel(Channel parent, SocketChannel socket) { // 呼叫父類構造器 super(parent, socket); // 建立NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()); } ... } 複製程式碼
父類 AbstractNioByteChannel 構造器:
public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { // 呼叫父類構造器,並設定興趣集為SelectionKey.OP_READ,對read事件感興趣 super(parent, ch, SelectionKey.OP_READ); } ... } 複製程式碼
父類 AbstractNioChannel 構造器:
public abstract class AbstractNioChannel extends AbstractChannel { ... protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { // 呼叫父類構造器 super(parent); // 設定channel this.ch = ch; // 設定興趣集 this.readInterestOp = readInterestOp; try { // 設定為非阻塞 ch.configureBlocking(false); } catch (IOException e) { ... } } } 複製程式碼
父類 AbstractChannel 構造器:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { ... protected AbstractChannel(Channel parent) { // 設定parent this.parent = parent; // 建立channelId id = newId(); // 建立unsafe unsafe = newUnsafe(); // 建立pipeline pipeline = newChannelPipeline(); } ... } 複製程式碼
ChannelConfig建立
接著我們看看 NioSocketChannelConfig 的建立邏輯:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { ... private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { // 呼叫父類構造器 super(channel, javaSocket); calculateMaxBytesPerGatheringWrite(); } ... } 複製程式碼
父類 DefaultSocketChannelConfig 構造器:
public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { ... public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { // 呼叫父類構造器,繫結socketchannel super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } // 繫結java socket this.javaSocket = javaSocket; // Enable TCP_NODELAY by default if possible. // netty一般執行在伺服器上,不在Android上,canEnableTcpNoDelayByDefault返回true if (PlatformDependent.canEnableTcpNoDelayByDefault()) { try { // 開啟 TCP_NODELAY ,開啟TCP的nagle演算法 // 儘量不要等待,只要傳送緩衝區中有資料,並且傳送視窗是開啟的,就儘量把資料傳送到網路上去。 setTcpNoDelay(true); } catch (Exception e) { // Ignore. } } } ... } 複製程式碼
NioSocketChannel初始化與註冊
上面小節分析了NioSocketChannel的建立邏輯,建立完成之後,我們來分析一下NioSocketChannel是如何註冊到NioEventLoop上去的。
在前面小節分析新連線檢測的有如下小段程式碼:
private final class NioMessageUnsafe extends AbstractNioUnsafe { ... int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 呼叫pipeline傳播ChannelRead事件 pipeline.fireChannelRead(readBuf.get(i)); } ... } 複製程式碼
呼叫pipeline傳播ChannelRead事件,這裡的Pipeline是服務端Channel,也就是NioServerSocketChannel所繫結的Pipeline,此時的Pipeline的內部結構是怎麼樣子的呢?

那這個 ServerBootstrapAcceptor 是從哪裡來的呢?
在此前,我們分析 NioServerSocketChannel初始化 時,有過下面這段程式碼:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ... // NioServerSocketChannel初始化 void init(Channel channel) throws Exception { // 獲取啟動器 啟動時配置的option引數,主要是TCP的一些屬性 final Map<ChannelOption<?>, Object> options = options0(); // 將獲得到 options 配置到 ChannelConfig 中去 synchronized (options) { setChannelOptions(channel, options, logger); } // 獲取 ServerBootstrap 啟動時配置的 attr 引數 final Map<AttributeKey<?>, Object> attrs = attrs0(); // 配置 Channel attr,主要是設定使用者自定義的一些引數 synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } // 獲取channel中的 pipeline,這個pipeline使我們前面在channel建立過程中設定的 pipeline ChannelPipeline p = channel.pipeline(); // 將啟動器中配置的 childGroup 儲存到區域性變數 currentChildGroup final EventLoopGroup currentChildGroup = childGroup; // 將啟動器中配置的 childHandler 儲存到區域性變數 currentChildHandler final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; // 儲存使用者設定的 childOptions 到區域性變數 currentChildOptions synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } // 儲存使用者設定的 childAttrs 到區域性變數 currentChildAttrs synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); // 獲取啟動器上配置的handler ChannelHandler handler = config.handler(); if (handler != null) { // 新增 handler 到 pipeline 中 pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { // 用child相關的引數創建出一個新連線接入器ServerBootstrapAcceptor // 通過 ServerBootstrapAcceptor 可以將一個新連線繫結到一個執行緒上去 // 每次有新的連線進來 ServerBootstrapAcceptor 都會用child相關的屬性對它們進行配置,並註冊到ChaildGroup上去 pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } ... } 複製程式碼
ServerBootstrapAcceptor
NioServerSocketChannel初始化時,向NioServerSocketChannel所繫結的Pipeline添加了一個InboundHandler節點 —— ServerBootstrapAcceptor ,其程式碼如下:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ... private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { // 子EventLoopGroup,即為workGroup private final EventLoopGroup childGroup; // ServerBootstrap啟動時配置的 childHandler private final ChannelHandler childHandler; // ServerBootstrap啟動時配置的 childOptions private final Entry<ChannelOption<?>, Object>[] childOptions; // ServerBootstrap啟動時配置的 childAttrs private final Entry<AttributeKey<?>, Object>[] childAttrs; private final Runnable enableAutoReadTask; // 建構函式 ServerBootstrapAcceptor( final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; // Task which is scheduled to re-enable auto-read. // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may // not be able to load the class because of the file limit it already reached. // // See https://github.com/netty/netty/issues/1328 enableAutoReadTask = new Runnable() { @Override public void run() { channel.config().setAutoRead(true); } }; } // 處理Pipeline所傳播的channelRead事件 // 也就是前面新連線檢測時看到的那段程式碼 // pipeline.fireChannelRead(readBuf.get(i)); // ServerBootstrapAcceptor的channelRead介面將會被呼叫,用於處理channelRead事件 @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { // 獲取傳播事件的物件資料,即為前面的readBuf.get(i) // readBuf.get(i)取出的物件為 NioSocketChannel final Channel child = (Channel) msg; // 向 NioSocketChannel 新增childHandler,也就是我們常看到的 // ServerBootstrap在啟動時配置的程式碼: // ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {...} ) // 最終的結果就是向NioSocketChannel的Pipeline新增使用者自定義的ChannelHandler // 用於處理客戶端的channel連線 child.pipeline().addLast(childHandler); // 配置 NioSocketChannel的TCP屬性 setChannelOptions(child, childOptions, logger); // 配置 NioSocketChannel 一些使用者自定義資料 for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } // 將NioSocketChannel註冊到childGroup,也就是Netty的WorkerGroup當中去 try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } ... } ... } 複製程式碼
關於 ChannelInitializer 的講解,可以看此前Pipeline原始碼分析 文章。
後面的register邏輯,就與我們前面講解 NioServerSocketChannel註冊 大體類似了,這裡簡單介紹一下。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { ... // 註冊NioSocketChannel // eventLoop為childGroup @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... // 繫結eventLoop到NioSocketChannel上 AbstractChannel.this.eventLoop = eventLoop; // 現在分析的邏輯是在服務端的執行緒上,eventLoop與主執行緒不同,返回false if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { // 這裡來呼叫register0方法 register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } // 註冊 private void register0(ChannelPromise promise) { try { ... boolean firstRegistration = neverRegistered; // 呼叫 doRegister() doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // 服務端的NioServerSocketChannel已經與客戶端的NioSocketChannel建立了連線 // 所以,NioSocketChannel是處於啟用狀態,isActive()返回ture if (isActive()) { // 對於新連線,是第一次註冊 if (firstRegistration) { // 傳播ChannelActive事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } ... } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } ... } 複製程式碼
呼叫到NioSocketChannel中的doRegister()方法:
public abstract class AbstractNioChannel extends AbstractChannel { ... @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // 將selector註冊到底層JDK channel上,並附加了NioSocketChannel物件 // 興趣集設定為0,表示不關心任何事件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ... } } } ... } 複製程式碼
NioSocketChannel 註冊OP_READ興趣集
緊接著上面的分析,傳播ChannelActive事件之後的邏輯,主要就是向客戶端的NioSocketChannel註冊一個Read興趣集
if (isActive()) { // 對於新連線,是第一次註冊 if (firstRegistration) { // 傳播ChannelActive事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } 複製程式碼
通過Pipeline的傳播機制 ,最終會呼叫到doBeginRead()介面,如下:
public abstract class AbstractNioChannel extends AbstractChannel { ... protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { ... @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called // 儲存selectionKey到區域性變數 final SelectionKey selectionKey = this.selectionKey; // 判斷有效性 if (!selectionKey.isValid()) { return; } readPending = true; // 獲取selectionKey的興趣集 // 前面小結分析doRegister()介面提到,selectionKey的興趣集設定為0 final int interestOps = selectionKey.interestOps(); // 這裡的 readInterestOp 是前面講NioSocketChannel建立時設定的值 // 為 SelectionKey.OP_READ,也就是1 if ((interestOps & readInterestOp) == 0) { // 這樣,selectionKey最終設定的興趣集為SelectionKey.OP_READ // 表示對讀事件感興趣 selectionKey.interestOps(interestOps | readInterestOp); } } ... } ... } 複製程式碼