1. 程式人生 > >Netty原始碼分析 (七)----- read過程 原始碼分析

Netty原始碼分析 (七)----- read過程 原始碼分析

在上一篇文章中,我們分析了processSelectedKey這個方法中的accept過程,本文將分析一下work執行緒中的read過程。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    //檢查該SelectionKey是否有效,如果無效,則關閉channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // 如果準備好READ或ACCEPT則觸發unsafe.read() ,檢查是否為0,如上面的原始碼英文註釋所說:解決JDK可能會產生死迴圈的一個bug。
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件
                // Connection already closed - no need to handle write.
                return;
            }
        }
        // 如果準備好了WRITE則將緩衝區中的資料傳送出去,如果緩衝區中資料都發送完成,則清除之前關注的OP_WRITE標記
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // 如果是OP_CONNECT,則需要移除OP_CONNECT否則Selector.select(timeout)將立即返回不會有任何阻塞,這樣可能會出現cpu 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

該方法主要是對SelectionKey k進行了檢查,有如下幾種不同的情況

1)OP_ACCEPT,接受客戶端連線

2)OP_READ, 可讀事件, 即 Channel 中收到了新資料可供上層讀取。

3)OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入資料。

4)OP_CONNECT, 連線建立事件, 即 TCP 連線已經建立, Channel 處於 active 狀態。

本篇博文主要來看下當work 執行緒 selector檢測到OP_READ事件時,內部幹了些什麼。

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件
        // Connection already closed - no need to handle write.
        return;
    }
} 

從程式碼中可以看到,當selectionKey發生的事件是SelectionKey.OP_READ,執行unsafe的read方法。注意這裡的unsafe是NioByteUnsafe的例項

為什麼說這裡的unsafe是NioByteUnsafe的例項呢?在上篇博文Netty原始碼分析:accept中我們知道Boss NioEventLoopGroup中的NioEventLoop只負責accpt客戶端連線,然後將該客戶端註冊到Work NioEventLoopGroup中的NioEventLoop中,即最終是由work執行緒對應的selector來進行read等時間的監聽,即work執行緒中的channel為SocketChannel,SocketChannel的unsafe就是NioByteUnsafe的例項

下面來看下NioByteUnsafe中的read方法

@Override
    public void read() {
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }

        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                //1、分配快取
                byteBuf = allocHandle.allocate(allocator);
                int writable = byteBuf.writableBytes();//可寫的位元組容量
                //2、將socketChannel資料寫入快取
                int localReadAmount = doReadBytes(byteBuf);
                if (localReadAmount <= 0) {
                    // not was read release the buffer
                    byteBuf.release();
                    close = localReadAmount < 0;
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);
                }
                //3、觸發pipeline的ChannelRead事件來對byteBuf進行後續處理
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }

                totalReadAmount += localReadAmount;

                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }

                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
} 

下面一一介紹比較重要的程式碼

allocHandler的例項化過程

allocHandle負責自適應調整當前快取分配的大小,以防止快取分配過多或過少,先看allocHandler的例項化過程

RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
    this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}

其中, config.getRecvByteBufAllocator()得到的是一個 AdaptiveRecvByteBufAllocator例項DEFAULT。

public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

而AdaptiveRecvByteBufAllocator中的newHandler()方法的程式碼如下:

@Override
public Handle newHandle() {
    return new HandleImpl(minIndex, maxIndex, initial);
}

HandleImpl(int minIndex, int maxIndex, int initial) {
    this.minIndex = minIndex;
    this.maxIndex = maxIndex;

    index = getSizeTableIndex(initial);
    nextReceiveBufferSize = SIZE_TABLE[index];
}

其中,上面方法中所用到引數:minIndex maxIndex initial是什麼意思呢?含義如下:minIndex是最小快取在SIZE_TABLE中對應的下標。maxIndex是最大快取在SIZE_TABLE中對應的下標,initial為初始化快取大小。

AdaptiveRecvByteBufAllocator的相關常量欄位

public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {

        static final int DEFAULT_MINIMUM = 64;
        static final int DEFAULT_INITIAL = 1024;
        static final int DEFAULT_MAXIMUM = 65536;

        private static final int INDEX_INCREMENT = 4;
        private static final int INDEX_DECREMENT = 1;

        private static final int[] SIZE_TABLE; 

上面這些欄位的具體含義說明如下:

1)、SIZE_TABLE:按照從小到大的順序預先儲存可以分配的快取大小。 
從16開始,每次累加16,直到496,接著從512開始,每次增大一倍,直到溢位。SIZE_TABLE初始化過程如下。

static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }

    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}

2)、DEFAULT_MINIMUM:最小快取(64),在SIZE_TABLE中對應的下標為3。

3)、DEFAULT_MAXIMUM :最大快取(65536),在SIZE_TABLE中對應的下標為38。

4)、DEFAULT_INITIAL :初始化快取大小,第一次分配快取時,由於沒有上一次實際收到的位元組數做參考,需要給一個預設初始值。

5)、INDEX_INCREMENT:上次預估快取偏小,下次index的遞增值。

6)、INDEX_DECREMENT :上次預估快取偏大,下次index的遞減值。

建構函式:

private AdaptiveRecvByteBufAllocator() {
    this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    if (minimum <= 0) {
        throw new IllegalArgumentException("minimum: " + minimum);
    }
    if (initial < minimum) {
        throw new IllegalArgumentException("initial: " + initial);
    }
    if (maximum < initial) {
        throw new IllegalArgumentException("maximum: " + maximum);
    }

    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }

    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }

    this.initial = initial;
}

該建構函式對引數進行了有效性檢查,然後初始化了如下3個欄位,這3個欄位就是上面用於產生allocHandle物件所要用到的引數。

private final int minIndex;
private final int maxIndex;
private final int initial;

其中,getSizeTableIndex函式的程式碼如下,該函式的功能為:找到SIZE_TABLE中的元素剛好大於或等於size的位置。

private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }

        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else { //這裡的情況就是 a < size <= b 的情況
            return mid + 1;
        }
    }
}

byteBuf = allocHandle.allocate(allocator);

申請一塊指定大小的記憶體

AdaptiveRecvByteBufAllocator#HandlerImpl

@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(nextReceiveBufferSize);
}

直接呼叫了ioBuffer方法,繼續看

AbstractByteBufAllocator.java

@Override
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

ioBuffer函式中主要邏輯為:看平臺是否支援unsafe,選擇使用直接實體記憶體還是堆上記憶體。先看 heapBuffer

AbstractByteBufAllocator.java 

@Override
public ByteBuf heapBuffer(int initialCapacity) {
    return heapBuffer(initialCapacity, Integer.MAX_VALUE);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
} 

這裡的newHeapBuffer有兩種實現:至於具體用哪一種,取決於我們對系統屬性io.netty.allocator.type的設定,如果設定為: “pooled”,則快取分配器就為:PooledByteBufAllocator,進而利用物件池技術進行記憶體分配。如果不設定或者設定為其他,則快取分配器為:UnPooledByteBufAllocator,則直接返回一個UnpooledHeapByteBuf物件。

UnpooledByteBufAllocator.java

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

PooledByteBufAllocator.java

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    ByteBuf buf;
    if (heapArena != null) {
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

再看directBuffer

AbstractByteBufAllocator.java

@Override
public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, Integer.MAX_VALUE);
}  

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);//引數的有效性檢查
    return newDirectBuffer(initialCapacity, maxCapacity);
}

與newHeapBuffer一樣,這裡的newDirectBuffer方法也有兩種實現:至於具體用哪一種,取決於我們對系統屬性io.netty.allocator.type的設定,如果設定為: “pooled”,則快取分配器就為:PooledByteBufAllocator,進而利用物件池技術進行記憶體分配。如果不設定或者設定為其他,則快取分配器為:UnPooledByteBufAllocator。這裡主要看下UnpooledByteBufAllocator. newDirectBuffer的內部實現

UnpooledByteBufAllocator.java

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

UnpooledUnsafeDirectByteBuf是如何實現快取管理的?對Nio的ByteBuffer進行了封裝,通過ByteBuffer的allocateDirect方法實現快取的申請。

protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    //省略了部分引數檢查的程式碼
    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity));
}
protected ByteBuffer allocateDirect(int initialCapacity) {
    return ByteBuffer.allocateDirect(initialCapacity);
}

private void setByteBuffer(ByteBuffer buffer) {
    ByteBuffer oldBuffer = this.buffer;
    if (oldBuffer != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(oldBuffer);
        }
    }

    this.buffer = buffer;
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    capacity = buffer.remaining();
}

上面程式碼的主要邏輯為:

1、先利用ByteBuffer的allocateDirect方法分配了大小為initialCapacity的快取

2、然後判斷將舊快取給free掉

3、最後將新快取賦給欄位buffer上

其中:memoryAddress = PlatformDependent.directBufferAddress(buffer) 獲取buffer的address欄位值,指向快取地址。
capacity = buffer.remaining() 獲取快取容量。

接下來看toLeakAwareBuffer(buf)方法

protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeak leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
    }
    return buf;
}

方法toLeakAwareBuffer(buf)對申請的buf又進行了一次包裝。

上面一長串的分析,得到了快取後,回到AbstractNioByteChannel.read方法,繼續看。

doReadBytes方法

下面看下doReadBytes方法:將socketChannel資料寫入快取。

NioSocketChannel.java

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}

將Channel中的資料讀入快取byteBuf中。繼續看

WrappedByteBuf.java

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    return buf.writeBytes(in, length);
} 

AbstractByteBuf.java

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

這裡的setBytes方法有不同的實現,這裡看下UnpooledUnsafeDirectByteBuf的setBytes的實現。

UnpooledUnsafeDirectByteBuf.java

@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1;//當Channel 已經關閉,則返回-1.    
    }
} 

private ByteBuffer internalNioBuffer() {
    ByteBuffer tmpNioBuf = this.tmpNioBuf;
    if (tmpNioBuf == null) {
        this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
    }
    return tmpNioBuf;
}

最終底層採用ByteBuffer實現read操作,無論是PooledByteBuf、還是UnpooledXXXBuf,裡面都將底層資料結構BufBuffer/array轉換為ByteBuffer 來實現read操作。即無論是UnPooledXXXBuf還是PooledXXXBuf裡面都有一個ByteBuffer tmpNioBuf,這個tmpNioBuf才是真正用來儲存從管道Channel中讀取出的內容的。到這裡就完成了將channel的資料讀入到了快取Buf中。

我們具體來看看 in.read(tmpBuf); FileChannel和SocketChannel的read最後都是依賴的IOUtil來實現,程式碼如下

public int read(ByteBuffer dst) throws IOException {
    ensureOpen();
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.read(fd, dst, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

最後目的就是將SocketChannel中的資料讀出存放到ByteBuffer dst中,我們看看 IOUtil.read(fd, dst, -1, nd)

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if (var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    //如果最終承載資料的buffer是DirectBuffer,則直接將資料讀入到堆外記憶體中
    } else if (var1 instanceof DirectBuffer) {
        return readIntoNativeBuffer(var0, var1, var2, var4);
    } else {
        // 分配臨時的堆外記憶體
        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var7;
        try {
            // Socket I/O 操作會將資料讀入到堆外記憶體中
            int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
            var5.flip();
            if (var6 > 0) {
                // 將堆外記憶體的資料拷貝到堆記憶體中(使用者定義的快取,在jvm中分配記憶體)
                var1.put(var5);
            }

            var7 = var6;
        } finally {
            // 裡面會呼叫DirectBuffer.cleaner().clean()來釋放臨時的堆外記憶體
            Util.offerFirstTemporaryDirectBuffer(var5);
        }

        return var7;
    }
}
通過上述實現可以看出,基於channel的資料讀取步驟如下: 1、如果快取記憶體是DirectBuffer,就直接將Channel中的資料讀取到堆外記憶體
2、如果快取記憶體是堆記憶體,則先申請一塊和快取同大小的臨時 DirectByteBuffer var5。
3、將核心快取中的資料讀到堆外快取var5,底層由NativeDispatcher的read實現。
4、把堆外快取var5的資料拷貝到堆記憶體var1(使用者定義的快取,在jvm中分配記憶體)。 5、會呼叫DirectBuffer.cleaner().clean()來釋放建立的臨時的堆外記憶體 如果AbstractNioByteChannel.read中第一步建立的是堆外記憶體,則會直接將資料讀入到堆外記憶體,並不會先建立臨時堆外記憶體,再將資料讀入到堆外記憶體,最後將堆外記憶體拷貝到堆記憶體 簡單的說,如果使用堆外記憶體,則只會複製一次資料,如果使用堆記憶體,則會複製兩次資料 我們來看看readIntoNativeBuffer
private static int readIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)  throws IOException  {  
    int i = bytebuffer.position();  
    int j = bytebuffer.limit();  
    //如果斷言開啟,buffer的position大於limit,則丟擲斷言錯誤  
    if(!$assertionsDisabled && i > j)  
        throw new AssertionError();  
    //獲取需要讀的位元組數  
    int k = i > j ? 0 : j - i;  
    if(k == 0)  
        return 0;  
    int i1 = 0;  
    //從輸入流讀取k個位元組到buffer  
    if(l != -1L)  
        i1 = nativedispatcher.pread(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);  
    else  
        i1 = nativedispatcher.read(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);  
    //重新定位buffer的position  
    if(i1 > 0)  
        bytebuffer.position(i + i1);  
    return i1;  
}  
這個函式就是將核心緩衝區中的資料讀取到堆外快取DirectBuffer

回到AbstractNioByteChannel.read方法,繼續看。

@Override
public void read() {
        //...
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                byteBuf = allocHandle.allocate(allocator);
                int writable = byteBuf.writableBytes();
                int localReadAmount = doReadBytes(byteBuf);
                if (localReadAmount <= 0) {
                    // not was read release the buffer
                    byteBuf.release();
                    close = localReadAmount < 0;
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);
                }
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }

                totalReadAmount += localReadAmount;

                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }

                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
}

int localReadAmount = doReadBytes(byteBuf);
1、如果返回0,則表示沒有讀取到資料,則退出迴圈。
2、如果返回-1,表示對端已經關閉連線,則退出迴圈。
3、否則,表示讀取到了資料,資料讀入快取後,觸發pipeline的ChannelRead事件,byteBuf作為引數進行後續處理,這時自定義Inbound型別的handler就可以進行業務處理了。Pipeline的事件處理在我之前的博文中有詳細的介紹。處理完成之後,再一次從Channel讀取資料,直至退出迴圈。

4、迴圈次數超過maxMessagesPerRead時,即只能在管道中讀取maxMessagesPerRead次資料,既是還沒有讀完也要退出。在上篇博文中,Boss執行緒接受客戶端連線也用到了此變數,即當boss執行緒 selector檢測到OP_ACCEPT事件後一次只能接受maxMessagesPerRead個客戶端連線