1. 程式人生 > >netty源碼解解析(4.0)-14 Channel NIO實現:讀取數據

netty源碼解解析(4.0)-14 Channel NIO實現:讀取數據

isa index 消息 soc 發現 接收數據 boolean 是的 cte

   本章分析Nio Channel的數據讀取功能的實現。

  Channel讀取數據需要Channel和ChannelHandler配合使用,netty設計數據讀取功能包括三個要素:Channel, EventLoop和ChannelHandler。Channel有個read方法,這個方法不會直接讀取數據,它的作用是通知持有當前channel的eventLoop可以從這個這個channel讀取數據了,這個方法被調用之後eventLoop會在channel有數據可讀的時候從channel讀出數據然後把數據放在channelRead事件中交給ChannelInboundHandler的channelRead方法處理,當eventLoop發現channel中暫時沒時間可讀會觸發一個channelReadComplete事件。

  read: Nio Channel通知eventLoop開始讀數據

  channel read方法的調用棧:

1 io.netty.channel.AbstractChannel#read
2 io.netty.channel.DefaultChannelPipeline#read
3 io.netty.channel.AbstractChannelHandlerContext#read
4 io.netty.channel.AbstractChannelHandlerContext#invokeRead
5 io.netty.channel.DefaultChannelPipeline.HeadContext#read
6 io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead 7 io.netty.channel.nio.AbstractNioChannel#doBeginRead

  調用channel的read的方法,會觸發read事件,通過pipeline調用AbstractChannel unsafe的beginRead方法,這個方法的語義是通知eventLoop可以從channel讀數據了,但他沒有實現具體功能,把具體功能留給doBeginRead實現。doBeginRead在AbstractChannel中定義,它是一個抽象方法。AbstractNioChannel實現了這個方法:

 1 @Override
 2 protected void doBeginRead() throws Exception {
 3     // Channel.read() or ChannelHandlerContext.read() was called
 4     if (inputShutdown) {
 5         return;
 6     }
 7 
 8     final SelectionKey selectionKey = this.selectionKey;
 9     if (!selectionKey.isValid()) {
10         return;
11     }
12 
13     readPending = true;
14 
15     final int interestOps = selectionKey.interestOps();
16     if ((interestOps & readInterestOp) == 0) {
17         selectionKey.interestOps(interestOps | readInterestOp);
18     }
19 }

  這裏的doBeginRead實現,只有第17行是核心代碼:把readInterestOps保存是的read操作標誌添加到SelectableChannel的SelectionKey中。這裏的readInterestOps是一個類的屬性,在AbstractNioChannel中,它沒有明確的定義,只有一個抽象的定義:NIO中的一個可以可以當成read操作的的標誌。在NIO中可以當成read的有SelectionKey.OP_READ和SelectionKey.OP_ACCEPT。readInterestOps在AbstractNioChannel的構造方法中使用傳入的參數初始化,子類就可以根據需要確定interestOps的具體含義。

  設置好beginRead之後,NioEventLoop就可以使用Selector得到檢測到channel上的read事件了,下面是NioEventLoop中處理read事件的代碼:

1 //io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
2 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
3     unsafe.read();
4 }

  這裏調用了unsafe的read的方法,在Channel的Unsafe中並沒有定義這個方法,它在io.netty.channel.nio.AbstractNioChannel.NioUnsafe中定義,在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe和io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe中有兩個不同的實現。這兩個實現的區別是:NioMessageUnsafe.read是把從channel中讀出的數據轉換成Object, NioByteUnsafe.read是從channel中讀出byte數據流。下面來詳解分析這兩種實現。

  

  AbstractNioChannel.NioUnsafe.read實現:從channel讀取數據

  netty在NIO Channel的設計上,把讀數據設計成獨立的抽象層。之所以這樣設計有兩個方面的原因:

  1. 在NIO中,三中不同類型的Channel讀取的數據類型是不一樣的,NioServerSocketChannel讀出的是一個新建的NioSockeChannel, NioSocketChannel讀出的byte數據流,NioDatagramChannel讀出是數據報。
  2. NIO三種Channel都運行在非阻塞模式下,相比於阻塞模式,非阻塞模式下讀數據要處理的問題要復雜的多。使用Selector和非阻塞模式被動地讀取數據,需要處理連接斷開和socket異常,由於Selector使用的是邊緣觸發模式,一次read調用務必要把已經在socket recvbuffer中的數據全部讀出來,否則可以導致數據丟失或數據接收不及時。把read獨立出來處理讀取數據的復雜性,代碼結構會比較清晰。

  接下來開始詳細分析NioUnsafe read方法的兩種不同的實現。 

  io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read實現: 從channel中讀出Object

  這個實現是主要功能是調用doReadMessages方法,從channel中讀出Object消息,具體的類型這裏沒有限制,doReadMessages是一個抽象方法,留給子類實現, 下面是read方法的實現:

 1 //io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
 2 @Override
 3 public void read() {
 4     assert eventLoop().inEventLoop();
 5     final ChannelConfig config = config();
 6     if (!config.isAutoRead() && !isReadPending()) {
 7         // ChannelConfig.setAutoRead(false) was called in the meantime
 8         removeReadOp();
 9         return;
10     }
11 
12     final int maxMessagesPerRead = config.getMaxMessagesPerRead();
13     final ChannelPipeline pipeline = pipeline();
14     boolean closed = false;
15     Throwable exception = null;
16     try {
17         try {
18             for (;;) {
19                 int localRead = doReadMessages(readBuf);
20                 if (localRead == 0) {
21                     break;
22                 }
23                 if (localRead < 0) {
24                     closed = true;
25                     break;
26                 }
27 
28                 // stop reading and remove op
29                 if (!config.isAutoRead()) {
30                     break;
31                 }
32 
33                 if (readBuf.size() >= maxMessagesPerRead) {
34                     break;
35                 }
36             }
37         } catch (Throwable t) {
38             exception = t;
39         }
40         setReadPending(false);
41         int size = readBuf.size();
42         for (int i = 0; i < size; i ++) {
43             pipeline.fireChannelRead(readBuf.get(i));
44         }
45 
46         readBuf.clear();
47         pipeline.fireChannelReadComplete();
48 
49         if (exception != null) {
50             closed = closeOnReadError(exception);
51 
52             pipeline.fireExceptionCaught(exception);
53         }
54 
55         if (closed) {
56             if (isOpen()) {
57                 close(voidPromise());
58             }
59         }
60     } finally {
61         // Check if there is a readPending which was not processed yet.
62         // This could be for two reasons:
63         // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
64         // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
65         //
66         // See https://github.com/netty/netty/issues/2254
67         if (!config.isAutoRead() && !isReadPending()) {
68             removeReadOp();
69         }
70     }
71 }

   第12行,得到一次循環讀取消息的最大數量maxMessagesPerRead,這個配置的默認值因不同的channel類型而不同,io.netty.channel.ChannelConfig提供了setMaxMessagesPerRead方法設置這個配置的值。調節這個值的大小可以影響I/O操作在eventLoop線程分配的執行時間,它的值越大,I/O操作站的時間越大。

  18-36行,使用doReadMessages讀取消息,並把消息放到readBuf中,readBuf是List<Object>類型。20,21行,沒有可讀的數據結束循環。23-25行,socket已經關閉。33,34行,readBuf中的消息數量已經超過限制,跳出循環。

  41-47行,對readBuf中的每一個消息觸發一次channelRead事件,然後清空readBuf, 觸發channelReadComplete事件。

  49-53行,處理異常。

  55-59行,處理channel正常關閉。

  doReadMessages方法有兩個實現。一個是io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages,這個實現中讀出的消息是NioSocketChannel。另一個是io.netty.channel.socket.nio.NioDatagramChannel#doReadMessages,這個實現中讀出的消息時DatagramPacket。

  io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages實現代碼:

 1 @Override
 2 protected int doReadMessages(List<Object> buf) throws Exception {
 3     SocketChannel ch = SocketUtils.accept(javaChannel());
 4 
 5     try {
 6         if (ch != null) {
 7             buf.add(new NioSocketChannel(this, ch));
 8             return 1;
 9         }
10     } catch (Throwable t) {
11         logger.warn("Failed to create a new channel from an accepted socket.", t);
12 
13         try {
14             ch.close();
15         } catch (Throwable t2) {
16             logger.warn("Failed to close a socket.", t2);
17         }
18     }
19 
20     return 0;
21 }

  第3行, 使用accept方法得到一個新的SocketChannel。

  7,8行,使用新的SocketChannel創建NioSocketChannel,並把它放到buf中。

  11-20行,出現異常,關閉這個socket, 最後返回0.

  

  io.netty.channel.socket.nio.NioDatagramChannel#doReadMessages實現代碼:

 1 @Override
 2 protected int doReadMessages(List<Object> buf) throws Exception {
 3     DatagramChannel ch = javaChannel();
 4     DatagramChannelConfig config = config();
 5     RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
 6     if (allocHandle == null) {
 7         this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
 8     }
 9     ByteBuf data = allocHandle.allocate(config.getAllocator());
10     boolean free = true;
11     try {
12         ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
13         int pos = nioData.position();
14         InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
15         if (remoteAddress == null) {
16             return 0;
17         }
18 
19         int readBytes = nioData.position() - pos;
20         data.writerIndex(data.writerIndex() + readBytes);
21         allocHandle.record(readBytes);
22 
23         buf.add(new DatagramPacket(data, localAddress(), remoteAddress));
24         free = false;
25         return 1;
26     } catch (Throwable cause) {
27         PlatformDependent.throwException(cause);
28         return -1;
29     }  finally {
30         if (free) {
31             data.release();
32         }
33     }
34 }

   4-12行,得到接收數據的緩沖區data。

   13-21行,從socket收到一個數據包,這個數據報包含兩部分: data中的二進制數據和發送端的地址remoteAddress(第14行)。然後設置data中的數據長度。

   23-25行,把數據報轉換成DatagramPacket類型放到buf中返回。

  

  io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read實現:從channel中讀byte流

  這個實現的主要功能是調用doReadBytes讀取byte流。doReadBytes是一個抽象方法,留給子類實現。下面是這個read的實現。

 1 @Override
 2 public final void read() {
 3     final ChannelConfig config = config();
 4     if (!config.isAutoRead() && !isReadPending()) {
 5         // ChannelConfig.setAutoRead(false) was called in the meantime
 6         removeReadOp();
 7         return;
 8     }
 9 
10     final ChannelPipeline pipeline = pipeline();
11     final ByteBufAllocator allocator = config.getAllocator();
12     final int maxMessagesPerRead = config.getMaxMessagesPerRead();
13     RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
14     if (allocHandle == null) {
15         this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
16     }
17 
18     ByteBuf byteBuf = null;
19     int messages = 0;
20     boolean close = false;
21     try {
22         int totalReadAmount = 0;
23         boolean readPendingReset = false;
24         do {
25             byteBuf = allocHandle.allocate(allocator);
26             int writable = byteBuf.writableBytes();
27             int localReadAmount = doReadBytes(byteBuf);
28             if (localReadAmount <= 0) {
29                 // not was read release the buffer
30                 byteBuf.release();
31                 byteBuf = null;
32                 close = localReadAmount < 0;
33                 if (close) {
34                     // There is nothing left to read as we received an EOF.
35                     setReadPending(false);
36                 }
37                 break;
38             }
39             if (!readPendingReset) {
40                 readPendingReset = true;
41                 setReadPending(false);
42             }
43             pipeline.fireChannelRead(byteBuf);
44             byteBuf = null;
45 
46             if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
47                 // Avoid overflow.
48                 totalReadAmount = Integer.MAX_VALUE;
49                 break;
50             }
51 
52             totalReadAmount += localReadAmount;
53 
54             // stop reading
55             if (!config.isAutoRead()) {
56                 break;
57             }
58 
59             if (localReadAmount < writable) {
60                 // Read less than what the buffer can hold,
61                 // which might mean we drained the recv buffer completely.
62                 break;
63             }
64         } while (++ messages < maxMessagesPerRead);
65 
66         pipeline.fireChannelReadComplete();
67         allocHandle.record(totalReadAmount);
68 
69         if (close) {
70             closeOnRead(pipeline);
71             close = false;
72         }
73     } catch (Throwable t) {
74         handleReadException(pipeline, byteBuf, t, close);
75     } finally {
76         // Check if there is a readPending which was not processed yet.
77         // This could be for two reasons:
78         // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
79         // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
80         //
81         // See https://github.com/netty/netty/issues/2254
82         if (!config.isAutoRead() && !isReadPending()) {
83             removeReadOp();
84         }
85     }
86 }

  10-16行,得到一個接受緩沖區的分配器和分配器的的專用handle。這兩個東西的功能是高效的創建大量的接接收數據緩沖區,具體原理和實現會在後面buffer相關章節中詳細分析,這裏暫時略過。

  24-64行,這是一個使用doReadBytes讀取數據並觸發channelRead事件的循環。25-27行,得到一個接受數據的緩沖區,然後從socket中讀取數據。28-38行,沒有數據可讀了,或socket已經斷開了。43行,正確收到了數據,觸發channelRead事件。59-62行,讀出的數據小於緩沖區的長度,表示沒有socket中暫時沒有數據可讀了。 64行,讀取次數大於上限配置,跳出。

  66行,讀循環完成,觸發channelReadComplete事件。

  69-72, 處理socket正常關閉。

  74,83行,處理其他異常。

  doReadBytes只有一個實現:

//io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

  這個實現非常簡單,使用ByteBuf的能力從SocketChannel中讀取byte流。

  

netty源碼解解析(4.0)-14 Channel NIO實現:讀取數據