1. 程式人生 > >自頂向下深入分析Netty(四)--EventLoop-1

自頂向下深入分析Netty(四)--EventLoop-1


netty執行緒模型

我們再次回顧這幅圖,通過先前的講解,現在是不是親切很多了。圖中綠色的acceptor應該是你最熟悉的部分,之前我們在ServerBootstrap中進行了詳細分析。我們知道了mainReactor是一個執行緒池,處理Accept事件負責接受客戶端的連線;subReactor也是一個執行緒池,處理Read(讀取客戶端通道上的資料)、Write(將資料寫入到客戶端通道上)等事件。在這一節中,我們將深入分析這兩個執行緒池的實現,不斷完善其中的細節。我們首先從類圖開始。

4.1 類圖


EventLoop類圖

看到這幅類圖,如果你的第一印象是氣勢恢巨集,那麼恭喜你,你已經成功了一半。但不難預料的是,大多數人和我的感受是一樣的:這麼多類,一定很累。好在這只是第一印象,我們仔細觀察,便會發現其中明顯的脈絡,兩條線索(這裡使用自下而上):NioEventLoop以及NioEventLoopGroup即執行緒和執行緒池。忽略其中大量的介面,剩餘這樣的兩條線:

NioEventLoop --> SingleThreadEventLoop --> SingleThreadEventExecutor -->
AbstractScheduledEventExecutor --> AbstractScheduledEventExecutor --> 
AbstractEventExecutor --> AbstractExecutorService

NioEventLoopGroup --> MultithreadEventLoopGroup --> 
MultithreadEventExecutorGroup
-->
AbstractEventExecutorGroup

下面我們正式開始分析,依舊使用自頂向下的方法,從類圖頂部向下、從執行緒池到執行緒分析。

4.2 EventExecutorGroup

EventExecutorGroup在類圖中處於承上啟下的位置,其上是Java原生的介面和類,其下是Netty新建的介面和類,由於它處於如此重要的位置,我們詳細分析其中的方法。

4.2.1 Executor

首先看其繼承自Executor的方法:

    // Executes the given command at some time in the future
    void execute
(Runnable command)
;

只有一個簡單的execute()方法,但這個方法奠定了java併發的基礎,提供了非同步執行任務的。

4.2.2 ExecutorService

ExecutorService的關鍵方法如下(其中的invoke*方法並非關鍵,不再列出):

    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

這些方法我們能從命名中便能知道方法的作用。我們主要看submit()方法,該方法是execute()方法的擴充套件,相較於execute不關心執行結果,submit返回一個非同步執行結果Future。這無疑是很大的進步,但這裡的Future不提供回撥操作,顯得很雞肋,所以Netty將Java原生的java.util.concurrent.Future擴充套件為io.netty.util.concurrent.Future,我們將在之後進行介紹。

4.2.3 ScheduledExecutorService

從名字可以看出,該介面提供了一系列排程方法:

    ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
                                                long period,TimeUnit unit);
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                long delay,TimeUnit unit);

schedule()方法排程任務使任務在延遲一段時間後執行。scheduleAtFixedRate延遲一段時間後以固定頻率執行任務,scheduleWithFixedDelay延遲一段時間後以固定延時執行任務。是不是有點頭暈?那就對了,這裡有一個例子專門治頭暈。專家建議程式設計師應該每小時工作50分鐘,休息10分鐘,類似這樣:

    13:00 - 13:10 休息
    13:10 - 14:00 寫程式碼
    14:00 - 14:10 休息
    14:10 - 15:00 寫程式碼

實現這樣的排程我們可以使用(假設現在時間為13:00):

    executor.scheduleAtFixedRate(new RestRunnable(), 0 , 60, TimeUnit.MINUTES);
    executor.scheduleWithFixedDelay(new RestRunnable(), 0 , 50, TimeUnit.MINUTES);

4.2.4 EventExecutorGroup

    boolean isShuttingDown();
    Future<?> shutdownGracefully();
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    Future<?> terminationFuture();
    EventExecutor next();

EventExecutorGroup擴充套件的方法有5個,前四個可以從命名中推斷出功能。shutdownGracefully()我們已經在Bootstrap一節中使用過,優雅關閉執行緒池;terminationFuture()返回執行緒池終止時的非同步結果。重點關注next()方法,該方法的功能是從執行緒池中選擇一個執行緒。EventExecutorGroup還覆蓋了一些方法,我們不再列出,如果你感興趣可以去原始碼裡面檢視,需要注意的是,覆蓋的方法大部分是將Java原生的java.util.concurrent.Future返回值覆蓋為io.netty.util.concurrent.Future。

4.3 執行緒池

4.3.1 AbstractEventExecutorGroup

AbstractEventExecutorGroup實現了EventExecutorGroup介面的大部分方法,實現都長的和下面的差不多:

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

從這段程式碼可以看出這個執行緒池和程式設計師有一個相同點:懶。當執行緒池執行一個任務或命令時,步驟是這樣的:(1).找一個執行緒。(2).交給執行緒執行。

4.3.2 MultithreadEventExecutorGroup

MultithreadEventExecutorGroup實現了執行緒的建立和執行緒的選擇,其中的欄位為:

    // 執行緒池,陣列形式可知為固定執行緒池
    private final EventExecutor[] children;
    // 執行緒索引,用於執行緒選擇
    private final AtomicInteger childIndex = new AtomicInteger();
    // 終止的執行緒個數
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    // 執行緒池終止時的非同步結果
    private final Promise<?> terminationFuture = 
                          new DefaultPromise(GlobalEventExecutor.INSTANCE);
    // 執行緒選擇器
    private final EventExecutorChooser chooser;

MultithreadEventExecutorGroup的構造方法很長,我們將選出其中的關鍵部分分析,故不列出整體程式碼。如果你是處女座,這裡有一個連結MultithreadEventExecutorGroup
我們先看構造方法簽名:

    protected MultithreadEventExecutorGroup(int nThreads, 
                                        ThreadFactory threadFactory, Object... args)

其中的nThreads表示執行緒池的固定執行緒數。
MultithreadEventExecutorGroup初始化的步驟是:
(1).設定執行緒工廠
(2).設定執行緒選擇器
(3).例項化執行緒
(4).設定執行緒終止非同步結果
首先我們看裝置執行緒工廠的程式碼:

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }

    protected ThreadFactory newDefaultThreadFactory()() {
        return new DefaultThreadFactory(getClass());
    }

如果構造引數threadFactory為空則使用預設執行緒池,建立預設執行緒池使用newDefaultThreadFactory(),這是一個protected方法,可以在子類中覆蓋實現。
接著我們看設定執行緒選擇器的程式碼:

    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

如果執行緒數是2的冪次方使用2的冪次方選擇器,否則使用通用選擇器。下次如果有面試官問你怎麼判斷一個整數是2的冪次方,請甩給他這一行程式碼:

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

Netty實現了兩個執行緒選擇器,雖然程式碼不一致,功能都是一樣的:每次選擇索引為上一次所選執行緒索引+1的執行緒。如果你沒看明白程式碼的含義,沒關係,再看一遍。

    private interface EventExecutorChooser {
        EventExecutor next();
    }

    private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            return children[childIndex.getAndIncrement() & children.length - 1];
        }
    }

    private final class GenericEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            return children[Math.abs(childIndex.getAndIncrement() % children.length)];
        }
    }

最佳實踐:執行緒池數量使用2的冪次方,這樣執行緒池選擇執行緒時使用位操作,能使效能最高。
下面我們接著分析例項化執行緒的步驟:

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 使用模板方法newChild例項化一個執行緒
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // 如果不成功,所有已經例項化的執行緒優雅關閉
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                // 確保已經例項化的執行緒終止
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

實現的過程一句話描述就是:使用newChild()依次例項化執行緒,如果出錯,關閉所有已經例項化的執行緒。也許你對finally中的程式碼有疑問,這是因為不清楚shutdownGracefully()的含義。你需要提前明白這樣的事實:shutdownGracefully()只是通知執行緒池該關閉,但什麼時候關閉由執行緒池決定,所以需要使用e.isTerminated()來判斷執行緒池是否真正關閉。
例項化執行緒池正常完成後,Netty使用下面的程式碼設定非同步終止結果:

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            // 執行緒池中的執行緒每終止一個增加記錄數,直到全部終止設定執行緒池非同步終止結果為成功
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

分析完MultithreadEventExecutorGroup的構造方法,我們繼續分析普通方法。它的普通方法基本與下面的isTerminated()類似:

    @Override
    public boolean isTerminated() {
        for (EventExecutor l: children) {
            if (!l.isTerminated()) {
                return false;
            }
        }
        return true;
    }

總結起來就是:執行緒池的狀態由其中的各個執行緒決定。明白了這點,我們使用類比的方法可以推知其他方法的實現,故不再具體分析。

4.3.3 MultithreadEventLoopGroup

MultithreadEventLoopGroup實現了EventLoopGroup介面的方法,EventLoopGroup介面作為Netty併發的關鍵介面,我們看其中擴充套件的方法:

    // 將通道channel註冊到EventLoopGroup中的一個執行緒上
    ChannelFuture register(Channel channel);
    // 返回的ChannelFuture為傳入的ChannelPromise
    ChannelFuture register(Channel channel, ChannelPromise promise);
    // 覆蓋父類介面的方法,返回EventLoop
    @Override EventLoop next();

這些方法在MultithreadEventLoopGroup的具體實現很簡單。register()方法選擇一個執行緒,該執行緒負責具體的register()實現。next()方法使用父類實現,即使用上一節所述的選擇器選擇一個執行緒。程式碼如下:

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

分析完這些程式碼,我們關注一下執行緒數的預設設定。

    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", 
                Runtime.getRuntime().availableProcessors() * 2));

預設情況,執行緒數最小為1,如果配置了系統引數io.netty.eventLoopThreads,設定為該系統引數值,否則設定為核心數的2倍。

4.3.4 NioEventLoopGroup

NioEventLoopGroup的主要程式碼實現是模板方法newChild(),用來建立執行緒池中的單個執行緒,程式碼如下:

    @Override
    protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) 
                   throws Exception {
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), 
            (RejectedExecutionHandler) args[2]);
    }

關於程式碼中的引數含義,我們放在NioEventLoop中分析。此外NioEventLoopGroup還提供了setIoRatio()和rebuildSelectors()兩個方法,一個用來設定I/O任務和非I/O任務的執行時間比,一個用來重建執行緒中的selector來規避JDK的epoll 100% CPU Bug。其實現也是依次設定各執行緒的狀態,故不再列出。

相關推薦

深入分析Netty--EventLoop-1

netty執行緒模型 我們再次回顧這幅圖,通過先前的講解,現在是不是親切很多了。圖中綠色的acceptor應該是你最熟悉的部分,之前我們在ServerBootstrap中進行了詳細分析。我們知道了mainReactor是一個執行緒池,處理Accept事件負責接受客戶

深入分析Netty--Bootstrap

1.使用示例 首先使用Netty構造如圖所示的框架,原始碼如下: // 指定mainReactor EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 指定subReactor EventLoopGr

深入分析Netty--執行緒模型

上面這幅圖描述了netty的執行緒模型,其中mainReacotor,subReactor,Thread Pool是三個執行緒池。mainReactor負責處理客戶端的連線請求,並將accept的連線註冊到subReactor的其中一個執行緒上;subReactor負責處理客戶端通道上的資料讀

深入分析Netty--預備知識

netty是基於Java NIO封裝的網路通訊框架,只有充分理解了Java NIO才能理解好netty的底層設計。Java NIO有幾個重要的概念Channel,Buffer,Selector。NIO是基於Channel和Buffer操作的,資料只能通過Buffer寫入到Channel或者從Chan

章——語法分析方法

一、語法分析器的功能 語法分析器的任務:判斷所給單詞串是不是給定文法的正確句子。 1、確定的自頂向下分析思想: 從文法的識別符號出發、根據當前的輸入符號、唯一的確定一個產生式、用產生式右部的符號串代替相應的非終結符往下推。能構造成功則是句子,否則不是。 2、什麼

計算機網路 :應用層FTP、SMTP、POP3、IMAP

題外話,最近補習數學課還是很有成效的。 關於應用層是分三次寫的,因此寫的重點是在HTTP協議和DNS協議分析上。而應用層中FTP、SMTP、POP3、IMAP,計算機網路自頂向下中並沒有太詳細的介紹。我也試著看RFC然後抓包分析一下過程,採用的抓包方法是FTP

計算機網路 :應用層DNS,POP

DNS是: 一個分散式DNS伺服器實現分佈或資料庫。 一個使得主機能夠查詢分佈資料庫的應用層協議。 DNS伺服器通常是執行BIND軟體的Linux機器。 DNS協議執行在UDP之上使用53埠。 DNS提供的服務: DNS除了進行主機到IP地址的

