參考:Scalable IO in Java - http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

mainReactor負責處理客戶端的連線請求,並將accept的連線註冊到subReactor的其中一個執行緒上;subReactor負責處理客戶端通道上的資料讀寫;Thread Pool是具體的業務邏輯執行緒池,處理具體業務。

參考: https://www.jianshu.com/nb/6812432

Selector監聽多個通道的事件,可以用一個執行緒監聽多個channel事件,SelectionKey表示通道和選擇器之間的關係,追蹤對應通道是否就緒。

ServerSocketChannel監聽新進來的TCP連線的通道,當連線達到會建立SocketChannel

Buffer可存放固定數量資料的緩衝,與通道一起工作,讀寫資料。資料只能通過buffer寫入到channel或者從channel讀出到Buffer

Netty服務端簡單例子,源自原始碼測試中

EventLoopGroup是Netty實現的執行緒池介面,兩個執行緒池:bossGroup和workerGroup分別對應mainReactor和subReactor,其中boss專門用於接受客戶端連線,worker也就是常說的IO執行緒專門用於處理IO事件。使用者自定義的業務執行緒池須實現EventExecutorGroup介面,4.1版本則可以直接使用JAVA自帶的執行緒池

Netty提供了兩個啟動器ServerBootstrapBootstrap,分別用於啟動伺服器端和客戶端程式

group(EventLoopGroup...)方法用於指定一個或兩個Reactor

channel(Channel)方法本質用來指定一個Channel工廠

option(Key, Value)用於指定TCP相關的引數以及一些Netty自定義的引數

childHandler()用於指定subReactor中的處理器

handler()用於指定mainReactor的處理器

ChannelInitializer,它是一個特殊的Handler,功能是初始化多個Handler,完成初始化工作後,ChannelInitializer會從Handler鏈中刪除

bind(int)方法將服務端Channel繫結到本地埠,成功後將accept客戶端的連線,從而是整個框架執行起來

不在addLast(Handler)方法中指定執行緒池,那麼將使用預設的subReacor即woker執行緒池也即IO執行緒池執行處理器中的業務邏輯程式碼

       // Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 使用者可以自定義ThreadPool EventExecutorGroup threadPool = new ThreadPool();
//在後面p.addLast的時候可以指定是否使用單獨的Pool還是和前面的在一起
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
}); // Start the server.
ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

NioEventLoop以及NioEventLoopGroup即執行緒和執行緒池

EventExecutorGroup

The EventExecutorGroup is responsible for providing the EventExecutor's to use via its next() method. Besides this, it is also responsible for handling their life-cycle and allows shutting them down in a global fashion.

AbstractEventExecutorGroup implements EventExecutorGroup 包含都如類似實現,(1).找一個執行緒。(2).交給執行緒執行。
    @Override
public void execute(Runnable command) {
next().execute(command);
}

MultithreadEventExecutorGroup 實現了執行緒的建立和執行緒的選擇

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

MultithreadEventLoopGroup

ByteBuf:

清空前:
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
清空後:
+------------------+-------------------------------------+
| readable bytes | writable bytes (got more space) |
+------------------+-------------------------------------+
| | |
readerIndex(0)<= writerIndex (decreased) <= capacity

Netty4開始引入了引用計數的特性,緩衝區的生命週期可由引用計數管理,當緩衝區不再有用時,可快速返回給物件池或者分配器用於再次分配,從而大大提高效能,進而保證請求的實時處理,引用計數並不專門服務於緩衝區ByteBuf。使用者可根據實際需求,在其他物件之上實現引用計數介面ReferenceCounted

使用retain()使引用計數增加1,使用release()使引用計數減少1

通過duplicate()slice()等等生成的派生緩衝區ByteBuf會共享原生緩衝區的內部儲存區域。此外,派生緩衝區並沒有自己獨立的引用計數而需要共享原生緩衝區的引用計數。也就是說,當我們需要將派生緩衝區傳入下一個元件時,一定要注意先呼叫retain()方法

HeapByteBuf的底層實現為JAVA堆內的位元組陣列。

DirectByteBuf的底層實現為作業系統核心空間的位元組陣列,直接緩衝區的位元組陣列位於JVM堆外的NATIVE堆,由作業系統管理申請和釋放,而DirectByteBuf的引用由JVM管理。

