Netty NioEventLoop 建立過程原始碼分析
原文: ofollow,noindex">wangwei.one/posts/netty…
前面 ,我們分析了Netty中的Channel元件,本篇我們來介紹一下與Channel關聯的另一個核心的元件 —— EventLoop 。
Netty版本:4.1.30
概述
EventLoop定義了Netty的核心抽象,用於處理網路連線生命週期中所有發生的事件。
我們先來從一個比較高的視角來了解一下Channels、Thread、EventLoops、EventLoopGroups之間的關係。

上圖是表示了擁有4個EventLoop的EventLoopGroup處理IO的流程圖。它們之間的關係如下:
- 一個 EventLoopGroup包含一個或多個EventLoop
- 一個 EventLoop在它的生命週期內只和一個Thread繫結
- 所有由EventLoop處理的I/O事件都將在它專有的Thread上被處理
- 一個Channel在它的生命週期內只註冊於一個EventLoop
- 一個EventLoop可能會被分配給一個或多個Channel
EventLoop 原理
下圖是Netty EventLoop相關類的UML圖。從中我們可以看到EventLoop相關的類都是實現了 java.util.concurrent
包中的 ExecutorService 介面。我們可以直接將任務(Runable 或 Callable) 提交給EventLoop去立即執行或定時執行。

例如,使用EventLoop去執行定時任務,樣例程式碼:
public static void scheduleViaEventLoop() { Channel ch = new NioSocketChannel(); ScheduledFuture<?> future = ch.eventLoop().schedule( () -> System.out.println("60 seconds later"), 60, TimeUnit.SECONDS); } 複製程式碼
Thread 管理
Netty執行緒模型的高效能主要取決於當前所執行執行緒的身份的確定。一個執行緒提交到EventLoop執行的流程如下:
- 將Task任務提交給EventLoop執行
- 在Task傳遞到execute方法之後,檢查當前要執行的Task的執行緒是否是分配給EventLoop的那個執行緒
- 如果是,則該執行緒會立即執行
- 如果不是,則將執行緒放入任務佇列中,等待下一次執行
其中,Netty中的每一個EventLoop都有它自己的任務佇列,並且和其他的EventLoop的任務佇列獨立開來。

Thread 分配
服務於Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根據不同的傳輸實現,EventLoop的建立和分配方式也不同。
NIO傳輸

在NIO傳輸方式中,使用盡可能少的EventLoop就可以服務多個Channel。如圖所示,EventLoopGroup採用順序迴圈的方式負責為每一個新建立的Channel分配EventLoop,每一個EventLoop會被分配給多個Channels。
一旦一個Channel被分配給了一個EventLoop,則這個Channel的生命週期內,只會繫結這個EventLoop。這就讓我們在ChannelHandler的實現省去了對執行緒安全和同步問題的擔心。
OIO傳輸

