Netty之旅三:Netty服務端啟動原始碼分析,一梭子帶走!
阿新 • • 發佈:2020-09-15
# Netty服務端啟動流程原始碼分析
![](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d5be8808ccb54c318b5abd21f97bd37f~tplv-k3u1fbpfcp-zoom-1.image)
## 前記
哈嘍,自從上篇《Netty之旅二:口口相傳的高效能Netty到底是什麼?》後,遲遲兩週才開啟今天的`Netty`原始碼系列。原始碼分析的第一篇文章,下一篇我會分享客戶端的啟動過程原始碼分析。通過原始碼的閱讀,我們將會知道,`Netty` 服務端啟動的呼叫鏈是非常長的,同時肯定也會發現一些新的問題,隨著我們原始碼閱讀的不斷深入,相信這些問題我們也會一一攻破。
廢話不多說,直接上號!
![](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c5964018204f4525ab0bc0ab7b0be09d~tplv-k3u1fbpfcp-zoom-1.image)
## 一、從EchoServer示例入手
![netty-example:EchoServer.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/8547f81868154df588e991155fa90e69~tplv-k3u1fbpfcp-zoom-1.image)
示例從哪裡來?任何開源框架都會有自己的示例程式碼,Netty原始碼也不例外,如模組`netty-example`中就包括了最常見的`EchoServer`示例,下面通過這個示例進入服務端啟動流程篇章。
```java
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// 1. 宣告Main-Sub Reactor模式執行緒池:EventLoopGroup
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 建立 EchoServerHandler 物件
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// 2. 宣告服務端啟動引導器,並設定相關屬性
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// 3. 繫結埠即啟動服務端,並同步等待
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// 4. 監聽服務端關閉,並阻塞等待
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// 5. 優雅地關閉兩個EventLoopGroup執行緒池
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
```
1. [程式碼行18、19]宣告`Main-Sub Reactor`模式執行緒池:`EventLoopGroup`
建立兩個` EventLoopGroup` 物件。其中,`bossGroup`用於服務端接受客戶端的連線,`workerGroup`用於進行客戶端的 `SocketChannel `的資料讀寫。
(關於`EventLoopGroup`不是本文重點所以在後續文章中進行分析 )
2. [程式碼行23-39]宣告服務端啟動引導器,並設定相關屬性
`AbstractBootstrap`是一個幫助類,通過方法鏈(`method chaining`)的方式,提供了一個簡單易用的方式來配置啟動一個`Channel`。`io.netty.bootstrap.ServerBootstrap` ,實現` AbstractBootstrap` 抽象類,用於` Server` 的啟動器實現類。`io.netty.bootstrap.Bootstrap` ,實現 `AbstractBootstrap` 抽象類,用於 `Client` 的啟動器實現類。如下類圖所示:
![AbstractBootstrap類繼承.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d877706356d4427a9afd4b13d7177142~tplv-k3u1fbpfcp-zoom-1.image)
(在`EchoServer`示例程式碼中,我們看到 `ServerBootstrap `的 `group`、`channel`、`option`、`childHandler` 等屬性鏈式設定都放到關於`AbstractBootstrap`體系程式碼中詳細介紹。 )
3. [程式碼行43]繫結埠即啟動服務端,並同步等待
先呼叫 `#bind(int port)` 方法,繫結埠,後呼叫 `ChannelFuture#sync()` 方法,阻塞等待成功。對於`bind`操作就是本文要詳細介紹的"服務端啟動流程"。
4. [程式碼行47]監聽服務端關閉,並阻塞等待
先呼叫 `#closeFuture()` 方法,監聽伺服器關閉,後呼叫 `ChannelFuture#sync()` 方法,阻塞等待成功。 注意,此處不是關閉伺服器,而是`channel`的監聽關閉。
5. [程式碼行51、52]優雅地關閉兩個`EventLoopGroup`執行緒池
`finally`程式碼塊中執行說明服務端將最終關閉,所以呼叫 `EventLoopGroup#shutdownGracefully()` 方法,分別關閉兩個` EventLoopGroup `物件,終止所有執行緒。
## 二、服務啟動過程
在服務啟動過程的原始碼分析之前,這裡回顧一下我們在通過`JDK NIO`程式設計在服務端啟動初始的程式碼:
```java
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
```
這5行程式碼標示一個最為熟悉的過程:
- 開啟`serverSocketChannel`
- 配置非阻塞模式
- 為`channel`的`socket`繫結監聽埠
- 建立`Selector`
- 將`serverSocketChannel`註冊到 `selector`
後面等分析完`Netty`的啟動過程後,會對這些步驟有一個新的認識。在`EchoServer`示例中,進入 `#bind(int port)` 方法,`AbstractBootstrap#bind()`其實有多個方法,方便不同地址引數的傳遞,實際呼叫的方法是`AbstractBootstrap#doBind(final SocketAddress localAddress)` 方法,程式碼如下:
```java
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
```
- [程式碼行2] :呼叫 `#initAndRegister()` 方法,初始化並註冊一個 `Channel` 物件。因為註冊是非同步的過程,所以返回一個 `ChannelFuture `物件。詳細解析,見 「`initAndRegister()`」。
- [程式碼行4-6]]:若發生異常,直接進行返回。
- [程式碼行9-34]:因為註冊是非同步的過程,有可能已完成,有可能未完成。所以實現程式碼分成了【第 10 至 14 行】和【第 15 至 36 行】分別處理已完成和未完成的情況。
- 核心在[第 11 、29行],呼叫 `#doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)` 方法,繫結 Channel 的埠,並註冊 Channel 到 `SelectionKey` 中。
- 如果非同步註冊對應的 `ChanelFuture` 未完成,則呼叫 `ChannelFuture#addListener(ChannelFutureListener)` 方法,新增監聽器,在註冊完成後,進行回撥執行 `#doBind0(...)` 方法的邏輯。
通過`doBind`方法可以知道服務端啟動流程大致如下幾個步驟:
![服務端啟動流程.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f1f7faa8b2224b279cb7b5c7ed3be51c~tplv-k3u1fbpfcp-zoom-1.image)
### 1. 建立Channel
![建立服務端channel.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d5cc116d92574c81bd11a6fc33d6af16~tplv-k3u1fbpfcp-zoom-1.image)
從`#doBind(final SocketAddress localAddress)`進入到`initAndRegister()`:
```java
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
```
[程式碼行4]呼叫 `ChannelFactory#newChannel()` 方法,建立` Channel `物件。 `ChannelFactory`類繼承如下:
![ChannelFactroy類繼承.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e62d592b38d444dc8c15c9f87334231e~tplv-k3u1fbpfcp-zoom-1.image)
可以在`ChannelFactory`註釋看到`@deprecated Use {@link io.netty.channel.ChannelFactory} instead.`,這裡只是包名的調整,對於繼承結構不變。`netty`預設使用`ReflectiveChannelFactory`,我們可以看到過載方法:
```java
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
```
很明顯,正如其名是通過反射機制構造`Channel`物件例項的。`constructor`是在其構造方法初始化的:`this.constructor = clazz.getConstructor();`這個`clazz`按理說應該是我們要建立的`Channel`的Class物件。那`Class`物件是什麼呢?我們接著看`channelFactory`是怎麼初始化的。
首先在`AbstractBootstrap`找到如下程式碼:
```java
@Deprecated
public B channelFactory(ChannelFactory extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
```
呼叫這個方法的遞推向上看到:
```java
public B channel(Class extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
```
這個方法正是在`EchoServer`中`ServerBootstrap`鏈式設定時呼叫`.channel(NioServerSocketChannel.class)`的方法。我們看到,`channelClass`就是`NioServerSocketChannel.class`,`channelFactory`也是以`ReflectiveChannelFactory`作為具體例項,並且將`NioServerSocketChannel.class`作為構造引數傳遞初始化的,所以這回答了反射機制構造的是`io.netty.channel.socket.nio.NioServerSocketChannel`物件。
繼續看`NioServerSocketChannel`構造方法邏輯做了什麼事情,看之前先給出`NioServerSocketChannel`類繼承關係:
![Channel類繼承.jpg](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/cbb00ca166b2428caeb44a5892172550~tplv-k3u1fbpfcp-zoom-1.image)
`NioServerSocketChannel`與`NioSocketChannel`分別對應服務端和客戶端,公共父類都是`AbstractNioChannel`和`AbstractChannel`,下面介紹建立過程可以參照這個`Channel`類繼承圖。進入`NioServerSocketChannel`構造方法:
```java
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
```
點選`newSocket`進去:
```java
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See #2308.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
```
以上傳進來的`provider`是`DEFAULT_SELECTOR_PROVIDER`即預設的`java.nio.channels.spi.SelectorProvider`,[程式碼行9]就是熟悉的`jdk nio`建立`ServerSocketChannel`。這樣`newSocket(DEFAULT_SELECTOR_PROVIDER)`就返回了結果`ServerSocketChannel`,回到`NioServerSocketChannel()#this()`點進去:
```java
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
```
以上`super`代表父類`AbstractNioMessageChannel`構造方法,點進去看到:
```java
/**
* @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
*/
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
```
以上`super`代表父類`AbstractNioChannel`構造方法,點進去看到:
```java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
```
以上[程式碼行3]將`ServerSocketChannel`儲存到了`AbstractNioChannel#ch`成員變數,在上面提到的`NioServerSocketChannel`構造方法的[程式碼行6]`javaChannel()`拿到的就是`ch`儲存的`ServerSocketChannel`變數。
以上[程式碼行6]就是熟悉的`jdk nio`程式設計設定`ServerSocketChannel`非阻塞方式。這裡還有`super`父類構造方法,點選進去看到:
```java
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
```
以上構造方法中:
- `parent` 屬性,代表父` Channel` 物件。對於` NioServerSocketChannel `的 `parent` 為`null`。
- `id` 屬性,`Channel `編號物件。在構造方法中,通過呼叫 `#newId()` 方法進行建立。(這裡不細展開Problem-1 )
- `unsafe` 屬性,`Unsafe` 物件。因為`Channel` 真正的具體操作,是通過呼叫對應的 `Unsafe` 物件實施。所以需要在構造方法中,通過呼叫 `#newUnsafe()` 方法進行建立。這裡的 `Unsafe` 並不是我們常說的 `jdk`自帶的`sun.misc.Unsafe` ,而是 `io.netty.channel.Channel#Unsafe`。(這裡不細展開Problem-2)
- `pipeline`屬性預設是`DefaultChannelPipeline`物件,賦值後在後面為channel繫結埠的時候會用到
通過以上建立`channel`原始碼過程分析,總結的流程時序圖如下:
![wwv6q1.jpg](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6aaa112e02e34082b4c4b73786abd493~tplv-k3u1fbpfcp-zoom-1.image)
### 2. 初始化Channel
![初始化channel.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/5ca13ae3d05b4d79a0e3de03ba02f8c0~tplv-k3u1fbpfcp-zoom-1.image)
回到一開始建立`Channel`的`initAndRegister()`入口方法,在建立`Channel`後緊接著`init(channel)`進入初始化流程,因為是服務端初始化,所以是`ServerBootstrap#init(Channel channel)`,程式碼如下:
```java
@Override
void init(Channel channel) throws Exception {
final Map, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey