1. 程式人生 > >《Netty in action》 讀書筆記

《Netty in action》 讀書筆記

source ati 計算 結束 邏輯 socket 而且 sub 後者

聲明:這篇文章是記錄讀書過程中的知識點,並加以歸納總結,成文。文中圖片、代碼出自《Netty in action》。

1. 為什麽用Netty?

每個框架的流行,都一定有它出眾的地方。Netty就是為了網絡編程使用,它封裝了大量的通訊的底層內容,簡化開發者的工作量,讓開發者的精力全都放於業務上,而且它能高效的處理網絡通訊的東西。很多大的公司都使用Netty作為通訊框架,因此,使這個框架更加完善。

2. OIO(阻塞I/O)與NIO(非阻塞I/O)與AIO(異步I/O)

2.1 阻塞I/O

為什麽叫阻塞I/O呢?因為每個線程對應一個連接,連接的建立、數據讀取、寫入,都會進行阻塞,所以叫阻塞I/O。

性能瓶頸:一個線程對應一個連接,當連接數量巨大時,線程數量過多,每個線程都需要內存,內存會被迅速耗盡,以及線程間切換的開銷也十分巨大。另外,阻塞也是一個問題。accept()、read()、write()都會進行阻塞。

OIO線程模型:

技術分享圖片

處理邏輯:

技術分享圖片

2.2 非阻塞I/O

NIO三大組件:Buffer、Channel(數據來源或數據寫入的目的地)、Selector(多路復用器)

開啟Selector方式:Selector selector = Selector.open();

註冊的四種事件:SelectionKey.OP_READ(可讀)、SelectionKey.OP_WRITE(可寫)、SelectionKey.OP_CONNECT(建立起連接)、SelectionKey.OP_ACCEPT(接收連接)

為什麽叫非阻塞I/O呢?它不阻塞了嗎?不是的。是因為,使用Selector對多個並發的連接進行輪詢(也就是用更少的線程監視更多的連接),通過事件驅動來查明哪些可以進行I/O,不必等待到操作的結束。

NIO線程模型:

技術分享圖片

處理邏輯:

技術分享圖片

2.3 異步I/O

在NIO的基礎上,AIO可以讓效率有更多的提高。AIO是基於回調和事件驅動,當異步方法提交任務後可以立即返回,由線程池負責來執行任務,當任務完成後可以立即通知給任務的提交者。異步I/O的目的,主要為了控制線程數量,減少過多的線程帶來的內存消耗和CPU在線程調度上的開銷。

異步I/O的使用方式有兩種,一種是Future,另一種是回調函數。

2.3.1 Future

它代表異步計算的結果。

 1 interface ArchiveSearcher { String search(String target); }
 2  class App {
3 ExecutorService executor = ... 4 ArchiveSearcher searcher = ... 5 void showSearch(final String target) 6 throws InterruptedException { 7 Future<String> future 8 = executor.submit(new Callable<String>() { 9 public String call() { 10 return searcher.search(target); 11 }}); 12 displayOtherThings(); // do other things while searching 13 try { 14 displayText(future.get()); // use future 這裏會進行阻塞,一直到計算的完成 15 } catch (ExecutionException ex) { cleanup(); return; } 16 } 17 }

在Future這裏,還要提到FutreTask這個類,它可以包裝Callable和Runnable的對象,此外,因為它繼承Runnable接口,所以它可以被提交給Executor來執行。

2.3.2 回調函數 CompletionHandler

CompletionHandler它是一個處理器,用於異步計算結果。有兩個方法,completed(V result, A attachment)(執行成功時調用)和failed(Throwable exc, A attachment)(執行失敗時調用)。在下列地方,都有使用CompletionHandler作為回調函數的方法:

AsynchronousFileChannel 類的 read(ByteBuffer dst, long position, A attachment, CompletionHandler<Integer,? super A> handler)

AsynchronousServerSocketChannel類的 accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)

AsynchronousSocketChannel 類的 read(ByteBuffer dst, A attachment, CompletionHandler<Integer,? super A> handler)

3. ByteBuf

Java NIO中提供ByteBuffer作為byte的容器,在Netty中,使用ByteBuf來替代ByteBuffer。ByteBuf功能更強大,更加靈活。Netty對於數據的處理主要是通過兩個組件,抽象類ByteBuf和接口ByteBufHolder。

3.1 ByteBuf使用方式

HEAP BUFFERS 存儲JVM的堆數據

1 ByteBuf heapBuf = ...;
2 if (heapBuf.hasArray()) {
3 byte[] array = heapBuf.array();
4 int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
5 int length = heapBuf.readableBytes();
6 handleArray(array, offset, length);
7 }

DIRECT BUFFERS

這裏的Direct該如何理解呢?它是ByteBuf的一種形式,在使用JNI的時候,它避免了將buffer的內容復制到中間媒介buffer上,可以直接使用。這就是DIrect的說明。

在JavaDoc中,關於ByteBuffer有明確的說明:direct buffers會處於正常的垃圾回收之外,因此,它非常適用於網絡傳輸方面。如果你的數據保留在heap-buffer上面,那麽socket傳輸前,要將buffer復制到direct buffer上。至於,它的不足之處,在於使用direct buffer的時候,必須先復制一份。代碼如下:

1 ByteBuf directBuf = ...;
2 if (!directBuf.hasArray()) {
3 int length = directBuf.readableBytes();
4 byte[] array = new byte[length];
5 directBuf.getBytes(directBuf.readerIndex(), array);
6 handleArray(array, 0, length);
7 }

COMPOSITE BUFFERS 就是前面兩種的結合

3.2 接口ByteBufHolder

在保存實際數值的時候,常需要保存一些附加的值。比如:Http返回的例子,除了返回的實際內容外,還有status code,cookies等等。

Netty用ByteBufholder來提供一些共通的方法使用,它有幾個常用的方法:content()、copy()、duplicate()

如果你要實現一個在ByteBuf中存儲實際負載的對象的話,那麽ByteBufHolder是個好的選擇。

3.3 ByteBuf allocation

這裏討論的是ByteBuf實例的管理。

3.3.1 On-demand: interface ByteBufAllocator

ByteBufAllocator 常見方法(部分記錄):buffer()、heapBuffer()、directBuffer()、compositeBuffer()、ioBuffer()

Netty用ByteBufAllocator實現了緩存池。我們可以通過Channel或是綁定在ChannelHandler上的ChannelHandlerContext,來獲得ByteBufAllocator的引用。示例如下:

1 Channel channel = ...;
2 ByteBufAllocator allocator = channel.alloc();
3 ....
4 ChannelHandlerContext ctx = ...;
5 ByteBufAllocator allocator2 = ctx.alloc();

ByteBufAllocator有兩種實現:PooledByteBufAllocator 和 UnpooledByteBufAllocator 前者會進行緩存;後者,不會緩存ByteBuf的實例,每次調用都會返回新的。

3.3.2 Unpooled buffers

Netty有一個Unpooled的類,來提供靜態方法,用於創建未被緩存ByteBuf實例。

常用的方法:buffer()、directBuffer()、wrappedBuffer()、copiedBuffer()

3.4 Reference counting

Reference counting被用來優化內存的使用,通過釋放不再使用的資源。Netty中的Reference counting 和 ByteBufHolder 都實現了ReferenceCounted接口。ReferenceCounted的實現的實例,它們的活躍的實例的計數都是從1進行的,只要大於0,對象就一定不會被釋放。當數字減為0時,資源就會被釋放。下面給出兩個例子:

Reference counting:
1
Channel channel = ...; 2 ByteBufAllocator allocator = channel.alloc(); 3 .... 4 ByteBuf buffer = allocator.directBuffer(); 5 assert buffer.refCnt() == 1;

Release reference-counted object

1 ByteBuf buffer = ...;
2 boolean released = buffer.release();   // 活躍的引用,變成0,對象被釋放,方法返回true

4. ChannelHandler 和 ChannelPipline

4.1 The ChannelHandler family

4.1.1 Channel 生命周期

技術分享圖片

ChannelRegistered:Channel被註冊到EventLoop

ChannelActive: Channel活躍,可以發送、接收數據

ChannelInactive:Channel沒有連接到remote peer

ChannelUnregistered:Channel被創建了,但沒被註冊到EventLoop

4.1.2 ChannelHandler

先看看它的接口層次圖:

技術分享圖片

ChannelHandler 生命周期的方法:handlerAdded(ChannelHandler被加入到ChannelPipeline時調用)、handlerRemoved(被移除時)、exceptionCaught(ChannelPipeline的處理過程中發生exception時調用)

ChannelInboundHandler

處理inbound數據和Channel狀態的改變

常用方法:channelRegistered、channelActive、channelReadComplete、channelRead 等等

看一段代碼:

1 @Sharable       //這裏的註解的意思是,這個handler可以被加到多個ChannelPipelines中
2 public class DiscardHandler extends ChannelInboundHandlerAdapter {
// 當繼承ChannelInboundHandler,覆蓋channelRead時,需要明確地釋放緩存的實例
3 @Override
4 public void channelRead(ChannelHandlerContext ctx, Object msg) {
5 ReferenceCountUtil.release(msg);
6 }
7 }

看另一個例子:

1 @Sharable
2 public class SimpleDiscardHandler       // SimpleDiscardHandler 會自動釋放資源
3 extends SimpleChannelInboundHandler<Object> {
4 @Override
5 public void channelRead0(ChannelHandlerContext ctx,
6 Object msg) {
7 // 不需要任何明確地釋放資源的操作,這裏只需做業務處理就可以
8 }
9 }

註意:如果一個message被使用或是丟棄,並且沒有被傳給管道中下一個ChannelOutboundHandler時,就一定要調用ReferenceCountUtil.release() 釋放資源。如果消息到達實際傳輸層,那麽,write或是Channel關閉時,會自動釋放資源。

ChannelOutboundHandler:處理outbound數據,並允許所有操作的攔截

常見方法:bind、connect、disconnect、close 等

ChannelHandlerAdapter有一個isSharable()方法,判斷是否有註解@Sharable

4.1.3 Resource management

無論是ChannelInboundHandler.channelRead()還是ChannelOutboundHandler.write(),都需要確保沒有資源泄露。當你使用ByteBuf之後,需要調整引用的計數reference count。為了幫助診斷潛在的問題,Netty提供了一個ResourceLeakDetector,檢測到泄漏時,會打印log。

使用方法:java -Dio.netty.leakDetectionLevel=ADVANCED

參數有四種:DISABLED、SIMPLE(default)、ADVANCED、PARANOID

4.2 ChannelPipeline接口

每個新Channel創建時,都會被指定一個新的ChannelPipeline,這種對應關系是永久的,不會改變。

ChannelPipeline 和 ChannelHandlers 關系圖:

技術分享圖片

ChannelPipeline一些常用方法:

addLast、addFirst、remove、replace、get、context(獲得綁定在ChannelHandler上的ChannelHandlerContext)、names(獲得ChannelPipeline上的所有ChannelHandler名字)

4.3 ChannelHandlerContext接口

4.3.1 ChannelHandlerContext的使用

它的主要作用就是管理handlers之間的交互。

常見API:bind、channel、close、connect、deregister、fireChannelActive、fireChannelRead、handler、pipeline、read、write 等等

ChannelHandlerContext的使用,如下圖:

技術分享圖片

每個ChannelHandler都是和一個ChannelHandlerContext相互關聯,關聯關系一直不會改變,因此,緩存一個它的引用很安全。

下面看2個代碼片段:

 1 ChannelHandlerContext ctx = ..;
 2 Channel channel = ctx.channel();   // 從ChannelHandlerContext關聯到Channel
 3 channel.write(Unpooled.copiedBuffer("Netty in Action",
 4 CharsetUtil.UTF_8));
 5 
 。。。
 9 ChannelHandlerContext ctx = ..;
10 ChannelPipeline pipeline = ctx.pipeline();  // 從ChannelHandlerContext 關聯到 ChannelPipeline 
11 pipeline.write(Unpooled.copiedBuffer("Netty in Action", 12 CharsetUtil.UTF_8));

上面代碼中,雖然Channel 或是 ChannelPipeline 調用的write()方法,代理了pipline中事件的傳遞,實際上,ChannelHandler之間的傳遞是ChannelHandlerContext來做的。

技術分享圖片

如果你想從某個handler開始(不是從第一個開始),那麽就可以利用ChannelHandlerContext。

1 ChannelHandlerContext ctx = ..;       // 獲取ChannelHandlerContext的引用
2 ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));   //write() sends the buffer to the next ChannelHandler

技術分享圖片

4.3.2 ChannelHandler 和ChannelHandlerContext 的高級用法

6.3.2

To be continued!!!

《Netty in action》 讀書筆記