1. 程式人生 > >Netty 原始碼分析之 三 我就是大名鼎鼎的 EventLoop(一)

Netty 原始碼分析之 三 我就是大名鼎鼎的 EventLoop(一)

此文章已同步傳送到我的 github 上

簡述

這一章是 Netty 原始碼分析 的第三章, 我將在這一章中大家一起探究一下 Netty 的 EventLoop 的底層原理, 讓大家對 Netty 的執行緒模型有更加深入的瞭解.

NioEventLoopGroup

在 Netty 原始碼分析之 一 揭開 Bootstrap 神祕的紅蓋頭 (客戶端) 章節中我們已經知道了, 一個 Netty 程式啟動時, 至少要指定一個 EventLoopGroup(如果使用到的是 NIO, 那麼通常是 NioEventLoopGroup), 那麼這個 NioEventLoopGroup 在 Netty 中到底扮演著什麼角色呢? 我們知道, Netty 是 Reactor 模型的一個實現, 那麼首先從 Reactor 的執行緒模型開始吧.

關於 Reactor 的執行緒模型

首先我們來看一下 Reactor 的執行緒模型.
Reactor 的執行緒模型有三種:

  • 單執行緒模型

  • 多執行緒模型

  • 主從多執行緒模型

首先來看一下 單執行緒模型:

所謂單執行緒, 即 acceptor 處理和 handler 處理都在一個執行緒中處理. 這個模型的壞處顯而易見: 當其中某個 handler 阻塞時, 會導致其他所有的 client 的 handler 都得不到執行, 並且更嚴重的是, handler 的阻塞也會導致整個服務不能接收新的 client 請求(因為 acceptor 也被阻塞了). 因為有這麼多的缺陷, 因此單執行緒Reactor 模型用的比較少.

那麼什麼是 多執行緒模型 呢? Reactor 的多執行緒模型與單執行緒模型的區別就是 acceptor 是一個單獨的執行緒處理, 並且有一組特定的 NIO 執行緒來負責各個客戶端連線的 IO 操作. Reactor 多執行緒模型如下:

Reactor 多執行緒模型 有如下特點:

  • 有專門一個執行緒, 即 Acceptor 執行緒用於監聽客戶端的TCP連線請求.

  • 客戶端連線的 IO 操作都是由一個特定的 NIO 執行緒池負責. 每個客戶端連線都與一個特定的 NIO 執行緒繫結, 因此在這個客戶端連線中的所有 IO 操作都是在同一個執行緒中完成的.

  • 客戶端連線有很多, 但是 NIO 執行緒數是比較少的, 因此一個 NIO 執行緒可以同時繫結到多個客戶端連線中.

接下來我們再來看一下 Reactor 的主從多執行緒模型.
一般情況下, Reactor 的多執行緒模式已經可以很好的工作了, 但是我們考慮一下如下情況: 如果我們的伺服器需要同時處理大量的客戶端連線請求或我們需要在客戶端連線時, 進行一些許可權的檢查, 那麼單執行緒的 Acceptor 很有可能就處理不過來, 造成了大量的客戶端不能連線到伺服器.
Reactor 的主從多執行緒模型就是在這樣的情況下提出來的, 它的特點是: 伺服器端接收客戶端的連線請求不再是一個執行緒, 而是由一個獨立的執行緒池組成. 它的執行緒模型如下:

可以看到, Reactor 的主從多執行緒模型和 Reactor 多執行緒模型很類似, 只不過 Reactor 的主從多執行緒模型的 acceptor 使用了執行緒池來處理大量的客戶端請求.

NioEventLoopGroup 與 Reactor 執行緒模型的對應

我們介紹了三種 Reactor 的執行緒模型, 那麼它們和 NioEventLoopGroup 又有什麼關係呢? 其實, 不同的設定 NioEventLoopGroup 的方式就對應了不同的 Reactor 的執行緒模型.

單執行緒模型

來看一下下面的例子:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
 .channel(NioServerSocketChannel.class)
 ...

