1. 程式人生 > >Netty 服務端啟動過程

Netty 服務端啟動過程

tag debug 分析 accept src bootstra print don lse

  在 Netty 中創建 1 個 NioServerSocketChannel 在指定的端口監聽客戶端連接,這個過程主要有以下 個步驟:

  1. 創建 NioServerSocketChannel
  2. 初始化並註冊 NioServerSocketChannel
  3. 綁定指定端口

  首先列出一個簡易服務端的啟動代碼:

 1 public void start() {
 2     EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 3     EventLoopGroup workerGroup = new NioEventLoopGroup();
4 try { 5 ServerBootstrap sbs = new ServerBootstrap() 6 //添加 group 7 .group(bossGroup, workerGroup) 8 //指定服務端 Channel 類型 9 .channel(NioServerSocketChannel.class) 10 //添加服務端 Channel 的 Handler 11 .handler(new
HelloWorldServerHandler()) 12 //添加客戶端 Channel 的 Handler 13 .childHandler(new ChannelInitializer<NioSocketChannel>() { 14 @Override 15 protected void initChannel(NioSocketChannel ch) throws Exception { 16 //為後續接入的客戶端 Channel 準備的字符串
編解碼 Handler 17 ch.pipeline().addLast(new StringDecoder()); 18 ch.pipeline().addLast(new StringEncoder()); 19 } 20 }); 21 //監聽指定的端口 22 ChannelFuture future = sbs.bind(port).sync(); 23 System.out.println("Server start listen at " + port); 24 future.channel().closeFuture().sync(); 25 } catch (Exception e) { 26 bossGroup.shutdownGracefully(); 27 workerGroup.shutdownGracefully(); 28 } 29 }

  下面就從 ServerBootstrap 的 bind(int port)方法開始分析服務端的 NioServerSocketChannel 的創建過程。

1. 創建 NioServerSocketChannel

  跟隨 bind 方法的調用,最終在 AbstractBootstrap 類的 doBind()方法找到了初始化,註冊和綁定方法調用:

 1 private ChannelFuture doBind(final SocketAddress localAddress) {
 2     //初始化並註冊
 3     final ChannelFuture regFuture = initAndRegister();
 4     final Channel channel = regFuture.channel();
 5     if (regFuture.cause() != null) {
 6         return regFuture;
 7     }
 8 
 9     if (regFuture.isDone()) {
10         // At this point we know that the registration was complete and successful.
11         ChannelPromise promise = channel.newPromise();
12         //綁定本地端口
13         doBind0(regFuture, channel, localAddress, promise);
14         return promise;
15     } else {
16         //....
17     }
18 }

