1. 程式人生 > >Netty系列(一):NioEventLoopGroup原始碼解析

Netty系列(一):NioEventLoopGroup原始碼解析

前言

對於NioEventLoopGroup這個物件,在我的理解裡面它就和ThreadGroup類似,NioEventLoopGroup中有一堆NioEventLoop小弟,ThreadGroup中有一堆Thread小弟,真正意義上幹活的都是NioEventLoopThread這兩個小弟。下面的文章大家可以類比下進行閱讀,應該會很容易弄懂的。(本文基於netty-4.1.32.Final)

NioEventLoopGroup

這裡咱們可以從NioEventLoopGroup最簡單的無參建構函式開始。

1    public NioEventLoopGroup
() 
{
2        this(0);
3    }
複製程式碼

一步步往下走,可以發現最終呼叫到建構函式:

1    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
2                             final
 SelectStrategyFactory selectStrategyFactory)
 
{
3        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
4    }
複製程式碼

引數說明:

  1. nThreads:在整個方法鏈的呼叫過程中,其值到這裡為止一直為0,在沒有主動配置的情況下後面會進行設定。若配置io.netty.eventLoopThreads
    系統環境變數,則優先考慮,否則設定成為CPU核心數*2
  2. executor: 到目前為止是null
  3. selectorProvider: 這裡為JDK的預設實現SelectorProvider.provider()
  4. selectStrategyFactory:這裡的值是DefaultSelectStrategyFactory的一個例項SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory()
  5. RejectedExecutionHandlers:這裡是個拒絕策略,這裡預設的實現是佇列溢位時丟擲RejectedExecutionException異常。

MultithreadEventLoopGroup

繼續往下面走,呼叫父類MultithreadEventLoopGroup中的建構函式:

1    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
2        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
3    }
複製程式碼

這裡可以看到判斷nThreads == 0後就會給其附上一個預設值。繼續走,呼叫父類MultithreadEventExecutorGroup中的構造方法。

1    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
2        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
3    }
複製程式碼
DefaultEventExecutorChooserFactory

這裡有個關注的點,DefaultEventExecutorChooserFactory。這是一個chooserFactory,用來生產EventExecutorChooser選擇器的。而EventExecutorChooser的功能是用來選擇哪個EventExecutor去執行咱們的任務。咱們從下面的程式碼中可以觀察到DefaultEventExecutorChooserFactory一共給咱們提供了兩種策略。

1    public EventExecutorChooser newChooser(EventExecutor[] executors) {
2        if (isPowerOfTwo(executors.length)) {
3            return new PowerOfTwoEventExecutorChooser(executors);
4        } else {
5            return new GenericEventExecutorChooser(executors);
6        }
7    }
複製程式碼

這裡的策略也很簡單:

  1. 如果給的執行緒數是2^n個,那麼選擇PowerOfTwoEventExecutorChooser這個選擇器,因為這樣可以採用位運算去獲取執行任務的EventExecutor
1        public EventExecutor next() {
2            return executors[idx.getAndIncrement() & executors.length - 1];
3        }
複製程式碼
  1. GenericEventExecutorChooser選擇器,這裡採用的是取模的方式去獲取執行任務的EventExecutor
1        public EventExecutor next() {
2            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
3        }
複製程式碼

相比而言,位運算的效率要比取模的效率高,所以咱們在自定義執行緒數的時候,最好設定成為2^n個執行緒數。


幹正事

到達最終呼叫的函式

 1    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
2                                            EventExecutorChooserFactory chooserFactory, Object... args)
 
{
3        if (nThreads <= 0) {
4            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
5        }
6
7        if (executor == null) {
8            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
9        }
10
11        children = new EventExecutor[nThreads];
12
13        for (int i = 0; i < nThreads; i ++) {
14            boolean success = false;
15            try {
16                children[i] = newChild(executor, args);
17                success = true;
18            } catch (Exception e) {
19                // TODO: Think about if this is a good exception type
20                throw new IllegalStateException("failed to create a child event loop", e);
21            } finally {
22                if (!success) {
23                    for (int j = 0; j < i; j ++) {
24                        //建立NioEventLoop失敗後進行資源的一些釋放
25                        children[j].shutdownGracefully();
26                    }
27
28                    for (int j = 0; j < i; j ++) {
29                        EventExecutor e = children[j];
30                        try {
31                            while (!e.isTerminated()) {
32                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
33                            }
34                        } catch (InterruptedException interrupted) {
35                            // Let the caller handle the interruption.
36                            Thread.currentThread().interrupt();
37                            break;
38                        }
39                    }
40                }
41            }
42        }
43       //這裡可以去看下上面對於 DefaultEventExecutorChooserFactory的一些介紹
44        chooser = chooserFactory.newChooser(children);
45
46        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
47            @Override
48            public void operationComplete(Future<Object> future) throws Exception {
49                if (terminatedChildren.incrementAndGet() == children.length) {
50                    terminationFuture.setSuccess(null);
51                }
52            }
53        };
54
55        for (EventExecutor e: children) {
56            // 給每一個成功建立的EventExecutor 繫結一個監聽終止事件
57            e.terminationFuture().addListener(terminationListener);
58        }
59
60        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
61        Collections.addAll(childrenSet, children);
62        // 弄一個只讀的EventExecutor陣列,方便後面快速迭代,不會丟擲併發修改異常
63        readonlyChildren = Collections.unmodifiableSet(childrenSet);
64    }
複製程式碼

