Netty原始碼分析:1.4伺服器啟動流程
第一章節是主要是伺服器啟動的程式碼分析。章節目錄有:
|———1.1初始化NioEventLoopGroup
|———1.2初始化NioEventLoop
|———1.3初始化NioServerSocketChannel
|———1.4伺服器啟動流程
為什麼先從初始化開始瞭解伺服器啟動?
因為在我看伺服器啟動的相關原始碼的時候,有很多地方都是初始化的時候已經建立好的。所以我就從初始化的原始碼開始看起。這是我第一次看原始碼的筆記,仍有很多理解錯誤的地方和不解的地方。歡迎討論。
本篇目錄
- 啟動伺服器程式碼
- 程式碼分析
啟動伺服器程式碼
- 1 建立NioEventLoopGroup物件。上文已經介紹了NioEventLoopGroup的初始化已經內部執行緒NioEventLoop的初始化話。
- 2 建立ServerBootstrap物件,該類初始化主要是建立了幾個
LinkedHashMap
來儲存設定。例如childOptions
或者childAttrs
public void bind() {
//1
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup();
try {
// 引導繫結和啟動伺服器
//2
ServerBootstrap b = new ServerBootstrap();
//3
b.group(boss, work);
// 建立NioEventLoopGroup物件來處理事件,如接受新連線、接收資料、寫資料等等
//4
b.channel(NioServerSocketChannel.class);
// 設定childHandler執行所有的連線請求
//5
b.childHandler(new ChildChannelHandler());
//6
b.option(ChannelOption.SO_BACKLOG, 100);
//繫結埠
//7
ChannelFuture future = b.bind(8080).sync();
//8
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//9
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
3 Group
賦值兩個執行緒池。
1 首先第一個boss執行緒池是賦值到父類AbstractBootstrap
的group變數中。
2 work執行緒池就賦值在ServerBootstrap的childGroup變數中。
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//1
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
4 channel()
- 首先
ReflectiveChannelFactory
類是一個工廠類,裡面只有一個方法newChannel()
用來將傳入的class進行無參構造生成物件的。 - 然後channelFactory()是將ReflectiveChannelFactory賦予ServerBootstrap的父類的channelFactory引數
此時channel 還沒被建立,直到bind()方法中呼叫
initAndRegister()
方法的時候才會用該工廠類生成一個channel。下文會講。
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
5 childHandler()
在呼叫這個“childHandler()“這個方法的時候,你需要自己“寫一個方法繼承ChannelInitializer類“。而“ChannelInitializer“類也是繼承“ChannelInboundHandlerAdapter“的。所以“childHandler()“方法的引數就是我們自己寫的類。然後賦值到childHandler引數。public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
6 Option()
將一些配置儲存到options變數中,該變數是一個LinkedHashMappublic <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
7 bind()
![image.png](https://upload-images.jianshu.io/upload_images/6212571-f500986ea414273c.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 之前的一些方法都只是對ServerBootstrap的配置,說白了就是用來set引數的。 bind()則是開始啟動伺服器了。 - 1 進行對“group“和“channelFactory“兩個引數進行非空驗證public ChannelFuture bind(SocketAddress localAddress) {
validate(); //1
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
doBind()
- 1 建立一個channel 並且初始化和註冊。
關鍵部分。程式碼分析看下面
- 2 判斷channel是否註冊成功。如果已經註冊成功那麼就進行
doBind0()
方法。如果還沒成功,那就新增一個監聽器,等返回成功的時候就進行doBind0()
方法。解析看下文。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); //1
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//2
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
doBind0()
- 1 該方法是執行在channel已經與selector註冊後的。給執行緒新增一個任務。該任務是繫結埠的。
到這個方法。伺服器啟動就已經完成了
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
//1
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
下文都是對initAndRegister()
方法的程式碼解析
initAndRegister()
- 1 利用工廠建立一個channel物件。初始化內容看上一篇文章
初始化NioServerSocketChannel
- 2 對channel進行一個初始化,看下文。
- 3 對channel進行註冊,看下文
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//1
channel = channelFactory.newChannel();
//2
init(channel);
} catch (Throwable t) {
}
//3
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
init()
- 1 對channel 的config變數裡新增options屬性
- 2 對channel 的config變數裡新增attrs屬性
- 3 獲取channel 物件的pipeline管道,然後在管道里面新增一個handler,該handler作用有:新增bootstrap裡的handler。對channel 新增一個任務。
- 4 新增bootstrap裡的handler。對channel 新增一個任務
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
//1
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//2
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//3
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//4
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
config().group().register(channel)
config().group()
是獲取到執行緒池,該執行緒池是ServerBootstrap
的父類AbstractBootstrap
儲存的boss執行緒池。是不是恍然大悟,這就是把channel放到boss執行緒池裡的一個執行緒裡面去執行任務啊。- 1 是
MultithreadEventLoopGroup
類的方法 也就是NioEventLoopGroup
的父類。 該方法就是從執行緒池裡獲取一個EventLoop.然後執行EventLoop裡的register()方法。
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel); //1
}
繼續看下去。
- 2 是SingleThreadEventLoop
裡的方法。`DefaultChannelPromise()
方法是給channel 與該執行緒 新增一個監聽器。
@Override
public ChannelFuture register(Channel channel) {
//2
return register(new DefaultChannelPromise(channel, this));
}
繼續往下看
- 3 繼續是SingleThreadEventLoop
裡的方法。promise是監聽器物件,promise.channel()
獲取到channel。unsafe()
方法是實現底層的register,read或者write操作
@Override
public ChannelFuture register(final ChannelPromise promise) {
//3
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
繼續往下分析,該register()方法是AbstractChannel
類的。也就是初始化NioServerSocketChannel
的時候,建立pipeline和unsafe的類。
- 4 進行引數驗證
- 5 將該eventLoop執行緒賦值於channel引數。
- 6 eventLoop.inEventLoop() 判斷,如果現在的執行緒是EventLoop()的 執行緒,那麼執行任務,如果不是那麼就用執行器執行任務。在這裡debug,會返回false,會呼叫execute()方法。
下面繼續探討
- 7 下面探討下register0()
方法
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//4
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//5
AbstractChannel.this.eventLoop = eventLoop;
//6
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
//7
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
首先在研究register0()之前有一個小知識。
在版本4.1.x 的時候,在初始化
eventLoop
的時候,還沒有建立執行緒,而是儲存了Executor
這個變數。這個變數在4.0.x版本的時候是沒有的。那麼4.1.x版本在什麼時候建立執行緒呢?在eventLoop呼叫execute()方法的時候建立執行緒。下文可以看到
- 1 繼續判斷是否現在的執行緒是EventLoop物件的執行緒,肯定返回false,因為EventLoop執行緒裡的thread變數是
null
嘛。
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
[圖片上傳失敗…(image-1a4a0b-1523959190717)]
- 2 在
startThread()
方法中,才建立一個執行緒。 - 3 在任務佇列裡新增一個任務。
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//1
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
//2
startThread();
//3
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
register0(ChannelPromise promise)
- 1 判斷EventLoop執行緒是否還存活。
- 2 這個是記錄是否註冊過的。
neverRegistered
預設是true; - 3 這個是
核心程式碼
,到這裡才進行將channel與selector註冊在一起。
private void register0(ChannelPromise promise) {
try {
//1
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
//2
boolean firstRegistration = neverRegistered;
doRegister();//3
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}