與NIO方式的不同在於,一個EventLoop只會服務於一個Channel。
NioEventLoop & NioEventLoopGroup 建立
初步瞭解了 EventLoop 以及 EventLoopGroup 的工作機制,接下來我們以 NioEventLoopGroup 為例,來深入分析 NioEventLoopGroup 是如何建立的,又是如何啟動的,它的內部執行邏輯又是怎樣的等等問題。
MultithreadEventExecutorGroup 構造器
我們從 NioEventLoopGroup 的建構函式開始分析:
EventLoopGroup acceptorEventLoopGroup = new NioEventLoopGroup(1); 複製程式碼
NioEventLoopGroup建構函式會呼叫到父類 MultithreadEventLoopGroup 的建構函式,預設情況下,EventLoop的數量 = 處理器數量 x 2:
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class); private static final int DEFAULT_EVENT_LOOP_THREADS; // 預設情況下,EventLoop的數量 = 處理器數量 x 2 static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args){ super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } ... } 複製程式碼
繼續呼叫父類,會呼叫到 MultithreadEventExecutorGroup 的構造器,主要做三件事情:
- 建立執行緒任務執行器 ThreadPerTaskExecutor
- 通過for迴圈建立數量為 nThreads 個的 EventLoop
- 建立 EventLoop 選擇器 EventExecutorChooser
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 建立任務執行器 ThreadPerTaskExecutor if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 建立 EventExecutor 陣列 children = new EventExecutor[nThreads]; // 通過for迴圈建立數量為 nThreads 個的 EventLoop for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 呼叫 newChild 介面 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 建立選擇器 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } 複製程式碼
建立執行緒任務執行器 ThreadPerTaskExecutor
if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } 複製程式碼
執行緒任務執行器 ThreadPerTaskExecutor 原始碼如下,具體的任務都由 ThreadFactory 去執行:
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } // 使用 threadFactory 執行任務 @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } } 複製程式碼
來看看 newDefaultThreadFactory 方法:
protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass()); } 複製程式碼
DefaultThreadFactory
接下來看看 DefaultThreadFactory 這個類,實現了 ThreadFactory 介面,我們可以瞭解到:
- EventLoopGroup的命名規則
- 具體的執行緒為 FastThreadLocalThread
public class DefaultThreadFactory implements ThreadFactory { // 執行緒池ID編號自增器 private static final AtomicInteger poolId = new AtomicInteger(); // 執行緒ID自增器 private final AtomicInteger nextId = new AtomicInteger(); // 執行緒名稱字首 private final String prefix; // 是否為守護程序 private final boolean daemon; // 執行緒優先順序 private final int priority; // 執行緒組 protected final ThreadGroup threadGroup; public DefaultThreadFactory(Class<?> poolType) { this(poolType, false, Thread.NORM_PRIORITY); } ... // 獲取執行緒名,返回結果:nioEventLoopGroup public static String toPoolName(Class<?> poolType) { if (poolType == null) { throw new NullPointerException("poolType"); } String poolName = StringUtil.simpleClassName(poolType); switch (poolName.length()) { case 0: return "unknown"; case 1: return poolName.toLowerCase(Locale.US); default: if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) { return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1); } else { return poolName; } } } public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { if (poolName == null) { throw new NullPointerException("poolName"); } if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); } // nioEventLoopGroup-2- prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; this.threadGroup = threadGroup; } public DefaultThreadFactory(String poolName, boolean daemon, int priority) { this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); } @Override public Thread newThread(Runnable r) { // 建立新執行緒 nioEventLoopGroup-2-1 Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } // 建立新執行緒 FastThreadLocalThread protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); } } 複製程式碼
建立NioEventLoop
繼續從 MultithreadEventExecutorGroup 構造器開始,建立完任務執行器 ThreadPerTaskExecutor 之後,進入for迴圈,開始建立 NioEventLoop:
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 建立 nioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } ... } 複製程式碼
NioEventLoopGroup類中的 newChild()
方法:
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } 複製程式碼
NioEventLoop 構造器:
public final class NioEventLoop extends SingleThreadEventLoop{ ... NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { // 呼叫父類 SingleThreadEventLoop 構造器 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } // 設定 selectorProvider provider = selectorProvider; // 獲取 SelectorTuple 物件,裡面封裝了原生的selector和優化過的selector final SelectorTuple selectorTuple = openSelector(); // 設定優化過的selector selector = selectorTuple.selector; // 設定原生的selector unwrappedSelector = selectorTuple.unwrappedSelector; // 設定select策略 selectStrategy = strategy; } ... } 複製程式碼
接下來我們看看 獲取多路複用選擇器 方法—— openSelector() ,
// selectKey 優化選項flag private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false); private SelectorTuple openSelector() { // JDK原生的selector final Selector unwrappedSelector; try { // 通過 SelectorProvider 建立獲得selector unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } // 如果不優化,則直接返回 if (DISABLE_KEYSET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); } // 通過反射建立 sun.nio.ch.SelectorImpl 物件 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); // 如果 maybeSelectorImplClass 不是 selector 的一個實現,則直接返回原生的Selector if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. // 確保當前的選擇器實現是我們可以檢測的 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } // maybeSelectorImplClass 是selector的實現,則轉化為 selector 實現類 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; // 建立新的 SelectionKey 集合 SelectedSelectionKeySet,內部採用的是 SelectionKey 陣列的形 // 式,而非 set 集合 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { // 通過反射的方式獲取 sun.nio.ch.SelectorImpl 的成員變數 selectedKeys Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); // 通過反射的方式獲取 sun.nio.ch.SelectorImpl 的成員變數 publicSelectedKeys Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } // We could not retrieve the offset, lets try reflection as last-resort. } // 設定欄位 selectedKeysAccessible 為true Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } // 設定欄位 publicSelectedKeysAccessible 為true cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } // 設定 SelectedSelectionKeySet selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); // 返回包含了原生selector和優化過的selector的SelectorTuple return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); } 複製程式碼
優化後的 SelectedSelectionKeySet 物件,內部採用 SelectionKey 陣列的形式:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } // 使用陣列,來替代HashSet,可以降低時間複雜度為O(1) @Override public boolean add(SelectionKey o) { if (o == null) { return false; } keys[size++] = o; if (size == keys.length) { increaseCapacity(); } return true; } @Override public boolean remove(Object o) { return false; } @Override public boolean contains(Object o) { return false; } @Override public int size() { return size; } @Override public Iterator<SelectionKey> iterator() { return new Iterator<SelectionKey>() { private int idx; @Override public boolean hasNext() { return idx < size; } @Override public SelectionKey next() { if (!hasNext()) { throw new NoSuchElementException(); } return keys[idx++]; } @Override public void remove() { throw new UnsupportedOperationException(); } }; } void reset() { reset(0); } void reset(int start) { Arrays.fill(keys, start, size, null); size = 0; } // 擴容 private void increaseCapacity() { SelectionKey[] newKeys = new SelectionKey[keys.length << 1]; System.arraycopy(keys, 0, newKeys, 0, size); keys = newKeys; } } 複製程式碼
SingleThreadEventLoop 構造器
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { ... protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { // 呼叫 SingleThreadEventExecutor 構造器 super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); tailTasks = newTaskQueue(maxPendingTasks); } ... } 複製程式碼
SingleThreadEventExecutor 構造器,主要做兩件事情:
- 設定執行緒任務執行器。
- 設定任務佇列。前面講到EventLoop對於不能立即執行的Task會放入一個佇列中,就是這裡設定的。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { ... protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); // 設定執行緒任務執行器 this.executor = ObjectUtil.checkNotNull(executor, "executor"); // 設定任務佇列 taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } ... } 複製程式碼
NioEventLoop 中對 newTaskQueue 介面的實現,返回的是 JCTools 工具包 Mpsc 佇列。後面我們寫文章單獨介紹 JCTools 中的相關佇列。
Mpsc:Multi Producer Single Consumer (Lock less, bounded and unbounded)
多個生產者對單個消費者(無鎖、有界和無界都有實現)
public final class NioEventLoop extends SingleThreadEventLoop { ... @Override protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); } ... } 複製程式碼
建立執行緒執行選擇器chooser
接下來,我們看看 MultithreadEventExecutorGroup 構造器的最後一個部分內容,建立執行緒執行選擇器chooser,它的主要作用就是 EventLoopGroup 用於從 EventLoop 陣列中選擇一個 EventLoop 去執行任務。
// 建立選擇器 chooser = chooserFactory.newChooser(children); 複製程式碼
EventLoopGroup 中定義的 next()
介面:
public interface EventLoopGroup extends EventExecutorGroup { ... // 選擇下一個 EventLoop 用於執行任務 @Override EventLoop next(); ... } 複製程式碼
MultithreadEventExecutorGroup 中對 next() 的實現:
@Override public EventExecutor next() { // 呼叫 DefaultEventExecutorChooserFactory 中的next() return chooser.next(); } 複製程式碼
DefaultEventExecutorChooserFactory 對於如何從陣列中選擇任務執行器,也做了巧妙的優化。
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } // 判斷執行緒任務執行的個數是否為 2 的冪次方。e.g: 2、4、8、16 private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } // 冪次方選擇器 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { // 通過二級制進行 & 運算,效率更高 return executors[idx.getAndIncrement() & executors.length - 1]; } } // 普通選擇器 private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { // 按照最普通的取模的方式從index=0開始向後開始選擇 return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } } 複製程式碼
小結
通過本節內容,我們瞭解到了EventLoop與EventLoopGroup的基本原理,EventLoopGroup與EventLoop的建立過程:
- 建立執行緒任務執行器 ThreadPerTaskExecutor
- 建立EventLoop
- 建立任務選擇器 EventExecutorChooser