CompositeByteBuf,顧名思義,有以上兩種方式組合實現, 需要將後一個緩衝區的資料拷貝到前一個緩衝區;而使用組合緩衝區則可以直接儲存兩個緩衝區,因為其內部實現組合兩個緩衝區並保證使用者如同操作一個普通緩衝區一樣操作該組合緩衝區,從而減少拷貝操作。

UnpooledByteBuf為不使用物件池的緩衝區,不需要建立大量緩衝區物件時建議使用該類緩衝區。

PooledByteBuf為物件池緩衝區,當物件釋放後會歸還給物件池,所以可迴圈使用。

slice()和duplicate()方法生成的ByteBuf與原ByteBuf共享相同的底層實現,只是各自維護獨立的索引和標記,使用這兩個方法時,特別需要注意結合使用場景確定是否呼叫retain()增加引用計數

在Netty中,小件商品和大件商品都首先從同城倉庫(ThreadCache-tcache)送出;如果同城倉庫沒有,則會從區域倉庫(Arena)送出。

From: https://www.jianshu.com/p/15304cd63175

  • 記憶體分配的最小單位為16B。

  • < 512B的請求為Tiny,< 8KB(PageSize)的請求為Small,<= 16MB(ChunkSize)的請求為Normal,> 16MB(ChunkSize)的請求為Huge。

  • < 512B的請求以16B為起點每次增加16B;>= 512B的請求則每次加倍。

  • 不在表格中的請求大小,將向上規範化到表格中的資料,比如:請求分配511B、512B、513B,將依次規範化為512B、512B、1KB

為了提高記憶體分配效率並減少內部碎片,jemalloc演算法將Arena切分為小塊Chunk,根據每塊的記憶體使用率又將小塊組合為以下幾種狀態:QINIT,Q0,Q25,Q50,Q75,Q100。Chunk塊可以在這幾種狀態間隨著記憶體使用率的變化進行轉移

  • QINIT的記憶體使用率為[0,25)、Q0為(0,50)、Q100為[100,100]。

  • Chunk塊的初始狀態為QINIT,當使用率達到25時轉移到Q0狀態,再次達到50時轉移到Q25,依次類推直到Q100;當記憶體釋放時又從Q100轉移到Q75,直到Q0狀態且記憶體使用率為0時,該Chunk從Arena中刪除。注意極端情況下,Chunk可能從QINIT轉移到Q0再釋放全部記憶體,然後從Arena中刪除。

雖然已將Arena切分為小塊Chunk,但實際上Chunk是相當大的記憶體塊,在jemalloc中建議為4MB,Netty預設使用16MB。為了進一步提高記憶體利用率並減少內部碎片,需要繼續將Chunk切分為小的塊Page。一個典型的切分將Chunk切分為2048塊,Netty正是如此,可知Page的大小為:16MB/2048=8KB。一個好的記憶體分配演算法,應使得已分配記憶體塊儘可能保持連續,這將大大減少內部碎片,由此jemalloc使用夥伴分配演算法儘可能提高連續性。夥伴分配演算法的示意圖如下:

  1. 8KB--需要一個Page,第11層滿足要求,故分配2048節點即Page0;

  2. 16KB--需要兩個Page,故需要在第10層進行分配,而1024的子節點2048已分配,從左到右找到滿足要求的1025節點,故分配節點1025即Page2和Page3;

  3. 8KB--需要一個Page,第11層滿足要求,2048已分配,從左到右找到2049節點即Page1進行分配。

分配結束後,已分配連續的Page0-Page3,這樣的連續記憶體塊,大大減少內部碎片並提高記憶體使用率。

Netty中每個Page的預設大小為8KB,在實際使用中,很多業務需要分配更小的記憶體塊比如16B、32B、64B等。為了應對這種需求,需要進一步切分Page成更小的SubPage。SubPage是jemalloc中記憶體分配的最小單位,不能再進行切分。SubPage切分的單位並不固定,以第一次請求分配的大小為單位(最小切分單位為16B)。比如,第一次請求分配32B,則Page按照32B均等切分為256塊;第一次請求16B,則Page按照16B均等切分為512塊。為了便於記憶體分配和管理,根據SubPage的切分單位進行分組,每組使用雙向連結串列組合