1. 程式人生 > >Netty服務端的啟動原始碼分析

Netty服務端的啟動原始碼分析

ServerBootstrap的構造:

 1 public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
 2     private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
 3     private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap();
 4     private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap();
 5     private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
 6     private volatile EventLoopGroup childGroup;
 7     private volatile ChannelHandler childHandler;
 8 
 9     public ServerBootstrap() {
10     }
11     ......
12 }

隱式地執行了父類的無參構造:

 1 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
 2     volatile EventLoopGroup group;
 3     private volatile ChannelFactory<? extends C> channelFactory;
 4     private volatile SocketAddress localAddress;
 5     private final Map<ChannelOption<?>, Object> options = new LinkedHashMap();
 6     private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap();
 7     private volatile ChannelHandler handler;
 8 
 9     AbstractBootstrap() {
10     }
11     ......
12 }

只是初始化了幾個容器成員

在ServerBootstrap建立後,需要呼叫group方法,繫結EventLoopGroup,有關EventLoopGroup的建立在我之前部落格中寫過:Netty中NioEventLoopGroup的建立原始碼分析


ServerBootstrap的group方法:

 1 public ServerBootstrap group(EventLoopGroup group) {
 2     return this.group(group, group);
 3 }
 4 
 5 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
 6     super.group(parentGroup);
 7     if (childGroup == null) {
 8         throw new NullPointerException("childGroup");
 9     } else if (this.childGroup != null) {
10         throw new IllegalStateException("childGroup set already");
11     } else {
12         this.childGroup = childGroup;
13         return this;
14     }
15 }

首先呼叫父類的group方法繫結parentGroup:

 1 public B group(EventLoopGroup group) {
 2     if (group == null) {
 3         throw new NullPointerException("group");
 4     } else if (this.group != null) {
 5         throw new IllegalStateException("group set already");
 6     } else {
 7         this.group = group;
 8         return this.self();
 9     }
10 }
11 
12 private B self() {
13     return this;
14 }

將傳入的parentGroup繫結給AbstractBootstrap的group成員,將childGroup繫結給ServerBootstrap的childGroup成員。
group的繫結僅僅是交給了成員儲存。

再來看看ServerBootstrap的channel方法,,是在AbstractBootstrap中實現的:

1 public B channel(Class<? extends C> channelClass) {
2     if (channelClass == null) {
3         throw new NullPointerException("channelClass");
4     } else {
5         return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
6     }
7 }

使用channelClass構建了一個ReflectiveChannelFactory物件:

 1 public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
 2     private final Class<? extends T> clazz;
 3 
 4     public ReflectiveChannelFactory(Class<? extends T> clazz) {
 5         if (clazz == null) {
 6             throw new NullPointerException("clazz");
 7         } else {
 8             this.clazz = clazz;
 9         }
10     }
11 
12     public T newChannel() {
13         try {
14             return (Channel)this.clazz.getConstructor().newInstance();
15         } catch (Throwable var2) {
16             throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
17         }
18     }
19 
20     public String toString() {
21         return StringUtil.simpleClassName(this.clazz) + ".class";
22     }
23 }

可以看到ReflectiveChannelFactory的作用就是通過反射機制,產生clazz的例項(這裡以NioServerSocketChannel為例)。

在建立完ReflectiveChannelFactory物件後, 呼叫channelFactory方法:

 1 public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
 2     return this.channelFactory((ChannelFactory)channelFactory);
 3 }
 4 
 5 public B channelFactory(ChannelFactory<? extends C> channelFactory) {
 6     if (channelFactory == null) {
 7         throw new NullPointerException("channelFactory");
 8     } else if (this.channelFactory != null) {
 9         throw new IllegalStateException("channelFactory set already");
10     } else {
11         this.channelFactory = channelFactory;
12         return this.self();
13     }
14 }

將剛才建立的ReflectiveChannelFactory物件交給channelFactory成員,用於後續服務端NioServerSocketChannel的建立。

再來看ServerBootstrap的childHandler方法:

1 public ServerBootstrap childHandler(ChannelHandler childHandler) {
2     if (childHandler == null) {
3         throw new NullPointerException("childHandler");
4     } else {
5         this.childHandler = childHandler;
6         return this;
7     }
8 }

還是交給了childHandler成員儲存,可以看到上述這一系列的操作,都是為了填充ServerBootstrap,而ServerBootstrap真正的啟動是在bind時:
ServerBootstrap的bind方法,在AbstractBootstrap中實現:

 1 public ChannelFuture bind(int inetPort) {
 2     return this.bind(new InetSocketAddress(inetPort));
 3 }
 4 
 5 public ChannelFuture bind(String inetHost, int inetPort) {
 6 return this.bind(SocketUtils.socketAddress(inetHost, inetPort));
 7 }
 8 
 9 public ChannelFuture bind(InetAddress inetHost, int inetPort) {
10     return this.bind(new InetSocketAddress(inetHost, inetPort));
11 }
12 
13 public ChannelFuture bind(SocketAddress localAddress) {
14     this.validate();
15     if (localAddress == null) {
16         throw new NullPointerException("localAddress");
17     } else {
18         return this.doBind(localAddress);
19     }
20 }

可以看到首先呼叫了ServerBootstrap的validate方法,:

 1 public ServerBootstrap validate() {
 2     super.validate();
 3     if (this.childHandler == null) {
 4         throw new IllegalStateException("childHandler not set");
 5     } else {
 6         if (this.childGroup == null) {
 7             logger.warn("childGroup is not set. Using parentGroup instead.");
 8             this.childGroup = this.config.group();
 9         }
10     
11         return this;
12     }
13 }

先呼叫了AbstractBootstrap的validate方法:

1 public B validate() {
2     if (this.group == null) {
3         throw new IllegalStateException("group not set");
4     } else if (this.channelFactory == null) {
5         throw new IllegalStateException("channel or channelFactory not set");
6     } else {
7         return this.self();
8     }
9 }


這個方法就是用來檢查是否綁定了group和channel以及childHandler,所以在執行bind方法前,無論如何都要執行group、channel和childHandler方法。

實際的bind交給了doBind來完成:

 1 private ChannelFuture doBind(final SocketAddress localAddress) {
 2     final ChannelFuture regFuture = this.initAndRegister();
 3     final Channel channel = regFuture.channel();
 4     if (regFuture.cause() != null) {
 5         return regFuture;
 6     } else if (regFuture.isDone()) {
 7         ChannelPromise promise = channel.newPromise();
 8         doBind0(regFuture, channel, localAddress, promise);
 9         return promise;
10     } else {
11         final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
12         regFuture.addListener(new ChannelFutureListener() {
13             public void operationComplete(ChannelFuture future) throws Exception {
14                 Throwable cause = future.cause();
15                 if (cause != null) {
16                     promise.setFailure(cause);
17                 } else {
18                     promise.registered();
19                     AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
20                 }
21             }
22         });
23         return promise;
24     }
25 }

首先呼叫initAndRegister,完成ServerSocketChannel的建立以及註冊:

 1 final ChannelFuture initAndRegister() {
 2     Channel channel = null;
 3 
 4     try {
 5         channel = this.channelFactory.newChannel();
 6         this.init(channel);
 7     } catch (Throwable var3) {
 8         if (channel != null) {
 9             channel.unsafe().closeForcibly();
10             return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
11         }
12 
13         return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
14     }
15 
16     ChannelFuture regFuture = this.config().group().register(channel);
17     if (regFuture.cause() != null) {
18         if (channel.isRegistered()) {
19             channel.close();
20         } else {
21             channel.unsafe().closeForcibly();
22         }
23     }
24 
25     return regFuture;
26 }

首先呼叫channelFactory的newChannel通過反射機制構建Channel例項,也就是NioServerSocketChannel,


NioServerSocketChannel的無參構造:

1 public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {
2     private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
3     
4     public NioServerSocketChannel() {
5         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
6     }
7     ......
8 }

SelectorProvider 是JDK的,關於SelectorProvider在我之前的部落格中有介紹:【Java】NIO中Selector的建立原始碼分析

在Windows系統下預設產生WindowsSelectorProvider,即DEFAULT_SELECTOR_PROVIDER,再來看看newSocket方法:

1 private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
2     try {
3         return provider.openServerSocketChannel();
4     } catch (IOException var2) {
5         throw new ChannelException("Failed to open a server socket.", var2);
6     }
7 }

使用WindowsSelectorProvider建立了一個ServerSocketChannelImpl,其實看到這裡就明白了,NioServerSocketChannel是為了封裝JDK的ServerSocketChannel

接著呼叫另一個過載的構造:

1 public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
2     super((Channel)null, channel, 16);
3     this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
4 }

首先呼叫父類的三參構造,其中16對應的是JDK中SelectionKey的ACCEPT狀態:

1 public static final int OP_ACCEPT = 1 << 4;

其父類的構造處於一條繼承鏈上:

AbstractNioMessageChannel:

1 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
2     super(parent, ch, readInterestOp);
3 }

AbstractNioChannel:

 1 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
 2     super(parent);
 3     this.ch = ch;
 4     this.readInterestOp = readInterestOp;
 5 
 6     try {
 7         ch.configureBlocking(false);
 8     } catch (IOException var7) {
 9         try {
10             ch.close();
11         } catch (IOException var6) {
12             if (logger.isWarnEnabled()) {
13                 logger.warn("Failed to close a partially initialized socket.", var6);
14             }
15         }
16 
17         throw new ChannelException("Failed to enter non-blocking mode.", var7);
18     }
19 }

AbstractChannel:

 1 private final ChannelId id;
 2 private final Channel parent;
 3 private final Unsafe unsafe;
 4 private final DefaultChannelPipeline pipeline;
 5 
 6 protected AbstractChannel(Channel parent) {
 7     this.parent = parent;
 8     this.id = this.newId();
 9     this.unsafe = this.newUnsafe();
10     this.pipeline = this.newChannelPipeline();
11 }

