1. 程式人生 > >Netty-主從Reactor多執行緒模式的原始碼實現

Netty-主從Reactor多執行緒模式的原始碼實現

Netty--主從Reactor多執行緒模式的原始碼實現

總覽


EventLoopGroup到底是什麼?

EventLoopGroup是一個儲存EventLoop的容器,同時他應該具備執行緒池的功能。

graph BT; EventLoopGroup --> EventExecutorGroup; EventExecutorGroup --> ScheduledExecutorService;

由於EventLoopGroup間接繼承ScheduledExecutorService介面,因此其實現類應該具備執行緒池的功能。

看一下NioEventLoopGroup的核心屬性

    // 預設的執行緒池大小
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); // CPU核數 x 2
    }
    
    // 儲存EventLoop的陣列
    private final EventExecutor[] children;

構造方法

    // 如果傳入的nThread為空,那麼使用預設的執行緒池大小(CPU核數 x 2)
    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

    // 最終的構建方法
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // ......省略

        children = new EventExecutor[nThreads]; // 初始化EventExecutor陣列的大小

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args); // 初始化EventLoop
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // ......省略
            }
        }
        // ......省略
    }

當建立NioEventLoopGroup例項後,就已經初始化好其EventExecutor陣列的大小以及其儲存的EventLoop。

graph TB; NioEventLoopGroup --> array[EventExecutor陣列]; array --> E1[EventLoop]; array --CPU核數 x 2--> E2[EventLoop]; array --> E3[EventLoop...];

EventLoop又是什麼?

graph BT; EventLoop --> EventExecutor; NioEventLoop -->
EventLoop;

EventLoop是EventExecutor的子介面。

看一下NioEventLoop的核心屬性

  private Selector selector;
  
  private volatile Thread thread;
  
  private final Executor executor;
  
  private final Queue<Runnable> taskQueue;
  
  static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

每一個NioEventLoop都有一個Selector、Thread、Executor、Queue屬性,當建立EventLoop例項後其thread屬性仍為NULL,還沒有建立執行緒。


Channel

看一下Channel的核心屬性

private volatile EventLoop eventLoop; // 指向EventLoop的引用

每一個Channel都有一個指向EventLoop的引用,也就是說每一個Channel都會與一個EventLoop進行繫結,其eventLoop()方法返回此Channel繫結的EventLoop。

graph TB; EventLoop --> A[Channel]; EventLoop --> B[Channel]; EventLoop --> C[Channel...];

Netty啟動伺服器後的核心流程

  // 建立bossGroup和workerGroup
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  // 繫結埠,啟動服務
  serverBootstrap.bind(8888).sync();

AbstarctBootstrap的doBind()方法

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); // 初始化Channel並將Channel註冊到bossGroup中的EventLoop中的Selector
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() { 
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {  // 當initAndRegister()方法完成後,由IO執行緒自動呼叫
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);   // 進行繫結操作
                    }
                }
            });
            return promise;
        }
    }
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel(); // 建立NioServerSocketChannel
            init(channel); // 初始化Channel
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        
        // config()方法返回AbstractBootStrap
        // AbstractBootStrap儲存bossGroup,ServerBootStrap儲存workerGroup
        ChannelFuture regFuture = config().group().register(channel); 
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

EventLoopGroup的register()方法會通過選擇器從EventLoopGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法

    // EventLoopGroup的register()方法
    public ChannelFuture register(Channel channel) {
        return next().register(channel); // 呼叫EventLoop的register()方法
    }
    
    /**
     * 通過選擇器從EventLoopGroup中取出一個EventLoop
     */
    public EventExecutor next() {
        return chooser.next();
    }

SingleThreadEventLoop的register()方法

    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise); // 呼叫Channel的register()方法,傳遞當前物件,即當前EventLoop例項
        return promise;
    }

AbstarctChannel的register()方法

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        AbstractChannel.this.eventLoop = eventLoop; // 初始化Channel的EventLoop屬性
           
        // 第一次呼叫時,EventLoop中的thread為空,因此inEventLoop()返回false
        if (eventLoop.inEventLoop()) { 
            register0(promise); 
        } else {
            try {
                eventLoop.execute(new Runnable() { // 呼叫EventLoop的execute()方法,並將註冊操作作為一個任務,放入到Runnable例項中
                @Override
                public void run() {
                    // register0()方法最終將會呼叫doRegister()方法,將Channel註冊到EventLoop中的Selector
                    register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
           }
        }
     }

SingleThreadEventExecutor的execute()方法

     public void execute(Runnable task) {
         if (task == null) {
             throw new NullPointerException("task");
         }
 
         boolean inEventLoop = inEventLoop(); 
         addTask(task); // 將任務放入到當前EventLoop的任務佇列中
         if (!inEventLoop) {
             startThread(); // 開啟一個執行緒,最終呼叫doStartThread()方法
             if (isShutdown()) {
                 boolean reject = false;
                 try {
                     if (removeTask(task)) {
                         reject = true;
                     }
                 } catch (UnsupportedOperationException e) {
                 
                 }
                 if (reject) {
                     reject();
                 }
             }
         }

         if (!addTaskWakesUp && wakesUpForTask(task)) {
             wakeup(inEventLoop);
         }
     }

SingleThreadEventExecutor的doStartThread()方法

     private void doStartThread() {
         assert thread == null; // 當前EventLoop中的thread為空,輸出結果肯定為true
         executor.execute(new Runnable() { // 向執行緒池中提交一個任務,到這裡才建立執行緒同時非同步執行
             @Override
             public void run() {
                 thread = Thread.currentThread(); // 初始化EventLoop中的thread屬性,即執行緒池中執行該任務的執行緒
                 if (interrupted) {
                     thread.interrupt();
                 }

                 boolean success = false;
                 updateLastExecutionTime();
                 try {
                     SingleThreadEventExecutor.this.run(); // 核心方法(當前物件為NioEventLoop)
                     success = true;
                 } catch (Throwable t) {
                     logger.warn("Unexpected exception from an event executor: ", t);
                 } finally {
                     // ......省略
                 }
             }
         });
     }

NioEventLoop的run()方法

     protected void run() {
         for (;;) { // 死迴圈
             try {
                 try {
                     switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                     case SelectStrategy.CONTINUE:
                         continue;

                     case SelectStrategy.BUSY_WAIT:

                     case SelectStrategy.SELECT:
                         select(wakenUp.getAndSet(false));

                         if (wakenUp.get()) {
                             selector.wakeup();
                         }
                     default:
                     }
                 } catch (IOException e) {
                     rebuildSelector0();
                     handleLoopException(e);
                     continue;
                 }

                 cancelledKeys = 0;
                 needsToSelectAgain = false;
                 final int ioRatio = this.ioRatio;
                 if (ioRatio == 100) {
                     try {
                         processSelectedKeys(); // 處理Channel的就緒事件,最終呼叫processSelectedKeysOptimized()方法
                     } finally {
                         runAllTasks(); // 執行儲存在任務佇列中的所有任務
                     }
                 } else {
                     final long ioStartTime = System.nanoTime();
                     try {
                         processSelectedKeys();
                     } finally {
                         // Ensure we always run tasks.
                         final long ioTime = System.nanoTime() - ioStartTime;
                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                     }
                 }
             } catch (Throwable t) {
                 handleLoopException(t);
             }
             try {
                 if (isShuttingDown()) {
                     closeAll();
                     if (confirmShutdown()) {
                         return;
                     }
                 }
             } catch (Throwable t) {
                 handleLoopException(t);
             }
         }
     }
     private void processSelectedKeysOptimized() {
         for (int i = 0; i < selectedKeys.size; ++i) { // 遍歷就緒的Channel對應的SelectionKey集合,如果Channel沒有事件就緒則集合為空
             final SelectionKey k = selectedKeys.keys[i];
             selectedKeys.keys[i] = null;

             final Object a = k.attachment();

             if (a instanceof AbstractNioChannel) {
                 processSelectedKey(k, (AbstractNioChannel) a);
             } else {
                 @SuppressWarnings("unchecked")
                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                 processSelectedKey(k, task);
             }

             if (needsToSelectAgain) {
                 selectedKeys.reset(i + 1);

                 selectAgain();
                 i = -1;
             }
         }
     }

NioEventLoop的processSelectedKey()方法

     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
         if (!k.isValid()) {
             final EventLoop eventLoop;
             try {
                 eventLoop = ch.eventLoop();
             } catch (Throwable ignored) {
                 return;
             }
             if (eventLoop != this || eventLoop == null) {
                 return;
             }
             unsafe.close(unsafe.voidPromise());
             return;
         }

         try {
             int readyOps = k.readyOps(); // 就緒的事件
             if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 連線就緒
                 int ops = k.interestOps();
                 ops &= ~SelectionKey.OP_CONNECT;
                 k.interestOps(ops);
                 unsafe.finishConnect(); // 處理連線就緒
             }
           
            if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 寫就緒
                 ch.unsafe().forceFlush(); // 處理寫就緒
             }
           
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 接收和讀就緒
                 unsafe.read(); // 處理接收和讀就緒
             }
         } catch (CancelledKeyException ignored) {
             unsafe.close(unsafe.voidPromise());
         }
     }