注意, 我們例項化了一個 NioEventLoopGroup, 構造器引數是1, 表示 NioEventLoopGroup 的執行緒池大小是1. 然後接著我們呼叫 b.group(bossGroup) 設定了伺服器端的 EventLoopGroup. 有些朋友可能會有疑惑: 我記得在啟動伺服器端的 Netty 程式時, 是需要設定 bossGroup 和 workerGroup 的, 為什麼這裡就只有一個 bossGroup?
其實很簡單, ServerBootstrap 重寫了 group 方法:

@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}

因此當傳入一個 group 時, 那麼 bossGroup 和 workerGroup 就是同一個 NioEventLoopGroup 了.
這時候呢, 因為 bossGroup 和 workerGroup 就是同一個 NioEventLoopGroup, 並且這個 NioEventLoopGroup 只有一個執行緒, 這樣就會導致 Netty 中的 acceptor 和後續的所有客戶端連線的 IO 操作都是在一個執行緒中處理的. 那麼對應到 Reactor 的執行緒模型中, 我們這樣設定 NioEventLoopGroup 時, 就相當於 Reactor 單執行緒模型.

多執行緒模型

同理, 再來看一下下面的例子:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 ...

bossGroup 中只有一個執行緒, 而 workerGroup 中的執行緒是 CPU 核心數乘以2, 因此對應的到 Reactor 執行緒模型中, 我們知道, 這樣設定的 NioEventLoopGroup 其實就是 Reactor 多執行緒模型.

主從多執行緒模型

相信讀者朋友都想到了, 實現主從執行緒模型的例子如下:

EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 ...

bossGroup 執行緒池中的執行緒數我們設定為4, 而 workerGroup 中的執行緒是 CPU 核心數乘以2, 因此對應的到 Reactor 執行緒模型中, 我們知道, 這樣設定的 NioEventLoopGroup 其實就是 Reactor 主從多執行緒模型.

根據 @labmem 的提示, Netty 的伺服器端的 acceptor 階段, 沒有使用到多執行緒, 因此上面的 主從多執行緒模型 在 Netty 的伺服器端是不存在的.
伺服器端的 ServerSocketChannel 只繫結到了 bossGroup 中的一個執行緒, 因此在呼叫 Java NIO 的 Selector.select 處理客戶端的連線請求時, 實際上是在一個執行緒中的, 所以對只有一個服務的應用來說, bossGroup 設定多個執行緒是沒有什麼作用的, 反而還會造成資源浪費.

經 Google, Netty 中的 bossGroup 為什麼使用執行緒池的原因大家眾所紛紜, 不過我在 stackoverflow 上找到一個比較靠譜的答案:

the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps, but I don't see the reason for it.

因此上面的 主從多執行緒模型 分析是有問題, 抱歉.

NioEventLoopGroup 類層次結構

NioEventLoopGroup 例項化過程

在前面 Netty 原始碼分析之 一 揭開 Bootstrap 神祕的紅蓋頭 (客戶端) 章節中, 我們已經簡單地介紹了一下 NioEventLoopGroup 的初始化過程, 這裡再回顧一下:

即:

  • EventLoopGroup(其實是MultithreadEventExecutorGroup) 內部維護一個型別為 EventExecutor children 陣列, 其大小是 nThreads, 這樣就構成了一個執行緒池

  • 如果我們在例項化 NioEventLoopGroup 時, 如果指定執行緒池大小, 則 nThreads 就是指定的值, 反之是處理器核心數 * 2

  • MultithreadEventExecutorGroup 中會呼叫 newChild 抽象方法來初始化 children 陣列

  • 抽象方法 newChild 是在 NioEventLoopGroup 中實現的, 它返回一個 NioEventLoop 例項.

  • NioEventLoop 屬性:

    • SelectorProvider provider 屬性: NioEventLoopGroup 構造器中通過 SelectorProvider.provider() 獲取一個 SelectorProvider

    • Selector selector 屬性: NioEventLoop 構造器中通過呼叫通過 selector = provider.openSelector() 獲取一個 selector 物件.

NioEventLoop