在AbstractChannel中使用newUnsafe和newChannelPipeline分別建立了一個Unsafe和一個DefaultChannelPipeline物件,
在前面的部落格介紹NioEventLoopGroup時候,在NioEventLoop的run方法中,每次輪詢完呼叫processSelectedKeys方法時,都是通過這個unsafe根據SelectedKey來完成資料的讀或寫,unsafe是處理基礎的資料讀寫
(unsafe在NioServerSocketChannel建立時,產生NioMessageUnsafe例項,在NioSocketChannel建立時產生NioSocketChannelUnsafe例項)

而pipeline的實現是一條雙向責任鏈,負責處理unsafe提供的資料,進而進行使用者的業務邏輯 (Netty中的ChannelPipeline原始碼分析)

在AbstractNioChannel中呼叫configureBlocking方法給JDK的ServerSocketChannel設定為非阻塞模式,且讓readInterestOp成員賦值為16用於未來註冊ACCEPT事件。

在呼叫完繼承鏈後回到NioServerSocketChannel構造,呼叫了javaChannel方法:

1 protected java.nio.channels.ServerSocketChannel javaChannel() {
2     return (java.nio.channels.ServerSocketChannel)super.javaChannel();
3 }

其實這個javaChannel就是剛才出傳入到AbstractNioChannel中的ch成員:

1 protected SelectableChannel javaChannel() {
2     return this.ch;
3 }

也就是剛才建立的JDK的ServerSocketChannelImpl,用其socket方法,得到一個ServerSocket物件,然後產生了一個NioServerSocketChannelConfig物件,用於封裝相關資訊。

 

NioServerSocketChannel構建完畢,回到initAndRegister方法,使用剛建立的NioServerSocketChannel呼叫init方法,這個方法是在ServerBootstrap中實現的:

 1 void init(Channel channel) throws Exception {
 2     Map<ChannelOption<?>, Object> options = this.options0();
 3     synchronized(options) {
 4         setChannelOptions(channel, options, logger);
 5     }
 6 
 7     Map<AttributeKey<?>, Object> attrs = this.attrs0();
 8     synchronized(attrs) {
 9         Iterator var5 = attrs.entrySet().iterator();
10 
11         while(true) {
12             if (!var5.hasNext()) {
13                 break;
14             }
15 
16             Entry<AttributeKey<?>, Object> e = (Entry)var5.next();
17             AttributeKey<Object> key = (AttributeKey)e.getKey();
18             channel.attr(key).set(e.getValue());
19         }
20     }
21 
22     ChannelPipeline p = channel.pipeline();
23     final EventLoopGroup currentChildGroup = this.childGroup;
24     final ChannelHandler currentChildHandler = this.childHandler;
25     Map var9 = this.childOptions;
26     final Entry[] currentChildOptions;
27     synchronized(this.childOptions) {
28         currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(0));
29     }
30 
31     var9 = this.childAttrs;
32     final Entry[] currentChildAttrs;
33     synchronized(this.childAttrs) {
34         currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(0));
35     }
36 
37     p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
38         public void initChannel(final Channel ch) throws Exception {
39             final ChannelPipeline pipeline = ch.pipeline();
40             ChannelHandler handler = ServerBootstrap.this.config.handler();
41             if (handler != null) {
42                 pipeline.addLast(new ChannelHandler[]{handler});
43             }
44 
45             ch.eventLoop().execute(new Runnable() {
46                 public void run() {
47                     pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
48                 }
49             });
50         }
51     }});
52 }

首先對attrs和options這兩個成員進行了填充屬性配置,這不是重點,然後獲取剛才建立的NioServerSocketChannel的責任鏈pipeline,通過addLast將ChannelInitializer加入責任鏈,在ChannelInitializer中重寫了initChannel方法,首先根據handler是否是null(這個handler是ServerBootstrap呼叫handler方法新增的,和childHandler方法不一樣),若是handler不是null,將handler加入責任鏈,無論如何,都會非同步將一個ServerBootstrapAcceptor物件加入責任鏈(後面會說為什麼是非同步)

 

這個ChannelInitializer的initChannel方法的執行需要等到後面註冊時才會被呼叫,在後面pipeline處理channelRegistered請求時,此initChannel方法才會被執行 (Netty中的ChannelPipeline原始碼分析)

ChannelInitializer的channelRegistered方法:

1 public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
2     if (initChannel(ctx)) {
3         ctx.pipeline().fireChannelRegistered();
4     } else {
5         ctx.fireChannelRegistered();
6     }
7 }

首先呼叫initChannel方法(和上面的initChannel不是一個):

 1 private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
 2     if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { 
 3         try {
 4             initChannel((C) ctx.channel());
 5         } catch (Throwable cause) {
 6             exceptionCaught(ctx, cause);
 7         } finally {
 8             remove(ctx);
 9         }
10         return true;
11     }
12     return false;
13 }

可以看到,這個ChannelInitializer只會在pipeline中初始化一次,僅用於Channel的註冊,在完成註冊後,會呼叫remove方法將其從pipeline中移除:
remove方法:

 1 private void remove(ChannelHandlerContext ctx) {
 2     try {
 3         ChannelPipeline pipeline = ctx.pipeline();
 4         if (pipeline.context(this) != null) {
 5             pipeline.remove(this);
 6         }
 7     } finally {
 8         initMap.remove(ctx);
 9     }
10 }

在移除前,就會回撥用剛才覆蓋的initChannel方法,異步向pipeline添加了ServerBootstrapAcceptor,用於後續的NioServerSocketChannel偵聽到客戶端連線後,完成在服務端的NioSocketChannel的註冊。

回到initAndRegister,在對NioServerSocketChannel初始化完畢,接下來就是註冊邏輯:

1 ChannelFuture regFuture = this.config().group().register(channel);

首先呼叫config().group(),這個就得到了一開始在ServerBootstrap的group方法傳入的parentGroup,呼叫parentGroup的register方法,parentGroup是NioEventLoopGroup,這個方法是在子類MultithreadEventLoopGroup中實現的:

1 public ChannelFuture register(Channel channel) {
2     return this.next().register(channel);
3 }

首先呼叫next方法:

1 public EventLoop next() {
2     return (EventLoop)super.next();
3 }

實際上呼叫父類MultithreadEventExecutorGroup的next方法:

1 public EventExecutor next() {
2     return this.chooser.next();
3 }

關於chooser在我之前部落格:Netty中NioEventLoopGroup的建立原始碼分析 介紹過,在NioEventLoopGroup建立時,預設會根據cpu個數建立二倍個NioEventLoop,而chooser就負責通過取模,每次選擇一個NioEventLoop使用

所以在MultithreadEventLoopGroup的register方法實際呼叫了NioEventLoop的register方法:

NioEventLoop的register方法在子類SingleThreadEventLoop中實現:

1 public ChannelFuture register(Channel channel) {
2     return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
3 }
4 
5 public ChannelFuture register(ChannelPromise promise) {
6    ObjectUtil.checkNotNull(promise, "promise");
7     promise.channel().unsafe().register(this, promise);
8     return promise;
9 }

先把channel包裝成ChannelPromise,預設是DefaultChannelPromise (Netty中的ChannelFuture和ChannelPromise),用於處理非同步操作

呼叫過載方法,而在過載方法裡,可以看到,實際上的register操作交給了channel的unsafe來實現:

unsafe的register方法在AbstractUnsafe中實現:

 1 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 2     if (eventLoop == null) {
 3         throw new NullPointerException("eventLoop");
 4     } else if (AbstractChannel.this.isRegistered()) {
 5         promise.setFailure(new IllegalStateException("registered to an event loop already"));
 6     } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
 7         promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
 8     } else {
 9         AbstractChannel.this.eventLoop = eventLoop;
10         if (eventLoop.inEventLoop()) {
11             this.register0(promise);
12         } else {
13             try {
14                 eventLoop.execute(new Runnable() {
15                     public void run() {
16                         AbstractUnsafe.this.register0(promise);
17                     }
18                 });
19             } catch (Throwable var4) {
20                 AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
21                 this.closeForcibly();
22                 AbstractChannel.this.closeFuture.setClosed();
23                 this.safeSetFailure(promise, var4);
24             }
25         }
26 
27     }
28 }

前面的判斷做了一些檢查就不細說了,直接看到else塊
首先給當前Channel綁定了eventLoop,即通過剛才chooser選擇的eventLoop,該Channel也就是NioServerSocketChannel
由於Unsafe的操作是在輪詢執行緒中非同步執行的,所裡,這裡需要判斷inEventLoop是否處於輪詢中
在之前介紹NioEventLoopGroup的時候說過,NioEventLoop在沒有呼叫doStartThread方法時並沒有啟動輪詢的,所以inEventLoop判斷不成立

那麼就呼叫eventLoop的execute方法,實際上的註冊方法可以看到呼叫了AbstractUnsafe的register0方法,而將這個方法封裝為Runnable交給eventLoop作為一個task去非同步執行
先來看eventLoop的execute方法實現,是在NioEventLoop的子類SingleThreadEventExecutor中實現的:

 1 public void execute(Runnable task) {
 2     if (task == null) {
 3         throw new NullPointerException("task");
 4     } else {
 5         boolean inEventLoop = this.inEventLoop();
 6         this.addTask(task);
 7         if (!inEventLoop) {
 8             this.startThread();
 9             if (this.isShutdown() && this.removeTask(task)) {
10                 reject();
11             }
12         }
13 
14         if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
15             this.wakeup(inEventLoop);
16         }
17 
18     }
19 }

這裡首先將task,即剛才的註冊事件放入阻塞任務佇列中,然後呼叫startThread方法:

 1 private void startThread() {
 2     if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
 3         try {
 4             this.doStartThread();
 5         } catch (Throwable var2) {
 6             STATE_UPDATER.set(this, 1);
 7             PlatformDependent.throwException(var2);
 8         }
 9     }
10 
11 }

NioEventLoop此時還沒有輪詢,所以狀態是1,對應ST_NOT_STARTED,此時利用CAS操作,將狀態修改為2,即ST_STARTED ,標誌著NioEventLoop要啟動輪詢了,果然,接下來就呼叫了doStartThread開啟輪詢執行緒:

  1 private void doStartThread() {
  2     assert this.thread == null;
  3 
  4     this.executor.execute(new Runnable() {
  5         public void run() {
  6             SingleThreadEventExecutor.this.thread = Thread.currentThread();
  7             if (SingleThreadEventExecutor.this.interrupted) {
  8                 SingleThreadEventExecutor.this.thread.interrupt();
  9             }
 10 
 11             boolean success = false;
 12             SingleThreadEventExecutor.this.updateLastExecutionTime();
 13             boolean var112 = false;
 14 
 15             int oldState;
 16             label1907: {
 17                 try {
 18                     var112 = true;
 19                     SingleThreadEventExecutor.this.run();
 20                     success = true;
 21                     var112 = false;
 22                     break label1907;
 23                 } catch (Throwable var119) {
 24                     SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119);
 25                     var112 = false;
 26                 } finally {
 27                     if (var112) {
 28                         int oldStatex;
 29                         do {
 30                             oldStatex = SingleThreadEventExecutor.this.state;
 31                         } while(oldStatex < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldStatex, 3));
 32 
 33                         if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) {
 34                             SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
 35                         }
 36 
 37                         try {
 38                             while(!SingleThreadEventExecutor.this.confirmShutdown()) {
 39                                 ;
 40                             }
 41                         } finally {
 42                             try {
 43                                 SingleThreadEventExecutor.this.cleanup();
 44                             } finally {
 45                                 SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
 46                                 SingleThreadEventExecutor.this.threadLock.release();
 47                                 if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) {
 48                                     SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
 49                                 }
 50 
 51                                 SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
 52                             }
 53                         }
 54 
 55                     }
 56                 }
 57 
 58                 do {
 59                     oldState = SingleThreadEventExecutor.this.state;
 60                 } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3));
 61 
 62                 if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) {
 63                     SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
 64                 }
 65 
 66                 try {
 67                     while(!SingleThreadEventExecutor.this.confirmShutdown()) {
 68                         ;
 69                     }
 70 
 71                     return;
 72                 } finally {
 73                     try {
 74                         SingleThreadEventExecutor.this.cleanup();
 75                     } finally {
 76                         SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
 77                         SingleThreadEventExecutor.this.threadLock.release();
 78                         if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) {
 79                             SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
 80                         }
 81 
 82                         SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
 83                     }
 84                 }
 85             }
 86 
 87             do {
 88                 oldState = SingleThreadEventExecutor.this.state;
 89             } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3));
 90 
 91             if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) {
 92                 SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
 93             }
 94 
 95             try {
 96                 while(!SingleThreadEventExecutor.this.confirmShutdown()) {
 97                     ;
 98                 }
 99             } finally {
100                 try {
101                     SingleThreadEventExecutor.this.cleanup();
102                 } finally {
103                     SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
104                     SingleThreadEventExecutor.this.threadLock.release();
105                     if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) {
106                         SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
107                     }
108 
109                     SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
110                 }
111             }
112 
113         }
114     });
115 }

關於doStartThread方法,我在 Netty中NioEventLoopGroup的建立原始碼分析 中已經說的很細了,這裡就不再一步一步分析了

因為此時還沒真正意義上的啟動輪詢,所以thread等於null成立的,然後呼叫executor的execute方法,這裡的executor是一個執行緒池,在之前說過的,所以裡面的run方法是處於一個執行緒裡面的,然後給thread成員賦值為當前執行緒,表明正式進入了輪詢。
而SingleThreadEventExecutor.this.run()才是真正的輪詢邏輯,這在之前也說過,這個run的實現是在父類NioEventLoop中:

 1 protected void run() {
 2     while(true) {
 3         while(true) {
 4             try {
 5                 switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) {
 6                 case -2:
 7                     continue;
 8                 case -1:
 9                     this.select(this.wakenUp.getAndSet(false));
10                     if (this.wakenUp.get()) {
11                         this.selector.wakeup();
12                     }
13                 default:
14                     this.cancelledKeys = 0;
15                     this.needsToSelectAgain = false;
16                     int ioRatio = this.ioRatio;
17                     if (ioRatio == 100) {
18                         try {
19                             this.processSelectedKeys();
20                         } finally {
21                             this.runAllTasks();
22                         }
23                     } else {
24                         long ioStartTime = System.nanoTime();
25                         boolean var13 = false;
26 
27                         try {
28                             var13 = true;
29                             this.processSelectedKeys();
30                             var13 = false;
31                         } finally {
32                             if (var13) {
33                                 long ioTime = System.nanoTime() - ioStartTime;
34                                 this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
35                             }
36                         }
37 
38                         long ioTime = System.nanoTime() - ioStartTime;
39                         this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
40                     }
41                 }
42             } catch (Throwable var21) {
43                 handleLoopException(var21);
44             }
45 
46             try {
47                 if (this.isShuttingDown()) {
48                     this.closeAll();
49                     if (this.confirmShutdown()) {
50                         return;
51                     }
52                 }
53             } catch (Throwable var18) {
54                 handleLoopException(var18);
55             }
56         }
57     }
58 }

