netty原始碼解解析(4.0)-14 Channel NIO實現:讀取資料
本章分析Nio Channel的資料讀取功能的實現。
Channel讀取資料需要Channel和ChannelHandler配合使用,netty設計資料讀取功能包括三個要素:Channel, EventLoop和ChannelHandler。Channel有個read方法,這個方法不會直接讀取資料,它的作用是通知持有當前channel的eventLoop可以從這個這個channel讀取資料了,這個方法被呼叫之後eventLoop會在channel有資料可讀的時候從channel讀出資料然後把資料放在channelRead事件中交給ChannelInboundHandler的channelRead方法處理,當eventLoop發現channel中暫時沒時間可讀會觸發一個channelReadComplete事件。
read: Nio Channel通知eventLoop開始讀資料
channel read方法的呼叫棧:
1 io.netty.channel.AbstractChannel#read 2 io.netty.channel.DefaultChannelPipeline#read 3 io.netty.channel.AbstractChannelHandlerContext#read 4 io.netty.channel.AbstractChannelHandlerContext#invokeRead 5 io.netty.channel.DefaultChannelPipeline.HeadContext#read 6 io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead 7 io.netty.channel.nio.AbstractNioChannel#doBeginRead
呼叫channel的read的方法,會觸發read事件,通過pipeline呼叫AbstractChannel unsafe的beginRead方法,這個方法的語義是通知eventLoop可以從channel讀資料了,但他沒有實現具體功能,把具體功能留給doBeginRead實現。doBeginRead在AbstractChannel中定義,它是一個抽象方法。AbstractNioChannel實現了這個方法:
1 @Override 2 protected void doBeginRead() throws Exception { 3// Channel.read() or ChannelHandlerContext.read() was called 4if (inputShutdown) { 5return; 6} 7 8final SelectionKey selectionKey = this.selectionKey; 9if (!selectionKey.isValid()) { 10return; 11} 12 13readPending = true; 14 15final int interestOps = selectionKey.interestOps(); 16if ((interestOps & readInterestOp) == 0) { 17selectionKey.interestOps(interestOps | readInterestOp); 18} 19 }
這裡的doBeginRead實現,只有第17行是核心程式碼:把readInterestOps儲存是的read操作標誌新增到SelectableChannel的SelectionKey中。這裡的readInterestOps是一個類的屬性,在AbstractNioChannel中,它沒有明確的定義,只有一個抽象的定義:NIO中的一個可以可以當成read操作的的標誌。在NIO中可以當成read的有SelectionKey.OP_READ和SelectionKey.OP_ACCEPT。readInterestOps在AbstractNioChannel的構造方法中使用傳入的引數初始化,子類就可以根據需要確定interestOps的具體含義。
設定好beginRead之後,NioEventLoop就可以使用Selector得到檢測到channel上的read事件了,下面是NioEventLoop中處理read事件的程式碼:
1 //io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) 2 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 3unsafe.read(); 4 }
這裡呼叫了unsafe的read的方法,在Channel的Unsafe中並沒有定義這個方法,它在io.netty.channel.nio.AbstractNioChannel.NioUnsafe中定義,在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe和io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe中有兩個不同的實現。這兩個實現的區別是:NioMessageUnsafe.read是把從channel中讀出的資料轉換成Object, NioByteUnsafe.read是從channel中讀出byte資料流。下面來詳解分析這兩種實現。
AbstractNioChannel.NioUnsafe.read實現:從channel讀取資料
netty在NIO Channel的設計上,把讀資料設計成獨立的抽象層。之所以這樣設計有兩個方面的原因:
- 在NIO中,三中不同型別的Channel讀取的資料型別是不一樣的,NioServerSocketChannel讀出的是一個新建的NioSockeChannel, NioSocketChannel讀出的byte資料流,NioDatagramChannel讀出是資料報。
- NIO三種Channel都執行在非阻塞模式下,相比於阻塞模式,非阻塞模式下讀資料要處理的問題要複雜的多。使用Selector和非阻塞模式被動地讀取資料,需要處理連線斷開和socket異常,由於Selector使用的是邊緣觸發模式,一次read呼叫務必要把已經在socket recvbuffer中的資料全部讀出來,否則可以導致資料丟失或資料接收不及時。把read獨立出來處理讀取資料的複雜性,程式碼結構會比較清晰。
接下來開始詳細分析NioUnsafe read方法的兩種不同的實現。
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read實現: 從channel中讀出Object
這個實現是主要功能是呼叫doReadMessages方法,從channel中讀出Object訊息,具體的型別這裡沒有限制,doReadMessages是一個抽象方法,留給子類實現, 下面是read方法的實現:
1 //io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe 2 @Override 3 public void read() { 4assert eventLoop().inEventLoop(); 5final ChannelConfig config = config(); 6if (!config.isAutoRead() && !isReadPending()) { 7// ChannelConfig.setAutoRead(false) was called in the meantime 8removeReadOp(); 9return; 10} 11 12final int maxMessagesPerRead = config.getMaxMessagesPerRead(); 13final ChannelPipeline pipeline = pipeline(); 14boolean closed = false; 15Throwable exception = null; 16try { 17try { 18for (;;) { 19int localRead = doReadMessages(readBuf); 20if (localRead == 0) { 21break; 22} 23if (localRead < 0) { 24closed = true; 25break; 26} 27 28// stop reading and remove op 29if (!config.isAutoRead()) { 30break; 31} 32 33if (readBuf.size() >= maxMessagesPerRead) { 34break; 35} 36} 37} catch (Throwable t) { 38exception = t; 39} 40setReadPending(false); 41int size = readBuf.size(); 42for (int i = 0; i < size; i ++) { 43pipeline.fireChannelRead(readBuf.get(i)); 44} 45 46readBuf.clear(); 47pipeline.fireChannelReadComplete(); 48 49if (exception != null) { 50closed = closeOnReadError(exception); 51 52pipeline.fireExceptionCaught(exception); 53} 54 55if (closed) { 56if (isOpen()) { 57close(voidPromise()); 58} 59} 60} finally { 61// Check if there is a readPending which was not processed yet. 62// This could be for two reasons: 63// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method 64// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method 65// 66// See https://github.com/netty/netty/issues/2254 67if (!config.isAutoRead() && !isReadPending()) { 68removeReadOp(); 69} 70} 71 }
第12行,得到一次迴圈讀取訊息的最大數量maxMessagesPerRead,這個配置的預設值因不同的channel型別而不同,io.netty.channel.ChannelConfig提供了setMaxMessagesPerRead方法設定這個配置的值。調節這個值的大小可以影響I/O操作在eventLoop執行緒分配的執行時間,它的值越大,I/O操作站的時間越大。
18-36行,使用doReadMessages讀取訊息,並把訊息放到readBuf中,readBuf是List<Object>型別。20,21行,沒有可讀的資料結束迴圈。23-25行,socket已經關閉。33,34行,readBuf中的訊息數量已經超過限制,跳出迴圈。
41-47行,對readBuf中的每一個訊息觸發一次channelRead事件,然後清空readBuf, 觸發channelReadComplete事件。
49-53行,處理異常。
55-59行,處理channel正常關閉。
doReadMessages方法有兩個實現。一個是io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages,這個實現中讀出的訊息是NioSocketChannel。另一個是io.netty.channel.socket.nio.NioDatagramChannel#doReadMessages,這個實現中讀出的訊息時DatagramPacket。
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages實現程式碼:
1 @Override 2 protected int doReadMessages(List<Object> buf) throws Exception { 3SocketChannel ch = SocketUtils.accept(javaChannel()); 4 5try { 6if (ch != null) { 7buf.add(new NioSocketChannel(this, ch)); 8return 1; 9} 10} catch (Throwable t) { 11logger.warn("Failed to create a new channel from an accepted socket.", t); 12 13try { 14ch.close(); 15} catch (Throwable t2) { 16logger.warn("Failed to close a socket.", t2); 17} 18} 19 20return 0; 21 }
第3行, 使用accept方法得到一個新的SocketChannel。
7,8行,使用新的SocketChannel建立NioSocketChannel,並把它放到buf中。
11-20行,出現異常,關閉這個socket, 最後返回0.
io.netty.channel.socket.nio.NioDatagramChannel#doReadMessages實現程式碼:
1 @Override 2 protected int doReadMessages(List<Object> buf) throws Exception { 3DatagramChannel ch = javaChannel(); 4DatagramChannelConfig config = config(); 5RecvByteBufAllocator.Handle allocHandle = this.allocHandle; 6if (allocHandle == null) { 7this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); 8} 9ByteBuf data = allocHandle.allocate(config.getAllocator()); 10boolean free = true; 11try { 12ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes()); 13int pos = nioData.position(); 14InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData); 15if (remoteAddress == null) { 16return 0; 17} 18 19int readBytes = nioData.position() - pos; 20data.writerIndex(data.writerIndex() + readBytes); 21allocHandle.record(readBytes); 22 23buf.add(new DatagramPacket(data, localAddress(), remoteAddress)); 24free = false; 25return 1; 26} catch (Throwable cause) { 27PlatformDependent.throwException(cause); 28return -1; 29}finally { 30if (free) { 31data.release(); 32} 33} 34 }
4-12行,得到接收資料的緩衝區data。
13-21行,從socket收到一個數據包,這個資料報包含兩部分: data中的二進位制資料和傳送端的地址remoteAddress(第14行)。然後設定data中的資料長度。
23-25行,把資料報轉換成DatagramPacket型別放到buf中返回。
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read實現:從channel中讀byte流
這個實現的主要功能是呼叫doReadBytes讀取byte流。doReadBytes是一個抽象方法,留給子類實現。下面是這個read的實現。
1 @Override 2 public final void read() { 3final ChannelConfig config = config(); 4if (!config.isAutoRead() && !isReadPending()) { 5// ChannelConfig.setAutoRead(false) was called in the meantime 6removeReadOp(); 7return; 8} 9 10final ChannelPipeline pipeline = pipeline(); 11final ByteBufAllocator allocator = config.getAllocator(); 12final int maxMessagesPerRead = config.getMaxMessagesPerRead(); 13RecvByteBufAllocator.Handle allocHandle = this.allocHandle; 14if (allocHandle == null) { 15this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); 16} 17 18ByteBuf byteBuf = null; 19int messages = 0; 20boolean close = false; 21try { 22int totalReadAmount = 0; 23boolean readPendingReset = false; 24do { 25byteBuf = allocHandle.allocate(allocator); 26int writable = byteBuf.writableBytes(); 27int localReadAmount = doReadBytes(byteBuf); 28if (localReadAmount <= 0) { 29// not was read release the buffer 30byteBuf.release(); 31byteBuf = null; 32close = localReadAmount < 0; 33if (close) { 34// There is nothing left to read as we received an EOF. 35setReadPending(false); 36} 37break; 38} 39if (!readPendingReset) { 40readPendingReset = true; 41setReadPending(false); 42} 43pipeline.fireChannelRead(byteBuf); 44byteBuf = null; 45 46if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { 47// Avoid overflow. 48totalReadAmount = Integer.MAX_VALUE; 49break; 50} 51 52totalReadAmount += localReadAmount; 53 54// stop reading 55if (!config.isAutoRead()) { 56break; 57} 58 59if (localReadAmount < writable) { 60// Read less than what the buffer can hold, 61// which might mean we drained the recv buffer completely. 62break; 63} 64} while (++ messages < maxMessagesPerRead); 65 66pipeline.fireChannelReadComplete(); 67allocHandle.record(totalReadAmount); 68 69if (close) { 70closeOnRead(pipeline); 71close = false; 72} 73} catch (Throwable t) { 74handleReadException(pipeline, byteBuf, t, close); 75} finally { 76// Check if there is a readPending which was not processed yet. 77// This could be for two reasons: 78// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method 79// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method 80// 81// See https://github.com/netty/netty/issues/2254 82if (!config.isAutoRead() && !isReadPending()) { 83removeReadOp(); 84} 85} 86 }
10-16行,得到一個接受緩衝區的分配器和分配器的的專用handle。這兩個東西的功能是高效的建立大量的接接收資料緩衝區,具體原理和實現會在後面buffer相關章節中詳細分析,這裡暫時略過。
24-64行,這是一個使用doReadBytes讀取資料並觸發channelRead事件的迴圈。25-27行,得到一個接受資料的緩衝區,然後從socket中讀取資料。28-38行,沒有資料可讀了,或socket已經斷開了。43行,正確收到了資料,觸發channelRead事件。59-62行,讀出的資料小於緩衝區的長度,表示沒有socket中暫時沒有資料可讀了。 64行,讀取次數大於上限配置,跳出。
66行,讀迴圈完成,觸發channelReadComplete事件。
69-72, 處理socket正常關閉。
74,83行,處理其他異常。
doReadBytes只有一個實現:
//io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
這個實現非常簡單,使用ByteBuf的能力從SocketChannel中讀取byte流。