從上面的程式碼可以觀察到,等了很久的executor 在這裡終於給其賦值了,其值為ThreadPerTaskExecutor的一個例項物件,這一塊的初始化賦值都是很簡單的,幹活呼叫的是如下方法:

1    public void execute(Runnable command) {
2        threadFactory.newThread(command).start();
3    }
複製程式碼

對這一塊不是很瞭解的可以去查閱下執行緒池有關的資料,咱們重點關注一下newChild這個方法,可以說是上面整個流程中的重點:

newChild

newChild這個方法在NioEventLoopGroup中被重寫了:

1    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
2        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
3            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
4    }
複製程式碼

細心的小夥伴可以觀察到,這裡有用到SelectorProvider,SelectStrategyFactory以及RejectedExecutionHandler這個三個引數,實際上就是本文最開始初始化的三個例項物件(可以翻閱到頂部檢視一下)。
繼續往下走流程:

 1    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
2                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
3        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
4        if (selectorProvider == null) {
5            throw new NullPointerException("selectorProvider");
6        }
7        if (strategy == null) {
8            throw new NullPointerException("selectStrategy");
9        }
10        provider = selectorProvider;
11        final SelectorTuple selectorTuple = openSelector();
12        selector = selectorTuple.selector;
13        unwrappedSelector = selectorTuple.unwrappedSelector;
14        selectStrategy = strategy;
15    }
複製程式碼

在上面的程式碼片段中除了呼叫父類的構造器之外就進行了引數的判空和簡單的賦值。這裡openSelector方法呼叫後返回SelectorTuple例項主要是為了能同時得到包裝前後的selectorunwrappedSelector

 1    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
2                                        boolean addTaskWakesUp, int maxPendingTasks,
3                                        RejectedExecutionHandler rejectedHandler) {
4        super(parent);
5        this.addTaskWakesUp = addTaskWakesUp;
6        this.maxPendingTasks = Math.max(16, maxPendingTasks);
7        this.executor = ObjectUtil.checkNotNull(executor, "executor");
8        taskQueue = newTaskQueue(this.maxPendingTasks);
9        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
10    }
複製程式碼

這裡會有一個taskQueue佇列的初始化(Queue<Runnable> taskQueue),看名字就知道,這個佇列裡面放著的是咱們要去執行的任務。這裡的初始化方法newTaskQueueNioEventLoop中重寫了的。具體如下:

1    protected Queue<Runnable> newTaskQueue(int maxPendingTasks{
2        // This event loop never calls takeTask()
3        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
4                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
5    }
複製程式碼

這裡生成的是一個MPSC佇列(Multi Producer Single Consumer),這是一個多生產者單消費的無鎖佇列,支援併發。從字面意思上就可以觀察到這個佇列效率應該是蠻高的。這裡的maxPendingTasks值為Integer.MAX_VALUE。然後最終生成的是MpscUnboundedArrayQueue這樣一個無邊界的佇列。

這樣newChild這個方法到這裡就走完了。


terminationListener

簡單介紹下這個環節,在上面的建立NioEventLoopGroup有個環節是給每個NioEventLoop兒子繫結一個terminationListener監聽事件

1        for (EventExecutor e: children) {
2            e.terminationFuture().addListener(terminationListener);
3        }
複製程式碼

這個事件的回撥方法是:

1            @Override
2            public void operationComplete(Future<Object> future) throws Exception {
3                if (terminatedChildren.incrementAndGet() == children.length) {
4                    terminationFuture.setSuccess(null);
5                }
6            }
複製程式碼

在每一個NioEventLoop關閉後,就會回撥這個方法,然後給NioEventLoopGroup例項中的terminatedChildren欄位自增1,並與初始化成功的NioEventLoop的總個數進行比較,如果
terminatedChildren的值與NioEventLoop的總個數相等,則呼叫bossGroup.terminationFuture().get()方法就不會阻塞,並正常返回null
同樣,future.channel().closeFuture().sync()這段程式碼也將不會阻塞住了,呼叫sync.get()也會返回null
下面給一段測試程式碼,完整示例大家可以到我的github中去獲取:

terminationListener_test
terminationListener_test

上面的程式碼只是一個簡單的測試,後面還有別的發現的話會繼續在github中與大家一起分享~


End