首先由於task已經有一個了,就是剛才的註冊事件,所以選擇策略calculateStrategy最終呼叫selectNow(也是之前說過的):

 1 private final IntSupplier selectNowSupplier = new IntSupplier() {
 2     public int get() throws Exception {
 3         return NioEventLoop.this.selectNow();
 4     }
 5 };
 6 
 7 int selectNow() throws IOException {
 8     int var1;
 9     try {
10         var1 = this.selector.selectNow();
11     } finally {
12         if (this.wakenUp.get()) {
13             this.selector.wakeup();
14         }
15 
16     }
17 
18     return var1;
19 }

使用JDK原生Selector進行selectNow,由於此時沒有任何Channel的註冊,所以selectNow會立刻返回0,此時就進入default邏輯,由於沒有任何註冊,processSelectedKeys方法也做不了什麼,所以在這一次的輪詢實質上只進行了runAllTasks方法,此方法會執行阻塞佇列中的task的run方法(還是在之前部落格中介紹過),由於輪詢是線上程池中的一個執行緒中執行的,所以task的執行是一個非同步操作。(在執行完task,將task移除阻塞對立,執行緒繼續輪詢)

這時就可以回到AbstractChannel的register方法中了,由上面可以知道task實際上非同步執行了:

1 AbstractUnsafe.this.register0(promise);

register0方法:

 1 private void register0(ChannelPromise promise) {
 2     try {
 3         if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
 4             return;
 5         }
 6 
 7         boolean firstRegistration = this.neverRegistered;
 8         AbstractChannel.this.doRegister();
 9         this.neverRegistered = false;
10         AbstractChannel.this.registered = true;
11         AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
12         this.safeSetSuccess(promise);
13         AbstractChannel.this.pipeline.fireChannelRegistered();
14         if (AbstractChannel.this.isActive()) {
15             if (firstRegistration) {
16                 AbstractChannel.this.pipeline.fireChannelActive();
17             } else if (AbstractChannel.this.config().isAutoRead()) {
18                 this.beginRead();
19             }
20         }
21     } catch (Throwable var3) {
22         this.closeForcibly();
23         AbstractChannel.this.closeFuture.setClosed();
24         this.safeSetFailure(promise, var3);
25     }
26 
27 }

可以看到實際上的註冊邏輯又交給了AbstractChannel的doRegister,而這個方法在AbstractNioChannel中實現:

 1 protected void doRegister() throws Exception {
 2     boolean selected = false;
 3 
 4     while(true) {
 5         try {
 6             this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
 7             return;
 8         } catch (CancelledKeyException var3) {
 9             if (selected) {
10                 throw var3;
11             }
12 
13             this.eventLoop().selectNow();
14             selected = true;
15         }
16     }
17 }

javaChannel就是之前產生的JDK的ServerSocketChannel,unwrappedSelector在之前說過,就是未經修改的JDK原生Selector,這個Selector和eventLoop是一對一繫結的,可以看到呼叫JDK原生的註冊方法,完成了對ServerSocketChannel的註冊,但是註冊的是一個0狀態(預設值),而傳入的this,即AbstractNioChannel物件作為了一個附件,用於以後processSelectedKeys方法從SelectionKey中得到對應的Netty的Channel(還是之前部落格說過)
關於預設值,是由於AbstractNioChannel不僅用於NioServerSocketChannel的註冊,還用於NioSocketChannel的註冊,只有都使用預設值註冊才不會產生異常  【Java】NIO中Channel的註冊原始碼分析 ,並且,在以後processSelectedKeys方法會對0狀態判斷,再使用unsafe進行相應的邏輯處理。

 

在完成JDK的註冊後,呼叫pipeline的invokeHandlerAddedIfNeeded方法(Netty中的ChannelPipeline原始碼分析),處理ChannelHandler的handlerAdded的回撥,即呼叫使用者新增的ChannelHandler的handlerAdded方法。
呼叫safeSetSuccess,標誌非同步操作完成:

1 protected final void safeSetSuccess(ChannelPromise promise) {
2     if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
3         logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
4     }
5 }

關於非同步操作我在之前的部落格中說的很清楚了:Netty中的ChannelFuture和ChannelPromise


接著呼叫pipeline的fireChannelRegistered方法,也就是在責任鏈上呼叫channelRegistered方法,這時,就會呼叫之在ServerBootstrap中向pipeline新增的ChannelInitializer的channelRegistered,進而回調initChannel方法,完成對ServerBootstrapAcceptor的新增。

回到register0方法,在處理完pipeline的責任鏈後,根據當前AbstractChannel即NioServerSocketChannel的isActive:

1 public boolean isActive() {
2     return this.javaChannel().socket().isBound();
3 }

獲得NioServerSocketChannel繫結的JDK的ServerSocketChannel,進而獲取ServerSocket,判斷isBound:

1 public boolean isBound() {
2    // Before 1.3 ServerSockets were always bound during creation
3     return bound || oldImpl;
4 }

這裡實際上就是判斷ServerSocket是否呼叫了bind方法,前面說過register0方法是一個非同步操作,在多執行緒環境下不能保證執行順序,若是此時已經完成了ServerSocket的bind,根據firstRegistration判斷是否需要pipeline傳遞channelActive請求,首先會執行pipeline的head即HeadContext的channelActive方法:

1 @Override
2 public void channelActive(ChannelHandlerContext ctx) throws Exception {
3     ctx.fireChannelActive();
4 
5     readIfIsAutoRead();
6 }

在HeadContext通過ChannelHandlerContext 傳遞完channelActive請求後,會呼叫readIfIsAutoRead方法:

1 private void readIfIsAutoRead() {
2     if (channel.config().isAutoRead()) {
3         channel.read();
4     }
5 }

此時呼叫AbstractChannel的read方法:

1 public Channel read() {
2     pipeline.read();
3     return this;
4 }

最終在請求鏈由HeadContext執行read方法:

1 public void read(ChannelHandlerContext ctx) {
2     unsafe.beginRead();
3 }

終於可以看到此時呼叫unsafe的beginRead方法:

 1 public final void beginRead() {
 2     assertEventLoop();
 3 
 4     if (!isActive()) {
 5         return;
 6     }
 7 
 8     try {
 9         doBeginRead();
10     } catch (final Exception e) {
11         invokeLater(new Runnable() {
12             @Override
13             public void run() {
14                 pipeline.fireExceptionCaught(e);
15             }
16         });
17         close(voidPromise());
18     }
19 }

最終執行了doBeginRead方法,由AbstractNioChannel實現:

 1 protected void doBeginRead() throws Exception {
 2     final SelectionKey selectionKey = this.selectionKey;
 3     if (!selectionKey.isValid()) {
 4         return;
 5     }
 6     
 7     readPending = true;
 8     
 9     final int interestOps = selectionKey.interestOps();
10     if ((interestOps & readInterestOp) == 0) {
11         selectionKey.interestOps(interestOps | readInterestOp);
12     }
13 }

這裡,就完成了向Selector註冊readInterestOp事件,從前面來看就是ACCEPT事件

 

回到AbstractBootstrap的doBind方法,在initAndRegister邏輯結束後,由上面可以知道,實際上的register註冊邏輯是一個非同步操作,在register0中完成
根據ChannelFuture來判斷非同步操作是否完成,如果isDone,則表明非同步操作先完成,即完成了safeSetSuccess方法,
然後呼叫newPromise方法:

1 public ChannelPromise newPromise() {
2     return pipeline.newPromise();
3 }

給channel的pipeline繫結非同步操作ChannelPromise
然後呼叫doBind0方法完成ServerSocket的繫結,若是register0這個非同步操作還沒完成,就需要給ChannelFuture產生一個非同步操作的偵聽ChannelFutureListener物件,等到register0方法呼叫safeSetSuccess時,在promise的trySuccess中會回撥ChannelFutureListener的operationComplete方法,進而呼叫doBind0方法

doBind0方法:

 1 private static void doBind0(
 2         final ChannelFuture regFuture, final Channel channel,
 3         final SocketAddress localAddress, final ChannelPromise promise) {
 4     channel.eventLoop().execute(new Runnable() {
 5         @Override
 6         public void run() {
 7             if (regFuture.isSuccess()) {
 8                 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
 9             } else {
10                 promise.setFailure(regFuture.cause());
11             }
12         }
13     });
14 }

向輪詢執行緒提交了一個任務,非同步處理bind,可以看到,只有在regFuture非同步操作成功結束後,呼叫channel的bind方法:

1 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
2    return pipeline.bind(localAddress, promise);
3 }

實際上的bind又交給pipeline,去完成,pipeline中就會交給責任鏈去完成,最終會交給HeadContext完成:

1 public void bind(
2                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
3                 throws Exception {
4     unsafe.bind(localAddress, promise);
5 }

可以看到,繞了一大圈,交給了unsafe完成:

 1 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
 2     assertEventLoop();
 3 
 4     if (!promise.setUncancellable() || !ensureOpen(promise)) {
 5         return;
 6     }
 7     
 8     if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
 9         localAddress instanceof InetSocketAddress &&
10         !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
11         !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
12         logger.warn(
13                 "A non-root user can't receive a broadcast packet if the socket " +
14                 "is not bound to a wildcard address; binding to a non-wildcard " +
15                 "address (" + localAddress + ") anyway as requested.");
16     }
17 
18     boolean wasActive = isActive();
19     try {
20         doBind(localAddress);
21     } catch (Throwable t) {
22         safeSetFailure(promise, t);
23         closeIfClosed();
24         return;
25     }
26 
27     if (!wasActive && isActive()) {
28         invokeLater(new Runnable() {
29             @Override
30             public void run() {
31                 pipeline.fireChannelActive();
32             }
33         });
34     }
35 
36     safeSetSuccess(promise);
37 }

