Netty(一):我和 NIO 不得不說的關係

Netty後端架構
先看定義:
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
可見Netty就是基於NIO的網路(Socket)客戶端服務端實現框架,它簡化了TCP/UDP客戶端服務端程式設計,開發人員不再關注底層的Socket讀取和寫入,而且Netty提供了不少的handler(如http、mqtt、redis協議等)實現,簡化了基於網路協議的程式設計複雜度。
啥是NIO?
NIO三件套:
- Channel:Channel儲存了socket連線的有關資訊,以及ChannelPipeLine(串起ChannelHandlerContext)/Unsafe(實現底層傳輸)/EventLoop(對應一個IO執行緒)/SelectionKey(如果為NIO模式,標識該Channel此時的連線讀寫事件)等有效資訊,NIO服務端負責響應連線的是NioServerSocketChannel,客戶端為NioSocketChannel
- Buffer:緩衝區,用於和NIO Channel互動,從Channel中讀取資料到Buffer,從Buffer將資料寫入到Channel
- Selector:選擇器,實現一個執行緒就能監聽多個Channel(Channel需先register到Selector)的讀寫等狀態,然後觸發ChannelPipeLine的fireChannelRead操作(序列ChannelHandlerContext持有的ChannelHandler)/底層Channel的資料寫入(資料來源於Buffer)
傳統的IO基於流(Stream),讀寫都是阻塞的,在讀取寫入Socket資料過程中,執行緒什麼都不能做,而NIO裡面有個Selector,由它來監聽所有Channel相關的檔案描述符(在linux中,裝置、Socket連線等都對應檔案描述符)狀態,這樣其他執行緒(業務開發的工作執行緒)就可以釋放出來,不被阻塞掉(如果涉及IO,可以走Future非同步處理),當然真正讀取資料的時候Channel對應的IO執行緒(在NIO中為NioEventLoop)還是阻塞讀寫的,但資料被儲存到了Buffer中,後面的處理都是面向Buffer,不再是面向流的了。

selector模型
我們來看看Netty(NIO模式)是怎麼和NIO對應的
- 見頂圖,處理客戶端連線的EventLoopGroup一般包含一個NioEventLoop,NioEventLoop即為一個Selector(也是一個執行緒,負責NIO),負責處理NioServerSocketChannel的狀態監測,當有連線到來時,執行accept(),新建一個NioSocketChannel負責與Client端的通訊,NioSocketChannel從Worker EventLoopGroup(NioEventLoop數量根據配置生成)中選擇一個NioEventLoop register進去,該Channel後續所有的NIO操作均由該NioEventLoop負責處理

NioEventLoop繼承體系
NioEventLoop主要包含兩部分操作:
- processSelectedKeys() :即selector功能
- runAllTasks():執行任務佇列中的任務,主要是定時任務和外部工作執行緒新增的讀寫任務
Netty中,耗時的業務程式碼可以寫在哪?
Netty允許在非NIO執行緒中寫訊息,如果當前執行緒是Channel對應的NIO執行緒則直接寫,如果不是,則寫訊息操作會被封裝成一個WriteTask,然後再由NioEventLoop的runAllTasks()定期處理
相關程式碼(見AbstractChannelHandlerContext.class):
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); }else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
歷史文章: