1. 程式人生 > >徹底搞懂 netty 執行緒模型

徹底搞懂 netty 執行緒模型

編者注:Netty是Java領域有名的開源網路庫,特點是高效能和高擴充套件性,因此很多流行的框架都是基於它來構建的,比如我們熟知的Dubbo、Rocketmq、Hadoop等。本文就netty執行緒模型展開分析討論下 : )

IO模型

  • BIO:同步阻塞IO模型;
  • NIO:基於IO多路複用技術的“非阻塞同步”IO模型。簡單來說,核心將可讀可寫事件通知應用,由應用主動發起讀寫操作;
  • AIO:非阻塞非同步IO模型。簡單來說,核心將讀完成事件通知應用,讀操作由核心完成,應用只需操作資料即可;應用做非同步寫操作時立即返回,核心會進行寫操作排隊並執行寫操作。

NIO和AIO不同之處在於應用是否進行真正的讀寫操作。

reactor和proactor模型

  • reactor:基於NIO技術,可讀可寫時通知應用;
  • proactor:基於AIO技術,讀完成時通知應用,寫操作應用通知核心。

netty執行緒模型

netty的執行緒模型是基於Reactor模型的。

netty單執行緒模型

Reactor 單執行緒模型,是指所有的 I/O 操作都在同一個 NIO 執行緒上面完成的,此時NIO執行緒職責包括:接收新建連線請求、讀寫操作等。

在一些小容量應用場景下,可以使用單執行緒模型(注意,Redis的請求處理也是單執行緒模型,為什麼Redis的效能會如此之高呢?因為Redis的讀寫操作基本都是記憶體操作,並且Redis協議比較簡潔,序列化/反序列化耗費效能更低

)。但是對於高負載、大併發的應用場景卻不合適,主要原因如下:

  • 一個NIO執行緒同時處理成百上千的連線,效能上無法支撐,即便NIO執行緒的CPU負荷達到100%,也無法滿足海量訊息的編碼、解碼、讀取和傳送。
  • 當NIO執行緒負載過重之後,處理速度將變慢,這會導致大量客戶端連線超時,超時之後往往會進行重發,這更加重了NIO執行緒的負載,最終會導致大量訊息積壓和處理超時,成為系統的效能瓶頸。
  • 可靠性問題:一旦NIO執行緒意外跑飛,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障。

Reactor多執行緒模型

Rector 多執行緒模型與單執行緒模型最大的區別就是有一組 NIO 執行緒來處理連線讀寫操作,一個NIO執行緒處理Accept。一個NIO執行緒可以處理多個連線事件,一個連線的事件只能屬於一個NIO執行緒。

在絕大多數場景下,Reactor 多執行緒模型可以滿足效能需求。但是,在個別特殊場景中,一個 NIO 執行緒負責監聽和處理所有的客戶端連線可能會存在效能問題。例如併發百萬客戶端連線,或者服務端需要對客戶端握手進行安全認證,但是認證本身非常損耗效能。在這類場景下,單獨一個 Acceptor 執行緒可能會存在效能不足的問題,為了解決效能問題,產生了第三種 Reactor 執行緒模型——主從Reactor 多執行緒模型。

Reactor主從多執行緒模型

主從 Reactor 執行緒模型的特點是:服務端用於接收客戶端連線的不再是一個單獨的 NIO 執行緒,而是一個獨立的 NIO 執行緒池。Acceptor 接收到客戶端 TCP連線請求並處理完成後(可能包含接入認證等),將新建立的 SocketChannel注 冊 到 I/O 線 程 池(sub reactor 線 程 池)的某個I/O執行緒上, 由它負責SocketChannel 的讀寫和編解碼工作。Acceptor 執行緒池僅僅用於客戶端的登入、握手和安全認證,一旦鏈路建立成功,就將鏈路註冊到後端 subReactor 執行緒池的 I/O 執行緒上,由 I/O 執行緒負責後續的 I/O 操作。

netty執行緒模型思考

netty 的執行緒模型並不是一成不變的,它實際取決於使用者的啟動引數配置。通過設定不同的啟動引數,Netty 可以同時支援 Reactor 單執行緒模型、多執行緒模型。

為了儘可能地提升效能,Netty 在很多地方進行了無鎖化的設計,例如在 I/O 執行緒內部進行序列操作,避免多執行緒競爭導致的效能下降問題。表面上看,序列化設計似乎 CPU 利用率不高,併發程度不夠。但是,通過調整 NIO 執行緒池的執行緒引數,可以同時啟動多個序列化的執行緒並行執行,這種區域性無鎖化的序列執行緒設計相比一個佇列多個工作執行緒的模型效能更優。(小夥伴們後續多執行緒併發流程可參考該類實現方案

Netty 的 NioEventLoop 讀取到訊息之後,直接呼叫 ChannelPipeline 的fireChannelRead (Object msg)。 只要使用者不主動切換執行緒, 一直都是由NioEventLoop 呼叫使用者的 ChannelHandler,期間不進行執行緒切換。這種序列化處理方式避免了多執行緒操作導致的鎖的競爭,從效能角度看是最優的。

Netty擁有兩個NIO執行緒池,分別是bossGroupworkerGroup,前者處理新建連線請求,然後將新建立的連線輪詢交給workerGroup中的其中一個NioEventLoop來處理,後續該連線上的讀寫操作都是由同一個NioEventLoop來處理。注意,雖然bossGroup也能指定多個NioEventLoop(一個NioEventLoop對應一個執行緒),但是預設情況下只會有一個執行緒,因為一般情況下應用程式只會使用一個對外監聽埠。

這裡試想一下,難道不能使用多執行緒來監聽同一個對外埠麼,即多執行緒epoll_wait到同一個epoll例項上?

epoll相關的主要兩個方法是epoll_wait和epoll_ctl,多執行緒同時操作同一個epoll例項,那麼首先需要確認epoll相關方法是否執行緒安全:簡單來說,epoll是通過鎖來保證執行緒安全的, epoll中粒度最小的自旋鎖ep->lock(spinlock)用來保護就緒的佇列, 互斥鎖ep->mtx用來保護epoll的重要資料結構紅黑樹。

看到這裡,可能有的小夥伴想到了Nginx多程序針對監聽埠的處理策略,Nginx是通過accept_mutex機制來保證的。accept_mutex是nginx的(新建連線)負載均衡鎖,讓多個worker程序輪流處理與client的新連線。當某個worker程序的連線數達到worker_connections配置(單個worker程序的最大處理連線數)的最大連線數的7/8時,會大大減小獲取該worker獲取accept鎖的概率,以此實現各worker程序間的連線數的負載均衡。accept鎖預設開啟,關閉它時nginx處理新建連線耗時會更短,但是worker程序之間可能連線不均衡,並且存在“驚群”問題。只有在使能accept_mutex並且當前系統不支援原子鎖時,才會用檔案實現accept鎖。注意,accept_mutex加鎖失敗時不會阻塞當前執行緒,類似tryLock。

現代linux中,多個socker同時監聽同一個埠也是可行的,nginx 1.9.1也支援這一行為。linux 3.9以上核心支援SO_REUSEPORT選項,允許多個socker bind/listen在同一埠上。這樣,多個程序可以各自申請socker監聽同一埠,當連線事件來臨時,核心做負載均衡,喚醒監聽的其中一個程序來處理,reuseport機制有效的解決了epoll驚群問題。

再回到剛才提出的問題,java中多執行緒來監聽同一個對外埠,epoll方法是執行緒安全的,這樣就可以使用使用多執行緒監聽epoll_wait了麼,當然是不建議這樣乾的,除了epoll的驚群問題之外,還有一個就是,一般開發中我們使用epoll設定的是LT模式(水平觸發方式,與之相對的是ET預設,前者只要連線事件未被處理就會在epoll_wait時始終觸發,後者只會在真正有事件來時在epoll_wait觸發一次),這樣的話,多執行緒epoll_wait時就會導致第一個執行緒epoll_wait之後還未處理完畢已發生的事件時,第二個執行緒也會epoll_wait返回,顯然這不是我們想要的,關於java nio的測試demo如下:

public class NioDemo {
    private static AtomicBoolean flag = new AtomicBoolean(true);
    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(8080));
        // non-block io
        serverChannel.configureBlocking(false);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 多執行緒執行
        Runnable task = () -> {
            try {
                while (true) {
                    if (selector.select(0) == 0) {
                        System.out.println("selector.select loop... " + Thread.currentThread().getName());
                        Thread.sleep(1);
                        continue;
                    }

                    if (flag.compareAndSet(true, false)) {
                        System.out.println(Thread.currentThread().getName() + " over");
                        return;
                    }

                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();

                        // accept event
                        if (key.isAcceptable()) {
                            handlerAccept(selector, key);
                        }

                        // socket event
                        if (key.isReadable()) {
                            handlerRead(key);
                        }

                        /**
                         * Selector不會自己從已選擇鍵集中移除SelectionKey例項,必須在處理完通道時手動移除。
                         * 下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中。
                         */
                        iter.remove();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            Thread thread = new Thread(task);
            threadList.add(thread);
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
        System.out.println("main end");
    }

    static void handlerAccept(Selector selector, SelectionKey key) throws Exception {
        System.out.println("coming a new client... " + Thread.currentThread().getName());
        Thread.sleep(10000);
        SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
    }

    static void handlerRead(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();

        int num = channel.read(buffer);
        if (num <= 0) {
            // error or fin
            System.out.println("close " + channel.getRemoteAddress());
            channel.close();
        } else {
            buffer.flip();
            String recv = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
            System.out.println("recv: " + recv);

            buffer = ByteBuffer.wrap(("server: " + recv).getBytes());
            channel.write(buffer);
        }
    }
}

netty執行緒模型實踐

(1) 時間可控的簡單業務直接在 I/O 執行緒上處理

時間可控的簡單業務直接在 I/O 執行緒上處理,如果業務非常簡單,執行時間非常短,不需要與外部網路互動、訪問資料庫和磁碟,不需要等待其它資源,則建議直接在業務 ChannelHandler 中執行,不需要再啟業務的執行緒或者執行緒池。避免執行緒上下文切換,也不存線上程併發問題。

(2) 複雜和時間不可控業務建議投遞到後端業務執行緒池統一處理

複雜度較高或者時間不可控業務建議投遞到後端業務執行緒池統一處理,對於此類業務,不建議直接在業務 ChannelHandler 中啟動執行緒或者執行緒池處理,建議將不同的業務統一封裝成 Task,統一投遞到後端的業務執行緒池中進行處理。過多的業務ChannelHandler 會帶來開發效率和可維護性問題,不要把 Netty 當作業務容器,對於大多數複雜的業務產品,仍然需要整合或者開發自己的業務容器,做好和Netty 的架構分層。

(3) 業務執行緒避免直接操作 ChannelHandler

業務執行緒避免直接操作 ChannelHandler,對於 ChannelHandler,IO 執行緒和業務執行緒都可能會操作,因為業務通常是多執行緒模型,這樣就會存在多執行緒操作ChannelHandler。為了儘量避免多執行緒併發問題,建議按照 Netty 自身的做法,通過將操作封裝成獨立的 Task 由 NioEventLoop 統一執行,而不是業務執行緒直接操作,相關程式碼如下所示:

如果你確認併發訪問的資料或者併發操作是安全的,則無需多此一舉,這個需要根據具體的業務場景進行判斷,靈活處理。

推薦閱讀

  • Java nio 空輪詢bug到底是什麼
  • Netty 入門,這一篇文章就夠了
  • Netty 啟動流程解析
  • Netty 處理連線那些事
  • Java常見幾種動態代理的對比
  • 程式設計師必看| mockito原理淺析
  • Eureka 原理分析
  • MQ初窺門徑【面試必看的Kafka和RocketMQ儲存區別】
  • java lambda 深入淺出

歡迎小夥伴關注【TopCoder】閱讀更多精彩好文。

相關推薦

徹底 netty 執行模型

編者注:Netty是Java領域有名的開源網路庫,特點是高效能和高擴充套件性,因此很多流行的框架都是基於它來構建的,比如我們熟知的Dubbo、Rocketmq、Hadoop等。本文就netty執行緒模型展開分析討論下 : ) IO模型 BIO:同步阻塞IO模型; NIO:基於IO多路複用技術的“非阻塞同

Java IO模型Netty執行模型

目錄   一、概念介紹 1、同步與非同步 2、阻塞與非阻塞 3、同步阻塞io 4、同步非阻塞io 5、IO多路複用 6、非同步IO 二、BIO(同步阻塞IO) 三、偽非同步IO ​ 四、NIO(同步阻塞IO) 五、Netty執行緒模型

一文 Java 執行中斷

在之前的一文《如何"優雅"地終止一個執行緒》中詳細說明了 stop 終止執行緒的壞處及如何優雅地終止執行緒,那麼還有別的可以終止執行緒的方法嗎?答案是肯定的,它就是我們今天要分享的——執行緒中斷。 下面的這斷程式碼大家應該再熟悉不過了,執行緒休眠需要捕獲或者丟擲

java NIO模擬 Netty執行模型

兩個類,NioAcceptor,處理連線 單執行緒。NioReactor處理讀寫,多執行緒。 解釋放在程式碼中 NioAcceptor package com.zwj.myNio; import java.io.IOException; import java.net

Netty系列三:netty執行模型

netty使用reactor反應堆執行緒模型。 一、Reactor模型單執行緒模型如下: 使用者發起IO操作到事件分離器 事件分離器呼叫相應的處理器處理事件 事件處理完成,事件分離器獲得控制權,繼續相應處理 二、Reactor模型多執行緒模型如下

Netty執行模型(中)

1. 背景 1.1. 驚人的效能資料資料分析與企業架構京東618大促下的資料驅動個性化推薦如何構建軟硬體結合的人工智慧產品研發體系中國創新型網際網路企業走向海外的技術機遇與挑戰LinkedIn成員分類平臺大資料應用的最佳實踐1.2. Netty基礎入門2. Net

徹底 Java 執行池原理

概述 這篇文章是我在閱讀原始碼時整理的一些筆記,對原始碼的關鍵點進行了比較詳細的註釋,然後加上一些自己對執行緒池機制的理解。最終目的是要弄清楚下面這些問題: 執行緒池有 execute() 和 submit() 方法,執行機制分別是什麼? 如何新建執行緒? 任務如何執行? 執行緒如何銷燬

netty原始碼分析(十七)Netty執行模型深度解讀與架構設計原則

上次分析到: public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop public ChannelFutu

netty執行模型

Proactor和Reactor Proactor和Reactor是兩種經典的多路複用I/O模型,主要用於在高併發、高吞吐量的環境中進行I/O處理。 I/O多路複用機制都依賴於一個事件分發器,事件分離器把接收到的客戶事件分發到不同的事件處理器中,如下圖:

Netty執行模型之客戶端執行模型

相比於服務端,客戶端的執行緒模型簡單一些,它的工作原理如下:第一步:由使用者執行緒發起客戶端連線,示例程式碼如下:客戶端只需要建立一個EventLoopGroup,因為它不需要獨立的執行緒去監聽客戶端連

Netty執行模型及EventLoop和EventLoopGroup原始碼解析

 1、netty執行緒模型   一般在討論netty的執行緒模型的時候,我們會考慮到經典的Reactor執行緒模型,下面分別說明下經典的Reactor執行緒模型 1、1 Reactor單執行緒模型      這個執行緒模型指的是所有的nio操作都是在一個執行緒中去完成的。n

Netty執行模型詳解

1. 背景 1.1. Java執行緒模型的演進 1.1.1. 單執行緒 時間回到十幾年前,那時主流的CPU都還是單核(除了商用高效能的小機),CPU的核心頻率是機器最重要的指標之一。 在Java領域當時比較流行的是單執行緒程式設計,對於CPU密集型的應用程式而言,

一夜 | JVM 執行安全與鎖優化

前言 本文已經收錄到我的 Github 個人部落格,歡迎大佬們光臨寒舍: 我的 GIthub 部落格 學習導圖 一.為什麼要學習記憶體模型與執行緒? 之前我們學習了記憶體模型和執行緒,瞭解了 JMM 和執行緒,初步探究了 JVM 怎麼實現併發,而本篇文章,我們的關注點是 JVM 如何實現高效 併發

Netty原始碼死磕一(netty執行模型及EventLoop機制)

## 引言 好久沒有寫部落格了,近期準備把`Netty`原始碼啃一遍。在這之前本想直接看原始碼,但是看到後面發現其實效率不高, 有些概念還是有必要回頭再細啃的,特別是其執行緒模型以及`EventLoop`的概念。 當然在開始之前還是有務必要對`IO模型`要有清晰準確的認識。 傳送門 []() ## 事件

執行安全(上)--徹底synchronized(從偏向鎖到重量級鎖)

接觸過執行緒安全的同學想必都使用過synchronized這個關鍵字,在java同步程式碼快中,synchronized的使用方式無非有兩個: 通過對一個物件進行加鎖來實現同步,如下面程式碼。 synchronized(lockObject){  &nb

Netty(EventLoop 和執行模型)

EventLoop介面     Netty的EventLoop是協同設計的一部分,它採用了兩個基本的API:併發和網路程式設計。首先,io.netty.util.concurrent包構建在JDK的java.util.concurrent包上,用來提供執行緒執行

netty原始碼解解析(4.0)-5 執行模型-EventExecutorGroup框架

上一章講了EventExecutorGroup的整體結構和原理,這一章我們來探究一下它的具體實現。 EventExecutorGroup和EventExecutor介面 io.netty.util.concurrent.EventExecutorGroup j

(譯)Netty In Action第七章—事件迴圈和執行模型

請尊重勞動成果,未經本人允許,拒絕轉載,謝謝! 這章包涵以下內容 - 執行緒模型概覽 - 事件迴圈概念和實現 - 任務排程 - 實現細節 簡單地說,執行緒模型指定了OS、程式語言、框架或應用程式的上下文中的執行緒管理的關鍵方面。執行緒創造的方式和時間明顯對於應用程

netty原始碼解解析(4.0)-6 執行模型-IO執行EventLoopGroup和NIO實現(一)

介面定義 io.netty.channel.EventLoopGroup extends EventExecutorGroup 方法 說明

netty原始碼解解析(4.0)-7 執行模型-IO執行EventLoopGroup和NIO實現(二)

把NIO事件轉換成對channel unsafe的呼叫或NioTask的呼叫 processSelectedKeys()方法是處理NIO事件的入口: private void processSelectedKeys() { if (selectedKeys != null) {