然而,真正的bind還是回調了doBind方法,最終是由NioServerSocketChannel來實現:

1 @Override
2 protected void doBind(SocketAddress localAddress) throws Exception {
3     if (PlatformDependent.javaVersion() >= 7) {
4         javaChannel().bind(localAddress, config.getBacklog());
5     } else {
6         javaChannel().socket().bind(localAddress, config.getBacklog());
7     }
8 }

在這裡終於完成了對JDK的ServerSocketChannel的bind操作


在上面的

1 if (!wasActive && isActive()) {
2     invokeLater(new Runnable() {
3         @Override
4         public void run() {
5             pipeline.fireChannelActive();
6         }
7     });
8 }

這個判斷,就是確保在register0中isActive時,還沒完成繫結,也就沒有beginRead操作來向Selector註冊ACCEPT事件,那麼就在這裡進行註冊,進而讓ServerSocket去偵聽客戶端的連線


在服務端ACCEPT到客戶端的連線後,在NioEventLoop輪詢中,就會呼叫processSelectedKeys處理ACCEPT的事件就緒,然後交給unsafe的read去處理  Netty中NioEventLoopGroup的建立原始碼分析

 

在服務端,由NioMessageUnsafe實現:

 1 public void read() {
 2         assert eventLoop().inEventLoop();
 3         final ChannelConfig config = config();
 4         final ChannelPipeline pipeline = pipeline();
 5         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
 6         allocHandle.reset(config);
 7 
 8         boolean closed = false;
 9         Throwable exception = null;
10         try {
11             try {
12                 do {
13                     int localRead = doReadMessages(readBuf);
14                     if (localRead == 0) {
15                         break;
16                     }
17                     if (localRead < 0) {
18                         closed = true;
19                         break;
20                     }
21 
22                     allocHandle.incMessagesRead(localRead);
23                 } while (allocHandle.continueReading());
24             } catch (Throwable t) {
25                 exception = t;
26             }
27 
28             int size = readBuf.size();
29             for (int i = 0; i < size; i ++) {
30                 readPending = false;
31                 pipeline.fireChannelRead(readBuf.get(i));
32             }
33             readBuf.clear();
34             allocHandle.readComplete();
35             pipeline.fireChannelReadComplete();
36 
37             if (exception != null) {
38                 closed = closeOnReadError(exception);
39 
40                 pipeline.fireExceptionCaught(exception);
41             }
42 
43             if (closed) {
44                 inputShutdown = true;
45                 if (isOpen()) {
46                     close(voidPromise());
47                 }
48             }
49         } finally {
50             if (!readPending && !config.isAutoRead()) {
51                 removeReadOp();
52             }
53         }
54     }
55 }

核心在doReadMessages方法,由NioServerSocketChannel實現:

 1 protected int doReadMessages(List<Object> buf) throws Exception {
 2     SocketChannel ch = SocketUtils.accept(javaChannel());
 3 
 4     try {
 5         if (ch != null) {
 6             buf.add(new NioSocketChannel(this, ch));
 7             return 1;
 8         }
 9     } catch (Throwable t) {
10         logger.warn("Failed to create a new channel from an accepted socket.", t);
11 
12         try {
13             ch.close();
14         } catch (Throwable t2) {
15             logger.warn("Failed to close a socket.", t2);
16         }
17     }
18 
19     return 0;
20 }

SocketUtils的accept方法其實就是用來呼叫JDK中ServerSocketChannel原生的accept方法,來得到一個JDK的SocketChannel物件,然後通過這個SocketChannel物件,將其包裝成NioSocketChannel物件新增在buf這個List中

由此可以看到doReadMessages用來偵聽所有就緒的連線,包裝成NioSocketChannel將其放在List中
然後遍歷這個List,呼叫 NioServerSocketChannel的pipeline的fireChannelRead方法,傳遞channelRead請求,、
在前面向pipeline中添加了ServerBootstrapAcceptor這個ChannelHandler,此時,它也會響應這個請求,回撥channelRead方法:

 1 public void channelRead(ChannelHandlerContext ctx, Object msg) {
 2     final Channel child = (Channel) msg;
 3 
 4     child.pipeline().addLast(childHandler);
 5 
 6     setChannelOptions(child, childOptions, logger);
 7 
 8     for (Entry<AttributeKey<?>, Object> e: childAttrs) {
 9         child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
10     }
11 
12     try {
13         childGroup.register(child).addListener(new ChannelFutureListener() {
14             @Override
15             public void operationComplete(ChannelFuture future) throws Exception {
16                 if (!future.isSuccess()) {
17                     forceClose(child, future.cause());
18                 }
19             }
20         });
21     } catch (Throwable t) {
22         forceClose(child, t);
23     }
24 }

msg就是偵聽到的NioSocketChannel物件,給該物件的pipeline新增childHandler,也就是我們在ServerBootstrap中通過childHandler方法新增的
然後通過register方法完成對NioSocketChannel的註冊(和NioServerSocketChannel註冊邏輯一樣)


至此Netty服務端的啟動結