NioEventLoop 繼承於 SingleThreadEventLoop, 而 SingleThreadEventLoop 又繼承於 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中對本地執行緒的抽象, 它內部有一個 Thread thread 屬性, 儲存了一個本地 Java 執行緒. 因此我們可以認為, 一個 NioEventLoop 其實和一個特定的執行緒繫結, 並且在其生命週期內, 繫結的執行緒都不會再改變.

NioEventLoop 類層次結構

NioEventLoop 的類層次結構圖還是比較複雜的, 不過我們只需要關注幾個重要的點即可. 首先 NioEventLoop 的繼承鏈如下:

NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor

在 AbstractScheduledEventExecutor 中, Netty 實現了 NioEventLoop 的 schedule 功能, 即我們可以通過呼叫一個 NioEventLoop 例項的 schedule 方法來執行一些定時任務. 而在 SingleThreadEventLoop 中, 又實現了任務佇列的功能, 通過它, 我們可以呼叫一個 NioEventLoop 例項的 execute 方法來向任務佇列中新增一個 task, 並由 NioEventLoop 進行排程執行.

通常來說, NioEventLoop 肩負著兩種任務, 第一個是作為 IO 執行緒, 執行與 Channel 相關的 IO 操作, 包括 呼叫 select 等待就緒的 IO 事件、讀寫資料與資料的處理等; 而第二個任務是作為任務佇列, 執行 taskQueue 中的任務, 例如使用者呼叫 eventLoop.schedule 提交的定時任務也是這個執行緒執行的.

NioEventLoop 的例項化過程

從上圖可以看到, SingleThreadEventExecutor 有一個名為 thread 的 Thread 型別欄位, 這個欄位就代表了與 SingleThreadEventExecutor 關聯的本地執行緒.
下面是這個構造器的程式碼:

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // 省略清理程式碼
                ...
            }
        }
    });
    threadProperties = new DefaultThreadProperties(thread);
    taskQueue = newTaskQueue();
}

在 SingleThreadEventExecutor 構造器中, 通過 threadFactory.newThread 建立了一個新的 Java 執行緒. 在這個執行緒中所做的事情主要就是呼叫 SingleThreadEventExecutor.this.run() 方法, 而因為 NioEventLoop 實現了這個方法, 因此根據多型性, 其實呼叫的是 NioEventLoop.run() 方法.

EventLoop 與 Channel 的關聯

Netty 中, 每個 Channel 都有且僅有一個 EventLoop 與之關聯, 它們的關聯過程如下:

從上圖中我們可以看到, 當呼叫了 AbstractChannel#AbstractUnsafe.register 後, 就完成了 Channel 和 EventLoop 的關聯. register 實現如下:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 刪除條件檢查.
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}

在 AbstractChannel#AbstractUnsafe.register 中, 會將一個 EventLoop 賦值給 AbstractChannel 內部的 eventLoop 欄位, 到這裡就完成了 EventLoop 與 Channel 的關聯過程.

EventLoop 的啟動

在前面我們已經知道了, NioEventLoop 本身就是一個 SingleThreadEventExecutor, 因此 NioEventLoop 的啟動, 其實就是 NioEventLoop 所繫結的本地 Java 執行緒的啟動.
依照這個思想, 我們只要找到在哪裡呼叫了 SingleThreadEventExecutor 的 thread 欄位的 start() 方法就可以知道是在哪裡啟動的這個執行緒了.
從程式碼中搜索, thread.start() 被封裝到 SingleThreadEventExecutor.startThread() 方法中了:

private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            thread.start();
        }
    }
}

STATE_UPDATER 是 SingleThreadEventExecutor 內部維護的一個屬性, 它的作用是標識當前的 thread 的狀態. 在初始的時候, STATE_UPDATER == ST_NOT_STARTED, 因此第一次呼叫 startThread() 方法時, 就會進入到 if 語句內, 進而呼叫到 thread.start().
而這個關鍵的 startThread() 方法又是在哪裡呼叫的呢? 經過方法呼叫關係搜尋, 我們發現, startThread 是在 SingleThreadEventExecutor.execute 方法中呼叫的:

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread(); // 呼叫 startThread 方法, 啟動EventLoop 執行緒.
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

既然如此, 那現在我們的工作就變為了尋找 在哪裡第一次呼叫了 SingleThreadEventExecutor.execute() 方法.
如果留心的讀者可能已經注意到了, 我們在 EventLoop 與 Channel 的關聯 這一小節時, 有提到到在註冊 channel 的過程中, 會在 AbstractChannel#AbstractUnsafe.register 中呼叫 eventLoop.execute 方法, 在 EventLoop 中進行 Channel 註冊程式碼的執行, AbstractChannel#AbstractUnsafe.register 部分程式碼如下:

if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new OneTimeTask() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        ...
    }
}

很顯然, 一路從 Bootstrap.bind 方法跟蹤到 AbstractChannel#AbstractUnsafe.register 方法, 整個程式碼都是在主執行緒中執行的, 因此上面的 eventLoop.inEventLoop() 就為 false, 於是進入到 else 分支, 在這個分支中呼叫了 eventLoop.execute. eventLoop 是一個 NioEventLoop 的例項, 而 NioEventLoop 沒有實現 execute 方法, 因此呼叫的是 SingleThreadEventExecutor.execute:

@Override
public void execute(Runnable task) {
    ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

我們已經分析過了, inEventLoop == false, 因此執行到 else 分支, 在這裡就呼叫了 startThread() 方法來啟動 SingleThreadEventExecutor 內部關聯的 Java 本地執行緒了.
總結一句話, 當 EventLoop.execute 第一次被呼叫時, 就會觸發 startThread() 的呼叫, 進而導致了 EventLoop 所對應的 Java 執行緒的啟動.
我們將 EventLoop 與 Channel 的關聯 小節中的時序圖補全後, 就得到了 EventLoop 啟動過程的時序圖:

相關推薦

Netty 原始碼分析 就是大名鼎鼎EventLoop()

此文章已同步傳送到我的 github 上 簡述 這一章是 Netty 原始碼分析 的第三章, 我將在這一章中大家一起探究一下 Netty 的 EventLoop 的底層原理, 讓大家對 Netty 的執行緒模型有更加深入的瞭解. NioEventLoopGroup 在 Netty 原始碼分析之 一 揭

Netty原始碼分析就是大名鼎鼎EventLoop

簡述 這一章是 Netty 原始碼分析 的第三章, 我將在這一章中大家一起探究一下 Netty 的 EventLoop 的底層原理, 讓大家對 Netty 的執行緒模型有更加深入的瞭解. NioEventLoopGroup 在[Netty 原始碼分析之

Netty原始碼分析NioEventLoop()—NioEventLoop的執行

前面兩篇文章Netty原始碼分析之NioEventLoop(一)—NioEventLoop的建立與Netty原始碼分析之NioEventLoop(二)—NioEventLoop的啟動中我們對NioEventLoop的建立與啟動做了具體的分析,本篇文章中我們會對NioEventLoop的

Netty 原始碼分析拆包器的奧祕

為什麼要粘包拆包 為什麼要粘包 首先你得了解一下TCP/IP協議,在使用者資料量非常小的情況下,極端情況下,一個位元組,該TCP資料包的有效載荷非常低,傳遞100位元組的資料,需要100次TCP傳送,100次ACK,在應用及時性要求不高的情況下,將這100個有效資料拼接成一個數據包,那會縮短到一個TCP資

netty原始碼分析服務端啟動

ServerBootstrap與Bootstrap分別是netty中服務端與客戶端的引導類,主要負責服務端與客戶端初始化、配置及啟動引導等工作,接下來我們就通過netty原始碼中的示例對ServerBootstrap與Bootstrap的原始碼進行一個簡單的分析。首先我們知道這兩個類都繼承自AbstractB

Netty原始碼分析LengthFieldBasedFrameDecoder

拆包的原理 關於拆包原理的上一篇博文 netty原始碼分析之拆包器的奧祕 中已詳細闡述,這裡簡單總結下:netty的拆包過程和自己寫手工拆包並沒有什麼不同,都是將位元組累加到一個容器裡面,判斷當前累加的位元組資料是否達到了一個包的大小,達到一個包大小就拆開,進而傳遞到上層業務解碼handler 之所以ne

Netty原始碼分析ChannelPipeline

每個channel內部都會持有一個ChannelPipeline物件pipeline. pipeline預設實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext連結串列。     當channel完成registe

netty原始碼分析新連線接入全解析

本文收穫 通讀本文,你會了解到 1.netty如何接受新的請求 2.netty如何給新請求分配reactor執行緒 3.netty如何給每個新連線增加ChannelHandler 其實,遠不止這些~ 前序背景 讀這篇文章之前,最好掌握一些前序知識,包括netty中的r

nettynetty原始碼分析ChannelPipeline和ChannelHandler

ChannelPipeline ChannelPipeline是ChannelHandler的容器,他負責ChannelHandler的事件攔截.       @Override public final ChannelPipelin

nettynetty原始碼分析NioEventLoop和NioEventLoopGroup

NioEventLoop 繼承Executor介面. NioEventLoop持有Selector selector;  通過openSelector() 獲取Selector .   @Override protected void run() {

netty原始碼分析-SimpleChannelInboundHandler與ChannelInboundHandlerAdapter詳解(6)

每一個Handler都一定會處理出站或者入站(也可能兩者都處理)資料,例如對於入站的Handler可能會繼承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter,而SimpleChannelIn

Netty 專欄】Netty原始碼分析NioEventLoop

點選上方“芋道原始碼”,選擇“置頂公眾號”技術文章第一時間送達!原始碼精品專欄 上一章節中,我們

Netty 專欄】Netty原始碼分析ChannelPipeline

點選上方“芋道原始碼”,選擇“置頂公眾號”技術文章第一時間送達!原始碼精品專欄 本章節分析Net

Spark原始碼分析:Stage劃分

        Stage劃分的大體流程如下圖所示:         前面提到,對於JobSubmitted事件,我們通過呼叫DAGScheduler的handleJobSubmitted()方法來處理。那麼我們先來看下程式碼: // 處理Job提交的函式 pri

Netty原始碼分析Reactor執行緒模型

一、背景  最近在研究netty的原始碼,今天發表一篇關於netty的執行緒框架--Reactor執行緒模型,作為最近研究成果。如果有還不瞭解Reactor模型請自行百度,網上有很多關於Reactor模式。  研究netty的時候,先看了下《netty權威指南》,裡面講解不

netty原始碼分析 五 transport(ChannelHandler)

上文說到了,channelHandler, 顧名思義 handler 處理者 從channelPipeline的定義中看出,channelPipeline是channelHandler的集合 public interface ChannelPipeline extends

netty原始碼分析 ByteBuf

終於到最後的ByteBuf了,其實和jdk nio的ByteBuffer  含義大致相同 都是對byte陣列的操作,不同的是ByteBuf定義了兩個下標 讀下標和寫下標 然後再看看其的實現類 WrappedByteBuf 對byteBuf的包裝類 Empt

libjingle原始碼分析:P2P

摘要         本文主要介紹了libjingle庫中的P2P模組。 概述         在libjingle中,P2P模組並非一個完全獨立的模組,它的實現依賴於Jingle協議,需要通過libjingle中的其它模組獲取必要的資訊和支援。P2P模組的內部結構及與

Netty原始碼分析流水線

上一篇分析Netty執行緒模型,今天分析Netty另外一個重點流水線Pipe。 一、流水線處理邏輯 Netty把各個事件放到Pipe中,進行自動化處理,這個做法非常棒!!思想非常獨特,使用者不需要關心

netty原始碼分析 九 handler

學習完前面的channel,回頭來學習handler 會感覺到很簡單的. handler 這個包裡面的類實現  ChannelHandlerAdapter  codec我們最後來看,先看其他 logging LoggingHandler 為log的輸出類, 定義