當EventLoop中的執行緒處理完Channel的就緒事件後將會執行儲存在任務佇列中的所有任務,此時註冊任務將被執行

     private void register0(ChannelPromise promise) {
         try {
             if (!promise.setUncancellable() || !ensureOpen(promise)) {
                 return;
             }
             boolean firstRegistration = neverRegistered;
             doRegister(); // 執行註冊操作
             neverRegistered = false;
             registered = true;

             pipeline.invokeHandlerAddedIfNeeded();

             safeSetSuccess(promise); // ChannelFuture設定成成功狀態,同時isDone()方法將返回true
             pipeline.fireChannelRegistered();
             if (isActive()) {
                 if (firstRegistration) {
                     pipeline.fireChannelActive();
                 } else if (config().isAutoRead()) {
                     beginRead();
                 }
             }
          } catch (Throwable t) {
             closeForcibly();
             closeFuture.setClosed();
             safeSetFailure(promise, t); // ChannelFuture設定成失敗狀態,同時isDone()方法將返回true
          }
     }

AbstractNioChannel的doRegister()方法

     protected void doRegister() throws Exception {
         boolean selected = false;
         for (;;) {
             try {
                 // 將Channel註冊到EventLoop中的Selector
                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // 將Channel註冊到Selector當中,當Channel執行完相應操作後,再向Selector傳遞需要監聽此Channel的事件型別
                 return;
             } catch (CancelledKeyException e) {
                 if (!selected) {
                     eventLoop().selectNow();
                     selected = true;
                 } else {
                     throw e;
                 }
             }
         }
     }

回到一開始的AbstarctBootstrap的doBind()方法

     private ChannelFuture doBind(final SocketAddress localAddress) {
         final ChannelFuture regFuture = initAndRegister(); 
         final Channel channel = regFuture.channel(); 
         if (regFuture.cause() != null) {
             return regFuture;
         }

         if (regFuture.isDone()) { // 由於註冊任務是非同步執行的,此時任務還未被執行,因此isDone()方法將返回false
             ChannelPromise promise = channel.newPromise();
             doBind0(regFuture, channel, localAddress, promise);
             return promise;
         } else {
             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
             regFuture.addListener(new ChannelFutureListener() { 
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {  // 當註冊任務被執行完畢後,由IO執行緒自動
                     Throwable cause = future.cause();
                     if (cause != null) {
                         promise.setFailure(cause);
                     } else {
                         promise.registered();
                         doBind0(regFuture, channel, localAddress, promise); // 進行繫結操作
                     }
                 }
             });
             return promise;
         }
     }
     private static void doBind0(
         final ChannelFuture regFuture, final Channel channel,
         final SocketAddress localAddress, final ChannelPromise promise) {

         channel.eventLoop().execute(new Runnable() { // 同樣的,獲取Channel繫結的EventLoop,呼叫EventLoop的execute()方法,並將繫結操作作為一個任務,放入到Runnable例項中
         @Override
         public void run() {
             if (regFuture.isSuccess()) {
                   channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                   promise.setFailure(regFuture.cause());
             }
         }
        });
      }
     public void execute(Runnable task) {
         if (task == null) {
             throw new NullPointerException("task");
         }

         boolean inEventLoop = inEventLoop(); // 由於當前方法是被處理註冊任務的那個執行緒呼叫的,即EventLoop中的那個執行緒,因此inEventLoop()方法返回true
         addTask(task); // 將任務放入到佇列中,等待被執行
         if (!inEventLoop) { // 不會執行,到目前為止仍然只開啟了一個執行緒
             startThread();
             if (isShutdown()) {
                 boolean reject = false;
                 try {
                     if (removeTask(task)) {
                         reject = true;
                     }
                 } catch (UnsupportedOperationException e) {
                    
                 }
                 if (reject) {
                     reject();
                 }
             }
         }

         if (!addTaskWakesUp && wakesUpForTask(task)) {
             wakeup(inEventLoop);
         }
     }