文法分析小結:底向上的分析方法和分析方法有哪些

首先注意一點:無論是那種語法分析,語法都是從左至右的讀入符號! 自底向上分析法,也稱移進-歸約分析法。 它的實現思想是對輸入符號串自左向右進行掃描,並將輸入符逐個移入一個後進先出棧中,邊移入邊分析,一旦棧頂符號串形成某個句型的控制代碼時,(該控制代碼對應某產生式的右部),就

分析一個簡單的語音識別系統

上回分析了run_model函式的configuration過程,其中load_placeholder_into_network函式用於構建該語音識別系統中RNN網路的基本結構,本回將分析以下該網路。 1.RNN簡介 人們並不是從每秒鐘他接收到的資訊開始

分析一個簡單的語音識別系統

接著上回結束的地方,本回我們來分析sparse_tuple_to_texts函式和ndarray_to_text函式。首先分析sparse_tuple_to_texts函式。 1.sparse_tuple_to_texts函式 給出程式碼如下: def s

分析一個簡單的語音識別系統

上回我們說到了get_audio_and_transcript函式、pad_sequences函式和sparse_tuple_from函式等3個函式,本回我們分析這3個函式分別實現了哪些功能。 1.get_audio_and_transcript函式 該

分析一個簡單的語音識別系統

本回我們主要分析run_model中的configuration過程的相關函式。 1.run_model函式 第二回我們簡單介紹了run_model函式的結構,現在我們貼出程式碼如下所示: def run_model(self):

分析一個簡單的語音識別系統

上回我們分析了模型的初始化,花開兩朵各表一枝,本回我們說一下上回提到的set_dirs.py。該檔案結構如下圖所示: Created with Raphaël 2.1.0get_home_dirget_data_dir/get_conf_dir/get_mod

分析一個簡單的語音識別系統

RNN處理帶有時間序列的資料時具有很大的優勢,接下來幾篇文章將介紹如何使用RNN訓練一個簡單的語音識別系統。 主要參考該GitHub專案,https://github.com/silicon-vall

計算機網絡方法——可靠數據傳輸原理1構造可靠數據傳輸協議

需要 足夠 方向 信息 不發送 可靠的 更多 定時器 基於 TCP向調用它的因特網應用提供所提供的服務模型 數據可以通過一條可靠的信道進行傳輸。借助於可靠的信道,傳輸比特就不會受到損壞或丟失,而且所有數據都是按其發送順序進行交付。 可靠傳輸協議 實現服務模型就需要可靠

【ACM】UVa 489 劊子手遊戲

 【題目】 Hangman Judge是一個猜英文單字的小遊戲(在電子字典中常會看到),遊戲規則如下: 1、答案單字寫在紙上(每個字元一張紙),並且被蓋起來,玩家每次猜一個英文字元(letter)。 2、如果這個英文字元猜中(在答案的英文單字中有出現),被猜中的字元就被翻

計算機網路:方法第七版Wireshark實驗指南

計算機網路:自頂向下方法(第七版)Wireshark實驗指南 這個資源在網上好像很難找,我歷經千辛萬苦找到之後,在文件的內部發現這些文件其實是免費公開在一個網站上的……,連結如下: http://gaia.cs.umass.edu/wireshark-labs/?tdsourcetag=s_pcqq_ai

深入 逐步求精 面向過程程式設計方法

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

筆記 -《計算機網路:方法》 第5章 鏈路層:鏈路、接入網和區域網0

第5章 鏈路層:鏈路、接入網和區域網(0)   ** “結構” 均為本章知識結構; ** “假設” 均為理想化,抽象的模型; ** “例項” 均為已經投入使用的模型; (結構1)   (假設1)同一子網內 傳遞網路層資料報的鏈路層工作流程 &nbs

深入 逐步求精 面向過程程式設計方法

                  程式設計初學者常常受困於不會想問題:“不知道讓計算機解決這個問題該如何做”。其實,程式設計師的一個基本功是,能夠將複雜的問題分解開來。學會分解任務,因超級大分為大的、中的、小的、超小的,直到能用很直接的方法解決。記住一個很管用的策略:自項向下,逐步求精。不管做何事,都拿這個