2.

  • 初始化並註冊 NioServerSocketChannel

  首先來看一下這個 initAndRegister()方法:

 1 final ChannelFuture initAndRegister() {
 2     Channel channel = null;
 3     try {
 4         //創建 Channel
 5         channel = channelFactory.newChannel();
 6         //初始化 Channel
 7         init(channel);
 8     } catch (Throwable t) {
 9         //...
10     }
11 
12     //註冊
13     ChannelFuture regFuture = config().group().register(channel);
14     if (regFuture.cause() != null) {
15         if (channel.isRegistered()) {
16             channel.close();
17         } else {
18             channel.unsafe().closeForcibly();
19         }
20     }
21     //...
22 }

  Channel 也是通過工廠類來創建的,這個工廠默認是 ReflectiveChannelFactory,是在前面啟動代碼中,設置服務端 Channel 類型時創建的。通過名字可以知道,是用反射的方式創建了 Channel 對象。

  init()方法有兩種實現,這裏分析的是 ServerBootstrap 的實現:

 1 @Override
 2 void init(Channel channel) throws Exception {
 3     //... option 的設置省略掉
 4     //pipeline 的創建,默認使用的 DefaultPipeline
 5     ChannelPipeline p = channel.pipeline();
 6 
 7     //... 客戶端 Channel 相關配置的保存
 8 
 9     p.addLast(new ChannelInitializer<Channel>() {
10         @Override
11         public void initChannel(Channel ch) throws Exception {
12             final ChannelPipeline pipeline = ch.pipeline();
13             //這裏添加的是啟動代碼中,服務端的 Handler
14             ChannelHandler handler = config.handler();
15             if (handler != null) {
16                 pipeline.addLast(handler);
17             }
18 
19             // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
20             // In this case the initChannel(...) method will only be called after this method returns. Because
21             // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
22             // placed in front of the ServerBootstrapAcceptor.
23             ch.eventLoop().execute(new Runnable() {
24                 @Override
25                 public void run() {
26                     //這裏添加了一個 Accepter,用來處理新連接的接入
27                     pipeline.addLast(new ServerBootstrapAcceptor(
28                             currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
29                 }
30             });
31         }
32     });
33 }

  初始化 Channel 這個動作,主要做了 4 件事:

  1. 創建 pipeline
  2. 為 Channel 添加用戶創建的 Handler
  3. 添加 Accepter
  4. 其他屬性的設置

  接下來分析 Channel 的註冊,需要關註的是這行代碼:

1 ChannelFuture regFuture = config().group().register(channel);

  config()方法獲取了啟動時創建的 config 對象,這個對象的 group()方法就返回了啟動時傳入的 bossGroup。啟動代碼中傳入了兩個 group,返回的為什麽是 boosGroup 呢?查看啟動代碼中的 group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法,在它第一行就調用了 super.group(parentGroup),將第一個 group 對象傳給了父類 AbstractBootstrap。而此處 config 調用的 group()方法返回的正是父類中的 group。

  因為這裏是一個 NioEventLoopGroup 對象,所以使用的 register(channel)方法是 MultithreadEventLoopGroup 中的。

1 @Override
2 public ChannelFuture register(Channel channel) {
3     return next().register(channel);
4 }

  查看 next()方法可以發現,最終是調用之前創建 group 時創建的 chooser 的 next()方法,該方法會返回一個 NioEventLooop 對象(EventLoop 是在這裏分配的),它的 register()方法是在父類 SingleThreadEventLoop 中實現的。最終調用了 AbstractChannel 中的註冊方法。

 1 @Override
 2 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 3     //...
 4     //將前面返回的 eventLoop 保存起來
 5     AbstractChannel.this.eventLoop = eventLoop;
 6     //判斷 eventLoop 中的 thread 是否是當前線程
 7     //初次啟動時,eventLoop 中的 thread 為 null
 8     if (eventLoop.inEventLoop()) {
 9         register0(promise);
10     } else {
11         try {
12             //將註冊任務傳進去
13             eventLoop.execute(new Runnable() {
14                 @Override
15                 public void run() {
16                     //註冊
17                     register0(promise);
18                 }
19             });
20         } catch (Throwable t) {
21             //...
22         }
23     }
24 }

  將註冊動作封裝成一個任務,然後交給 eventLoop 對象處理。

@Override
public void execute(Runnable task) {
    //...
    //這裏通用是 false
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        //啟動線程
        startThread();
        addTask(task);//將前面傳進來的註冊任務添加進隊列
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

private void startThread() {
    //判斷是否需要啟動線程
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            //啟動線程
            doStartThread();
        }
    }
}

  上面代碼中的 startThread()方法有個 STATE_UPDATER,它是用來更新該對象的 state 屬性,是一個線程安全的操作。state 默認值為 ST_NOT_STARTED,所以第一次進入該方法,條件判斷為 true,接下來進行 CAS 操作,將 state 設置為 ST_STARTED,然後調用 doStartThread()方法。當 group 中的線程都啟用之後,下一次 chooser 再選中這個線程,startThread()方法中的第一個 if 的條件判斷就是 false 了,不會再創建新的線程。

 1 private void doStartThread() {
 2     assert thread == null;
 3     //這個 executor 就是構建 group 時,創建出來的 executor
 4     executor.execute(new Runnable() {
 5         @Override
 6         public void run() {
 7             thread = Thread.currentThread();
 8             if (interrupted) {
 9                 thread.interrupt();
10             }
11 
12             boolean success = false;
13             updateLastExecutionTime();
14             try {
15                 //前面創建的是 NioEventLoop
16                 SingleThreadEventExecutor.this.run();
17                 success = true;
18             } catch (Throwable t) {
19                 logger.warn("Unexpected exception from an event executor: ", t);
20             } finally {
21                 for (;;) {
22                     //更新 state
23                     int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
24                     if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
25                             SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
26                         break;
27                     }
28                 }
29                 //...
30             }
31         }
32     });
33 }

  前一篇分析 EventLoopGroup 創建時說過,會在 EventLoop 保存一個 executor 對象的引用,最終個任務就是交給這個 executor 來處理的。executor 的 execute(Runnable task) 方法會創建新線程,並執行傳入的 task。接下來看一下 NioEventLoop 中的 run() 方法。

 1 protected void run() {
 2     for (;;) {
 3         try {
 4             //計算 select 策略,當前有任務時,會進行一次 selectNow(NIO),返回就緒的 key 個數
 5             //顯然 switch 中沒有匹配項,直接跳出 switch
 6             //無任務時,則直接返回 SelectStrategy.SELECT
 7             //這裏的 SelectStrategy.CONTINUE 感覺不會匹配到
 8             switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
 9                 case SelectStrategy.CONTINUE:
10                     continue;
11                 case SelectStrategy.SELECT:
12                     //當沒有可處理的任務時,直接進行 select 操作
13                     // wakenUp.getAndSet(false) 返回的是 oldValue,由於默認值是 false
14                     // 所以第一次返回的是 false
15                     select(wakenUp.getAndSet(false));
16 
17                     // ‘wakenUp.compareAndSet(false, true)‘ is always evaluated
18                     // before calling ‘selector.wakeup()‘ to reduce the wake-up
19                     // overhead. (Selector.wakeup() is an expensive operation.)
20                     //
21                     // However, there is a race condition in this approach.
22                     // The race condition is triggered when ‘wakenUp‘ is set to
23                     // true too early.
24                     //
25                     // ‘wakenUp‘ is set to true too early if:
26                     // 1) Selector is waken up between ‘wakenUp.set(false)‘ and
27                     //    ‘selector.select(...)‘. (BAD)
28                     // 2) Selector is waken up between ‘selector.select(...)‘ and
29                     //    ‘if (wakenUp.get()) { ... }‘. (OK)
30                     //
31                     // In the first case, ‘wakenUp‘ is set to true and the
32                     // following ‘selector.select(...)‘ will wake up immediately.
33                     // Until ‘wakenUp‘ is set to false again in the next round,
34                     // ‘wakenUp.compareAndSet(false, true)‘ will fail, and therefore
35                     // any attempt to wake up the Selector will fail, too, causing
36                     // the following ‘selector.select(...)‘ call to block
37                     // unnecessarily.
38                     //
39                     // To fix this problem, we wake up the selector again if wakenUp
40                     // is true immediately after selector.select(...).
41                     // It is inefficient in that it wakes up the selector for both
42                     // the first case (BAD - wake-up required) and the second case
43                     // (OK - no wake-up required).
44 
45                     if (wakenUp.get()) {
46                         selector.wakeup();
47                     }
48                 default:
49                     // fallthrough
50             }
51 
52             cancelledKeys = 0;
53             needsToSelectAgain = false;
54             final int ioRatio = this.ioRatio;
55             //根據比例來處理 IO 事件和任務
56             if (ioRatio == 100) {
57                 try {
58                     //處理就緒的 key
59                     processSelectedKeys();
60                 } finally {
61                     // Ensure we always run tasks.
62                     //執行任務
63                     runAllTasks();
64                 }
65             } else {
66                 final long ioStartTime = System.nanoTime();
67                 try {
68                     processSelectedKeys();
69                 } finally {
70                     // Ensure we always run tasks.
71                     // 計算出處理 IO 事件的時間,然後根據比例算出執行任務的時間
72                     final long ioTime = System.nanoTime() - ioStartTime;
73                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
74                 }
75             }
76         } catch (Throwable t) {
77             handleLoopException(t);
78         }
79         // Always handle shutdown even if the loop processing threw an exception.
80         try {
81             if (isShuttingDown()) {
82                 closeAll();
83                 if (confirmShutdown()) {
84                     return;
85                 }
86             }
87         } catch (Throwable t) {
88             handleLoopException(t);
89         }
90     }
91 }

  run()方法主要是做 select 操作,和處理 IO 事件和任務隊列中的任務,這部分內容下一篇文章再分析。從 executor 執行 execute()方法開始,由 Netyy 管理的線程就開始啟動運行了。實際上此時的 NioServerSocketChannel 對象還沒有註冊到 Netty 線程的 Selector 上,Debug 結果如下圖:

技術分享圖片

  上圖中的 startThread()方法實際上是給 executor 提交了一個任務,緊接著 main 線程就調用了 addTask()方法,將 task 添加到 EventLoop 對象的任務隊列中,而這個 task 的內容就是執行註冊操作。在添加了註冊任務之後,Netty 線程就會在 select 完成後,執行隊列中的任務,將 NioServerSocketChannel 註冊到該線程的 Selector 上。接下來分析一下 AbstractChannel 的 register0()方法:

 1 private void register0(ChannelPromise promise) {
 2     try {
 3         // check if the channel is still open as it could be closed in the mean time when the register
 4         // call was outside of the eventLoop
 5         if (!promise.setUncancellable() || !ensureOpen(promise)) {
 6             return;
 7         }
 8         boolean firstRegistration = neverRegistered;
 9         //註冊通道
10         doRegister();
11         neverRegistered = false;
12         registered = true;
13 
14         // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
15         // user may already fire events through the pipeline in the ChannelFutureListener.
16         //添加服務端 Channel 的 Handler
17         pipeline.invokeHandlerAddedIfNeeded();
18 
19         safeSetSuccess(promise);
20         //觸發通道註冊事件在 pipeline 上傳播
21         pipeline.fireChannelRegistered();
22         // Only fire a channelActive if the channel has never been registered. This prevents firing
23         // multiple channel actives if the channel is deregistered and re-registered.
24         if (isActive()) {//第一次運行到這兒時,結果為 false,因為此時還沒有 bind
25             if (firstRegistration) {
26                 pipeline.fireChannelActive();
27             } else if (config().isAutoRead()) {
28                 // This channel was registered before and autoRead() is set. This means we need to begin read
29                 // again so that we process inbound data.
30                 //
31                 // See https://github.com/netty/netty/issues/4805
32                 beginRead();
33             }
34         }
35     } catch (Throwable t) {
36         // Close the channel directly to avoid FD leak.
37         closeForcibly();
38         closeFuture.setClosed();
39         safeSetFailure(promise, t);
40     }
41 }

  doRegister()方法實際上就是 Java NIO 中將通道註冊到 Selector 上的操作:

1 selectionKey = javaChannel().register(eventLoop().selector, 0, this);//這裏感興趣的事件傳入的是 0

  pipeline.invokeHandlerAddedIfNeeded() 和 pipeline.fireChannelRegistered() 則是用來添加 Handler 並觸發 Handler 別添加的事件的動作。

  在 isActive()這個方法,由於當前是 NioServerSocketChannel,所以實際上是判斷當前通道是否成功綁定到一個地址,很顯然到目前為止,只是創建了通道並註冊到 Selector 上,還沒由綁定。

3. 綁定指定端口

  在 initAndRegister()方法結束後,main 線程開始調用 doBind0()方法,該方法將綁定操作封裝成任務交給 Netty 線程去執行。最後,調用 DefaultPipeline 中的 HeadContext 的 bind()方法,然後通過 unsafe.bind(localAddress,promise)完成綁定:

 1 @Override
 2 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
 3     //...
 4     //顯然這裏返回的是 false
 5     boolean wasActive = isActive();
 6     try {
 7         //綁定操作
 8         doBind(localAddress);
 9     } catch (Throwable t) {
10         safeSetFailure(promise, t);
11         closeIfClosed();
12         return;
13     }
14 
15     if (!wasActive && isActive()) {
16         invokeLater(new Runnable() {
17             @Override
18             public void run() {
19                 //這裏才是觸發服務端 Channel 激活事件的地方
20                 pipeline.fireChannelActive();
21             }
22         });
23     }
24 
25     safeSetSuccess(promise);
26 }

  這個過程,建議 Debug 跟一下代碼,比較清楚代碼是如何一步一步到 HeadContext 中來的。接下來分析一下 doBind()方法:

1 @Override
2 protected void doBind(SocketAddress localAddress) throws Exception {
3     if (PlatformDependent.javaVersion() >= 7) {
4         javaChannel().bind(localAddress, config.getBacklog());
5     } else {
6         javaChannel().socket().bind(localAddress, config.getBacklog());
7     }
8 }

  最終是根據平臺及其 Java 版本來調用 JDK 中的綁定方法。在綁定完成後,會觸發通道激活事件,在 HeadContext 中經過時,發現它裏面有這麽一行代碼:

1 readIfIsAutoRead();

  Debug 一下,發現這個方法最終會調用到 HeadContext 的 read()方法,該方法是調用了 unsafe.beginRead(),緊接著就到了 AbstractNioChannel 的 doBeginRead()方法:

 1 @Override
 2 protected void doBeginRead() throws Exception {
 3     // Channel.read() or ChannelHandlerContext.read() was called
 4     final SelectionKey selectionKey = this.selectionKey;
 5     if (!selectionKey.isValid()) {
 6         return;
 7     }
 8 
 9     readPending = true;
10 
11     final int interestOps = selectionKey.interestOps();
12     if ((interestOps & readInterestOp) == 0) {//說明對 OP_ACCEPT 不感興趣
13         selectionKey.interestOps(interestOps | readInterestOp);//通過 | 修改感興趣的事件
14     }
15 }

  前面通過反射創建 NioServerSocketChannel 對象時,調用了父類也就是 AbstractNioChannel 的構造方法,將 readInterestOp 設置為 16 了,在 NIO 中就是 OP_ACCEPT。從此,該 NioServerSocketChannel 就可以接收客戶端連接了。

4. 總結

  在 Netty 服務端啟動過程中,主線程僅僅是創建了 EventLoopGroup 和啟動引導對象,然後發起綁定操作。這個過程中的綁定,註冊等操作都是主線程封裝成任務交給 Netty 線程去執行的。

  由於 Netty 代碼中抽象類和接口都比較多,所以某些地方調用的方法有很多種實現,不熟悉的時候可以通過 Debug 來確定。 

Netty 服務端啟動過程