這就是Netty啟動服務端後的核心流程

1.建立ServerSocketChannel並進行初始化。

2.呼叫bossGroup的register()方法,方法內部通過選擇器從bossGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法。

3.最終呼叫Channel的register()方法,方法中初始化Channel的eventLoop屬性,然後將Channel註冊到bossGroup中的EventLoop中的Selector作為一個任務,放入到Runnable例項中,然後呼叫EventLoop的execute(Runnable)方法。

4.execute()方法將任務放入到任務隊列當中,然後向執行緒池中提交一個任務,此時才建立一個執行緒,同時初始化EventLoop中的thread屬性。

5.任務中初始化EventLoop的thread屬性,然後呼叫NioEventLoop的run()方法,死迴圈去處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務。

6.當註冊任務被執行完畢後,該執行緒會回撥ChannelFutureListener中的operationComplete()方法,將繫結操作作為一個任務,然後呼叫EventLoop的execute(Runnable)方法。

7.重複第4步驟,將任務放入到任務佇列中,由於當前執行緒就是EventLoop中的thread,因此inEventLoop()方法返回true,不會向執行緒池中提交任務,任務等待被EventLoop中的執行緒執行。


Netty處理接收和讀就緒事件的核心流程

BossGroup中的EventLoop中的Thread正在死迴圈的處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務

// 建立一個連線
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8888));

BossGroup中的EventLoop中的Selector監聽到ServerSocketChannel有接收就緒事件

        // NioEventLoop的processSelectedKey()方法
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
        
        public void read() {
            assert eventLoop().inEventLoop(); // 肯定是true
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf); // 讀取Channel中的資料
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i)); // 最終會呼叫ServerBootStrap的register()方法
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

NioServerSocketChannel的doReadMessage()方法

     protected int doReadMessages(List<Object> buf) throws Exception {
         SocketChannel ch = SocketUtils.accept(javaChannel()); // 接收連線,呼叫ServerSocketChannel的accept()方法

         try {
             if (ch != null) {
                 buf.add(new NioSocketChannel(this, ch)); // 將接收到連線放入到buffer中
                 return 1;
             }
         } catch (Throwable t) {
             logger.warn("Failed to create a new channel from an accepted socket.", t);

             try {
                 ch.close();
             } catch (Throwable t2) {
                 logger.warn("Failed to close a socket.", t2);
             }
         }

         return 0;
     }

ServerBootstrap的register()方法

     public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

         child.pipeline().addLast(childHandler);

         setChannelOptions(child, childOptions, logger);
         setAttributes(child, childAttrs);

         try {
              // 呼叫workerGroup的register()方法,方法內部通過選擇器從workerGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法,最終呼叫AbstractChannel的register()方法
              childGroup.register(child).addListener(new ChannelFutureListener() {
              @Override
              public void operationComplete(ChannelFuture future) throws Exception {
                  if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                  }
              }
             });
          } catch (Throwable t) {
              forceClose(child, t);
          }
      }

AbstractChannel的register()方法

     public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
              promise.setFailure(new IllegalStateException("registered to an event loop already"));
              return;
        }
        if (!isCompatible(eventLoop)) {
              promise.setFailure(
                     new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
              return;
        }

        AbstractChannel.this.eventLoop = eventLoop; // 初始化Channel的eventLoop屬性

        // 如果取出的是新的EventLoop,那麼其thread屬性為空,當前執行緒總是bossGroup中的EventLoop中的thread,因此inEventLoop()返回false。
        // 如果取出的是舊的EventLoop,那麼其thread屬性不為空,當前執行緒總是bossGroup中的EventLoop中的thread,因此inEventLoop()返回false。
        if (eventLoop.inEventLoop()) { // 總是返回false,當前執行緒總是bossGroup中的EventLoop中的thread,肯定與workerGroup中的任意一個EventLoop中的thread都不相等。
              register0(promise);
         } else {
             try {
                 eventLoop.execute(new Runnable() { // 總是會執行該方法
                     @Override
                     public void run() {
                        register0(promise);
                     }
                  });
             } catch (Throwable t) {
                  logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);
                  closeForcibly();
                  closeFuture.setClosed();
                  safeSetFailure(promise, t);
             }
        }
     }
     public void execute(Runnable task) {
         if (task == null) {
             throw new NullPointerException("task");
         }

         boolean inEventLoop = inEventLoop();
         addTask(task); // 將任務放入到隊列當中
         if (!inEventLoop) { 
             startThread(); // 不管是新的還是舊的EventLoop都會呼叫該方法 
             if (isShutdown()) {
                 boolean reject = false;
                 try {
                     if (removeTask(task)) {
                         reject = true;
                     }
                 } catch (UnsupportedOperationException e) {
                
                 }
                 if (reject) {
                     reject();
                 }
             }
         }

         if (!addTaskWakesUp && wakesUpForTask(task)) {
             wakeup(inEventLoop);
         }
     }
     private void startThread() {
         if (state == ST_NOT_STARTED) { // 如果取出的是舊的EventLoop,那麼其thread屬性本身就不為空,因此其state屬性就不等於ST_NOT_STARTED,因此不會開啟新的執行緒,註冊任務等待被EventLoop中的執行緒執行
             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                 boolean success = false;
                 try {
                     doStartThread(); // 取出的是新的EventLoop,其thread屬性為null,同時其state等於ST_NOT_STARTED,因此需要開啟執行緒,向執行緒池中提交一個任務,死迴圈去處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務
                     success = true;
                 } finally {
                     if (!success) {
                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                     }
                 }
             }
         }
     }

這就是Netty處理接收和讀就緒事件的核心流程

1.客戶端建立SocketChannel並進行連線。

2.bossGroup中的EventLoop中的Selector監聽到ServerSocketChannel有接收就緒事件。

3.接收連線,最終呼叫workerGroup的register()方法,方法內部通過選擇器從workGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法。

4.最終呼叫Channel的register()方法,方法中初始化Channel的eventLoop屬性,然後將Channel註冊到bossGroup中的EventLoop中的Selector作為一個任務,放入到Runnable例項中,然後呼叫EventLoop的execute(Runnable)方法。

5.execute()方法將任務放入到任務佇列中,如果取出的是新的EventLoop,那麼其thread屬性為空,此時將會開啟執行緒,向執行緒池中提交一個任務,死迴圈去處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務,如果取出的是舊的EventLoop,那麼其thread屬性不為空,任務等待被EventLoop中的執行緒執行。


總結

1.當建立NioEventLoopGroup例項後,就已經初始化好其EventExecutor陣列的大小以及其儲存的EventLoop。

2.每一個NioEventLoop都有一個Selector、Thread、Executor、Queue屬性,當建立NioEventLoop例項後,其thread屬性仍為空。

3.每一個Channel都會與一個EventLoop進行繫結,其eventLoop()方法返回其繫結的EventLoop,同時該Channel會註冊到其繫結的EventLoop的Selector中。

4.EventLoopGroup的register()方法會通過選擇器從EventLoopGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法。

5.EventLoop的execute()方法會將任務放入到任務隊列當中,如果inEventLoop()方法返回false同時其thread屬性為空,則建立一個執行緒,向執行緒池中提交一個任務(任務中初始化EventLoop中的thread屬性,然後死迴圈去處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務