1. 程式人生 > >Netty之旅三:Netty服務端啟動原始碼分析,一梭子帶走!

Netty之旅三:Netty服務端啟動原始碼分析,一梭子帶走!

# Netty服務端啟動流程原始碼分析 ![](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d5be8808ccb54c318b5abd21f97bd37f~tplv-k3u1fbpfcp-zoom-1.image) ## 前記 哈嘍,自從上篇《Netty之旅二:口口相傳的高效能Netty到底是什麼?》後,遲遲兩週才開啟今天的`Netty`原始碼系列。原始碼分析的第一篇文章,下一篇我會分享客戶端的啟動過程原始碼分析。通過原始碼的閱讀,我們將會知道,`Netty` 服務端啟動的呼叫鏈是非常長的,同時肯定也會發現一些新的問題,隨著我們原始碼閱讀的不斷深入,相信這些問題我們也會一一攻破。 廢話不多說,直接上號! ![](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c5964018204f4525ab0bc0ab7b0be09d~tplv-k3u1fbpfcp-zoom-1.image) ## 一、從EchoServer示例入手 ![netty-example:EchoServer.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/8547f81868154df588e991155fa90e69~tplv-k3u1fbpfcp-zoom-1.image) 示例從哪裡來?任何開源框架都會有自己的示例程式碼,Netty原始碼也不例外,如模組`netty-example`中就包括了最常見的`EchoServer`示例,下面通過這個示例進入服務端啟動流程篇章。 ```java public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // 1. 宣告Main-Sub Reactor模式執行緒池:EventLoopGroup // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 建立 EchoServerHandler 物件 final EchoServerHandler serverHandler = new EchoServerHandler(); try { // 2. 宣告服務端啟動引導器,並設定相關屬性 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // 3. 繫結埠即啟動服務端,並同步等待 // Start the server. ChannelFuture f = b.bind(PORT).sync(); // 4. 監聽服務端關閉,並阻塞等待 // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // 5. 優雅地關閉兩個EventLoopGroup執行緒池 // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } ``` 1. [程式碼行18、19]宣告`Main-Sub Reactor`模式執行緒池:`EventLoopGroup` 建立兩個` EventLoopGroup` 物件。其中,`bossGroup`用於服務端接受客戶端的連線,`workerGroup`用於進行客戶端的 `SocketChannel `的資料讀寫。 (關於`EventLoopGroup`不是本文重點所以在後續文章中進行分析
) 2. [程式碼行23-39]宣告服務端啟動引導器,並設定相關屬性 `AbstractBootstrap`是一個幫助類,通過方法鏈(`method chaining`)的方式,提供了一個簡單易用的方式來配置啟動一個`Channel`。`io.netty.bootstrap.ServerBootstrap` ,實現` AbstractBootstrap` 抽象類,用於` Server` 的啟動器實現類。`io.netty.bootstrap.Bootstrap` ,實現 `AbstractBootstrap` 抽象類,用於 `Client` 的啟動器實現類。如下類圖所示: ![AbstractBootstrap類繼承.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d877706356d4427a9afd4b13d7177142~tplv-k3u1fbpfcp-zoom-1.image) (在`EchoServer`示例程式碼中,我們看到 `ServerBootstrap `的 `group`、`channel`、`option`、`childHandler` 等屬性鏈式設定都放到關於`AbstractBootstrap`體系程式碼中詳細介紹。
) 3. [程式碼行43]繫結埠即啟動服務端,並同步等待 先呼叫 `#bind(int port)` 方法,繫結埠,後呼叫 `ChannelFuture#sync()` 方法,阻塞等待成功。對於`bind`操作就是本文要詳細介紹的"服務端啟動流程"。 4. [程式碼行47]監聽服務端關閉,並阻塞等待 先呼叫 `#closeFuture()` 方法,監聽伺服器關閉,後呼叫 `ChannelFuture#sync()` 方法,阻塞等待成功。 注意,此處不是關閉伺服器,而是`channel`的監聽關閉。 5. [程式碼行51、52]優雅地關閉兩個`EventLoopGroup`執行緒池 `finally`程式碼塊中執行說明服務端將最終關閉,所以呼叫 `EventLoopGroup#shutdownGracefully()` 方法,分別關閉兩個` EventLoopGroup `物件,終止所有執行緒。 ## 二、服務啟動過程 在服務啟動過程的原始碼分析之前,這裡回顧一下我們在通過`JDK NIO`程式設計在服務端啟動初始的程式碼: ```java serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); ``` 這5行程式碼標示一個最為熟悉的過程: - 開啟`serverSocketChannel` - 配置非阻塞模式 - 為`channel`的`socket`繫結監聽埠 - 建立`Selector` - 將`serverSocketChannel`註冊到 `selector` 後面等分析完`Netty`的啟動過程後,會對這些步驟有一個新的認識。在`EchoServer`示例中,進入 `#bind(int port)` 方法,`AbstractBootstrap#bind()`其實有多個方法,方便不同地址引數的傳遞,實際呼叫的方法是`AbstractBootstrap#doBind(final SocketAddress localAddress)` 方法,程式碼如下: ```java private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } ``` - [程式碼行2] :呼叫 `#initAndRegister()` 方法,初始化並註冊一個 `Channel` 物件。因為註冊是非同步的過程,所以返回一個 `ChannelFuture `物件。詳細解析,見 「`initAndRegister()`」。 - [程式碼行4-6]]:若發生異常,直接進行返回。 - [程式碼行9-34]:因為註冊是非同步的過程,有可能已完成,有可能未完成。所以實現程式碼分成了【第 10 至 14 行】和【第 15 至 36 行】分別處理已完成和未完成的情況。 - 核心在[第 11 、29行],呼叫 `#doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)` 方法,繫結 Channel 的埠,並註冊 Channel 到 `SelectionKey` 中。 - 如果非同步註冊對應的 `ChanelFuture` 未完成,則呼叫 `ChannelFuture#addListener(ChannelFutureListener)` 方法,新增監聽器,在註冊完成後,進行回撥執行 `#doBind0(...)` 方法的邏輯。 通過`doBind`方法可以知道服務端啟動流程大致如下幾個步驟: ![服務端啟動流程.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f1f7faa8b2224b279cb7b5c7ed3be51c~tplv-k3u1fbpfcp-zoom-1.image) ### 1. 建立Channel ![建立服務端channel.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d5cc116d92574c81bd11a6fc33d6af16~tplv-k3u1fbpfcp-zoom-1.image) 從`#doBind(final SocketAddress localAddress)`進入到`initAndRegister()`: ```java final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } ``` [程式碼行4]呼叫 `ChannelFactory#newChannel()` 方法,建立` Channel `物件。 `ChannelFactory`類繼承如下: ![ChannelFactroy類繼承.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e62d592b38d444dc8c15c9f87334231e~tplv-k3u1fbpfcp-zoom-1.image) 可以在`ChannelFactory`註釋看到`@deprecated Use {@link io.netty.channel.ChannelFactory} instead.`,這裡只是包名的調整,對於繼承結構不變。`netty`預設使用`ReflectiveChannelFactory`,我們可以看到過載方法: ```java @Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } } ``` 很明顯,正如其名是通過反射機制構造`Channel`物件例項的。`constructor`是在其構造方法初始化的:`this.constructor = clazz.getConstructor();`這個`clazz`按理說應該是我們要建立的`Channel`的Class物件。那`Class`物件是什麼呢?我們接著看`channelFactory`是怎麼初始化的。 首先在`AbstractBootstrap`找到如下程式碼: ```java @Deprecated public B channelFactory(ChannelFactory channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); } ``` 呼叫這個方法的遞推向上看到: ```java public B channel(Class channelClass) { return channelFactory(new ReflectiveChannelFactory( ObjectUtil.checkNotNull(channelClass, "channelClass") )); } ``` 這個方法正是在`EchoServer`中`ServerBootstrap`鏈式設定時呼叫`.channel(NioServerSocketChannel.class)`的方法。我們看到,`channelClass`就是`NioServerSocketChannel.class`,`channelFactory`也是以`ReflectiveChannelFactory`作為具體例項,並且將`NioServerSocketChannel.class`作為構造引數傳遞初始化的,所以這回答了反射機制構造的是`io.netty.channel.socket.nio.NioServerSocketChannel`物件。 繼續看`NioServerSocketChannel`構造方法邏輯做了什麼事情,看之前先給出`NioServerSocketChannel`類繼承關係: ![Channel類繼承.jpg](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/cbb00ca166b2428caeb44a5892172550~tplv-k3u1fbpfcp-zoom-1.image) `NioServerSocketChannel`與`NioSocketChannel`分別對應服務端和客戶端,公共父類都是`AbstractNioChannel`和`AbstractChannel`,下面介紹建立過程可以參照這個`Channel`類繼承圖。進入`NioServerSocketChannel`構造方法: ```java /** * Create a new instance */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } ``` 點選`newSocket`進去: ```java private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See #2308. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } ``` 以上傳進來的`provider`是`DEFAULT_SELECTOR_PROVIDER`即預設的`java.nio.channels.spi.SelectorProvider`,[程式碼行9]就是熟悉的`jdk nio`建立`ServerSocketChannel`。這樣`newSocket(DEFAULT_SELECTOR_PROVIDER)`就返回了結果`ServerSocketChannel`,回到`NioServerSocketChannel()#this()`點進去: ```java /** * Create a new instance using the given {@link ServerSocketChannel}. */ public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } ``` 以上`super`代表父類`AbstractNioMessageChannel`構造方法,點進去看到: ```java /** * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int) */ protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } ``` 以上`super`代表父類`AbstractNioChannel`構造方法,點進去看到: ```java protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn("Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } ``` 以上[程式碼行3]將`ServerSocketChannel`儲存到了`AbstractNioChannel#ch`成員變數,在上面提到的`NioServerSocketChannel`構造方法的[程式碼行6]`javaChannel()`拿到的就是`ch`儲存的`ServerSocketChannel`變數。 以上[程式碼行6]就是熟悉的`jdk nio`程式設計設定`ServerSocketChannel`非阻塞方式。這裡還有`super`父類構造方法,點選進去看到: ```java protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } ``` 以上構造方法中: - `parent` 屬性,代表父` Channel` 物件。對於` NioServerSocketChannel `的 `parent` 為`null`。 - `id` 屬性,`Channel `編號物件。在構造方法中,通過呼叫 `#newId()` 方法進行建立。(這裡不細展開Problem-1
) - `unsafe` 屬性,`Unsafe` 物件。因為`Channel` 真正的具體操作,是通過呼叫對應的 `Unsafe` 物件實施。所以需要在構造方法中,通過呼叫 `#newUnsafe()` 方法進行建立。這裡的 `Unsafe` 並不是我們常說的 `jdk`自帶的`sun.misc.Unsafe` ,而是 `io.netty.channel.Channel#Unsafe`。(這裡不細展開Problem-2) - `pipeline`屬性預設是`DefaultChannelPipeline`物件,賦值後在後面為channel繫結埠的時候會用到 通過以上建立`channel`原始碼過程分析,總結的流程時序圖如下: ![wwv6q1.jpg](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6aaa112e02e34082b4c4b73786abd493~tplv-k3u1fbpfcp-zoom-1.image) ### 2. 初始化Channel ![初始化channel.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/5ca13ae3d05b4d79a0e3de03ba02f8c0~tplv-k3u1fbpfcp-zoom-1.image) 回到一開始建立`Channel`的`initAndRegister()`入口方法,在建立`Channel`後緊接著`init(channel)`進入初始化流程,因為是服務端初始化,所以是`ServerBootstrap#init(Channel channel)`,程式碼如下: ```java @Override void init(Channel channel) throws Exception { final Map, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map, Object> attrs = attrs0(); synchronized (attrs) { for (Entry, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } ``` - [程式碼 3 - 6 行]:` options0()`方法返回的`options`儲存了使用者在`EchoServer`中設定自定義的可選項集合,這樣`ServerBootstrap`將配置的選項集合,設定到了 `Channel` 的可選項集合中。 - [程式碼 8 - 15 行]:` attrs0()`方法返回的`attrs`儲存了使用者在`EchoServer`中設定自定義的屬性集合,這樣`ServerBootstrap`將配置的屬性集合,設定到了 `Channel` 的屬性集合中。 - [程式碼21-28行]:通過區域性變數`currentChildOptions`和`currentChildAttrs`儲存了使用者自定義的`childOptions`和`childAttrs`,用於[程式碼43行] `ServerBootstrapAcceptor` 構造方法。 - [程式碼30-47]]:建立`ChannelInitializer `物件,新增到 `pipeline` 中,用於後續初始化 `ChannelHandler `到` pipeline` 中,包括使用者在`EchoServer`配置的`LoggingHandler`和建立的建立 `ServerBootstrapAcceptor` 物件。 - [程式碼行34-37]:新增啟動器配置的 `LoggingHandler`到`pipeline` 中。 - [程式碼行39-45]:建立 `ServerBootstrapAcceptor` 物件,新增到 `pipeline` 中。從名字上就可以看出來,`ServerBootstrapAcceptor` 也是一個 `ChannelHandler `實現類,專門用於接受客戶端的新連線請求,把新的請求扔給某個事件迴圈器,我們先不做過多分析。我們發現是使用`EventLoop.execute` 執行新增的過程,這是為什麼呢?同樣記錄問題(Problem-3) - 需要說明的是`pipeline` 在之前介紹Netty核心元件的時候提到是一個包含`ChannelHandlerContext`的雙向連結串列,每一個`context`對於唯一一個`ChannelHandler`,這裡初始化後,`ChannelPipeline`裡就是如下一個結構: ![ChannelPipeline內部結構.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6e88ec1e3ca341099e5fdc0d2596cd1a~tplv-k3u1fbpfcp-zoom-1.image) ### 3. 註冊Channel ![註冊channel.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/5490f6a09b5c4978a0deb8d2ca1dfd0b~tplv-k3u1fbpfcp-zoom-1.image) 初始化`Channel`一些基本配置和屬性完畢後,回到一開始建立`Channel`的`initAndRegister()`入口方法,在初始化`Channel`後緊接著[程式碼行17]` ChannelFuture regFuture = config().group().register(channel);`明顯這裡是通過`EventLoopGroup`進入註冊流程(`EventLoopGroup`體系將在後續文章講解) 在`EchoServer`中啟動器同樣通過`ServerBootstrap#group()`設定了`NioEventLoopGroup`,它繼承自`MultithreadEventLoopGroup`,所以註冊流程會進入`MultithreadEventLoopGroup`過載的`register(Channel channel)`方法,程式碼如下: ```java @Override public ChannelFuture register(Channel channel) { return next().register(channel); } ``` 這裡會呼叫 `next()` 方法選擇出來一個 `EventLoop` 來註冊 `Channel`,裡面實際上使用的是一個叫做 `EventExecutorChooser` 的東西來選擇,它實際上又有兩種實現方式 ——`PowerOfTwoEventExecutorChooser` 和 `GenericEventExecutorChooser`,本質上就是從 `EventExecutor` 陣列中選擇一個` EventExecutor`,我們這裡就是 `NioEventLoop`,那麼,它們有什麼區別呢?(Problem-4:在介紹`EventLoopGroup`體系的後續文章中將會詳細講解,這裡簡單地提一下,本質都是按陣列長度取餘數 ,不過,2 的 N 次方的形式更高效。) 接著,來到 `NioEventLoop` 的 `register(channel)` 方法,你會不會問找不到該方法?提示`NioEventLoop` 繼承`SingleThreadEventLoop`,所以父類方法: ```java @Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; } ``` 可以看到,先建立了一個叫做 `ChannelPromise` 的東西,它是` ChannelFuture` 的子類。[程式碼行9]又調回了 `Channel` 的 `Unsafe` 的 `register ()` 方法,這裡第一個引數是 `this`,也就是` NioEventLoop`,第二個引數是剛建立的 `ChannelPromise`。 點選 `AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)` 方法進去,程式碼如下: ```java public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } ``` [程式碼行15]這行程式碼是設定 `Channel` 的 `eventLoop` 屬性。這行前面的程式碼主要是在校驗傳入的 `eventLoop` 引數非空,校驗是否有註冊過以及校驗 `Channel` 和 `eventLoop` 型別是否匹配。 [程式碼18、24]接著,跟蹤到 `AbstractUnsafe#register0(ChannelPromise promise)` 方法中: ```java private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } ``` [程式碼行9]進入 `AbstractNioChannel#doRegister()` 方法: ```java protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } ``` [程式碼行5]關鍵一行程式碼,將 Java 原生` NIO Selector `與 Java 原生 `NIO` 的 `Channel `物件(`ServerSocketChannel`) 繫結在一起,並將當前 Netty 的` Channel `通過 `attachment `的形式繫結到 `SelectionKey `上: - 呼叫 `#unwrappedSelector()` 方法,返回 Java 原生 `NIO Selector `物件,而且每個` NioEventLoop `與` Selector `唯一一對應。 - 呼叫 `SelectableChannel#register(Selector sel, int ops, Object att)` 方法,註冊 Java 原生`NIO` 的 `Channel `物件到 ` NIO Selector `物件上。 通過以上註冊channel原始碼分析,總結流程的時序圖如下: ![](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/712003e995ad46bfbc0f2b7a87d5a2d5~tplv-k3u1fbpfcp-zoom-1.image) ### 4. 繫結埠 ![繫結埠.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/5b4c8729a7844e64807123e56a03ab28~tplv-k3u1fbpfcp-zoom-1.image) 註冊完`Channel`最後回到`AbstractBootstrap#doBind()` 方法,分析` Channel` 的埠繫結邏輯。進入`doBind0`程式碼如下: ```java private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } ``` - [程式碼行7]:在前面`Channel` 註冊成功的條件下,呼叫` EventLoop`執行 `Channel` 的埠繫結邏輯。但是,實際上當前執行緒已經是 ` EventLoop`所在的執行緒了,為何還要這樣操作呢?答案在【第 5 至 6 行】的英語註釋,這裡作為一個問題記著(Problem-5)。 - [程式碼行11]:進入`AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)`,同樣立即非同步返回並新增`ChannelFutureListener.CLOSE_ON_FAILURE`監聽事件。 - [程式碼行13]:如果繫結埠之前的操作並沒有成功,自然也就不能進行埠繫結操作了,通過promise記錄異常原因。 `AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)`方法如下: ```java public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } ``` `pipeline`是之前建立`channel`的時候建立的`DefaultChannelPipeline`,進入該方法: ```java public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); } ``` [在分析初始化流程的時候最後畫一個`DefaultChannelPipeline`內部的結構,能夠便於分析後面進入`DefaultChannelPipeline`一系列`bind`方法。] 首先,`tail`代表`TailContext`,進入`AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)`方法: ```java public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { //省略部分程式碼 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; } ``` [程式碼行3]:`findContextOutbound`方法裡主要是執行`ctx = ctx.prev;`那麼得到的`next`就是繫結`LoggingHandler`的`context` [程式碼行6]:進入`invokeBind(localAddress, promise)`方法並直接執行`LoggingHandler#bind(this, localAddress, promise)`,進入後的方法如下: ```java public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "BIND", localAddress)); } ctx.bind(localAddress, promise); } ``` 設定了`LoggingHandler`的日誌基本級別為預設的INFO後,進行繫結操作的資訊列印。接著,繼續迴圈到`AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)`方法執行`ctx = ctx.prev`取出`HeadContext`進入到bind方法: ```java public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { unsafe.bind(localAddress, promise); } ``` 兜兜轉轉,最終跳出了`pipeline`輪迴到`AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)` 方法,Channel 的埠繫結邏輯。程式碼如下: ```java public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { //此處有省略... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } //此處有省略... } ``` 做實事方法`doBind`進入後如下: ```java @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } } ``` 到了此處,服務端的 Java 原生` NIO ServerSocketChannel` 終於繫結上了埠。 ## 三、問題歸納 - Problem-1: 建立`Channel`流程中`AbstractChannel`建構函式中為`channel`分配ID的演算法如何實現? - Problem-2: `AbstractChannel`內部類`AbstractUnsafe`的作用? - Problem-3: 初始化`channel`流程中`pipeline` 新增`ServerBootstrapAcceptor` 是通過`EventLoop.execute` 執行新增的過程,這是為什麼呢? - Problem-4:註冊`channel`流程中`PowerOfTwoEventExecutorChooser` 和 `GenericEventExecutorChooser`的區別和優化原理? - Problem-5:繫結埠流程中呼叫` EventLoop`執行 `Channel` 的埠繫結邏輯。但是,實際上當前執行緒已經是 ` EventLoop`所在的執行緒了,為何還要這樣操作呢? ## 小結 通過對Netty服務端啟動流程原始碼分析,我們發現了在使用`NIO`的模式下,服務端啟動流程其實就是封裝了`JDK NIO`程式設計在服務端啟動的流程。只不過對原生`JDK NIO`進行了增強和優化,同時從架構設計上簡化了服務端流程的編寫。 最重要的是感謝彤哥、艿艿和俞超-閃電俠這些大佬前期的分享,能夠讓更多人學習原始碼的旅途少走很多彎路,謝謝! 歡迎關注: ![原創乾貨分享.png](https://user-gold-cdn.xitu.io/2020/6/1/1726da96eac3c41b?w=1804&h=768&f=png&s