1. 程式人生 > >Netty原始碼解析 -- ChannelOutboundBuffer實現與Flush過程

Netty原始碼解析 -- ChannelOutboundBuffer實現與Flush過程

前面文章說了,ChannelHandlerContext#write只是將資料快取到ChannelOutboundBuffer,等到ChannelHandlerContext#flush時,再將ChannelOutboundBuffer快取的資料寫到Channel中。 本文分享Netty中ChannelOutboundBuffer的實現以及Flush過程。 **原始碼分析基於Netty 4.1** 每個Channel的AbstractUnsafe#outboundBuffer 都維護了一個ChannelOutboundBuffer。 ChannelOutboundBuffer,出站資料緩衝區,負責快取ChannelHandlerContext#write​的資料。通過連結串列管理資料,連結串列節點為內部類Entry。 關鍵欄位如下 ``` Entry tailEntry; // 連結串列最後一個節點,新增的節點新增其後。 Entry unflushedEntry; // 連結串列中第一個未重新整理的節點 Entry flushedEntry; // 連結串列中第一個已重新整理但資料未寫入的節點 int flushed; // 已重新整理但資料未寫入的節點數 ``` ChannelHandlerContext#flush操作前,需要先重新整理一遍待處理的節點(主要是統計本次ChannelHandlerContext#flush操作可以寫入多少個節點資料),從unflushedEntry開始。重新整理完成後使用flushedEntry標誌第一個待寫入的節點,flushed為待寫入節點數。 前面分享Netty讀寫過程的文章說過,AbstractUnsafe#write處理寫操作時,會呼叫ChannelOutboundBuffer#addMessage將資料快取起來 ``` public void addMessage(Object msg, int size, ChannelPromise promise) { // #1 Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; } else { Entry tail = tailEntry; tail.next = entry; } tailEntry = entry; if (unflushedEntry == null) { unflushedEntry = entry; } incrementPendingOutboundBytes(entry.pendingSize, false); } ``` `#1` 構建一個Entry,注意,這裡使用了物件池RECYCLER,後面有文章詳細解析。 主要是更新tailEntry和unflushedEntry `#2` 如果當前快取數量超過閥值WriteBufferWaterMark#high,更新unwritable標誌為true,並觸發`pipeline.fireChannelWritabilityChanged()`方法。 由於ChannelOutboundBuffer連結串列沒有大小限制,不斷累積資料可能導致 OOM, 為了避免這個問題,我們可以在unwritable標誌為true時,不再繼續快取資料。 Netty只會更新unwritable標誌,並不阻止資料快取,我們可以根據需要實現該功能。示例如下 ``` if (ctx.channel().isActive() && ctx.channel().isWritable()) { ctx.writeAndFlush(responseMessage); } else { ... } ``` addFlush方法負責重新整理節點(ChannelHandlerContext#flush操作前呼叫該方法統計可寫入節點資料數) ``` public void addFlush() { // #1 Entry entry = unflushedEntry; if (entry != null) { // #2 if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { // #3 flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; // #4 } while (entry != null); // All flushed so reset unflushedEntry // #5 unflushedEntry = null; } } ``` `#1` 從unflushedEntry節點開始處理 `#2` 賦值flushedEntry為unflushedEntry。 ChannelHandlerContext#flush寫入完成後會置空flushedEntry `#3` 增加flushed 設定節點的ChannelPromise不可取消 `#4` 從unflushedEntry開始,遍歷後面節點 `#5` 置空unflushedEntry,表示當前所有節點都已重新整理。 nioBuffers方法負責將當前快取的ByteBuf轉發為(jvm)ByteBuffer ``` public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) { assert maxCount > 0; assert maxBytes > 0; long nioBufferSize = 0; int nioBufferCount = 0; // #1 final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { if (!entry.cancelled) { ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes > 0) { // #2 if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) { break; } nioBufferSize += readableBytes; // #3 int count = entry.count; if (count == -1) { //noinspection ConstantValueVariableUse entry.count = count = buf.nioBufferCount(); } int neededSpace = min(maxCount, nioBufferCount + count); if (neededSpace > nioBuffers.length) { nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); } // #4 if (count == 1) { ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a // derived buffer entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } nioBuffers[nioBufferCount++] = nioBuf; } else { ... } if (nioBufferCount == maxCount) { break; } } } entry = entry.next; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; } ``` `#1` 從執行緒快取中獲取nioBuffers變數,這樣可以避免反覆構造ByteBuffer陣列的效能損耗 `#2` maxBytes,即本次操作最大的位元組數。 `maxBytes - readableBytes < nioBufferSize`,表示如果本次操作後將超出maxBytes,退出 `#3` buf.nioBufferCount(),獲取ByteBuffer數量,CompositeByteBuf可能有多個ByteBuffer組成。 neededSpace,即nioBuffers陣列中ByteBuffer數量,nioBuffers長度不夠時需要擴容。 `#4` `buf.internalNioBuffer(readerIndex, readableBytes)`,使用readerIndex, readableBytes構造一個ByteBuffer。 這裡涉及ByteBuf相關知識,後面有文章詳細解析。 ChannelHandlerContext#flush完成後,需要移除對應的快取節點。 ``` public void removeBytes(long writtenBytes) { for (;;) { // #1 Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; // #2 if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { progress(readableBytes); writtenBytes -= readableBytes; } remove(); } else { // readableBytes > writtenBytes // #3 if (writtenBytes != 0) { buf.readerIndex(readerIndex + (int) writtenBytes); progress(writtenBytes); } break; } } clearNioBuffers(); } ``` `#1` current方法返回flushedEntry節點快取資料。 結果null時,退出迴圈 `#2` 當前節點的資料已經全部寫入, progress方法喚醒資料節點上ChannelProgressivePromise的監聽者 writtenBytes減去對應位元組數 remove()方法移除節點,釋放ByteBuf,flushedEntry標誌後移。 `#3` 當前節點的資料部分寫入,它應該是本次ChannelHandlerContext#flush操作的最後一個節點 更新ByteBuf的readerIndex,下次從這裡開始讀取資料。 退出 移除資料節點 ``` public boolean remove() { Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; // #1 removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. // #2 ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } // recycle the entry // #3 e.recycle(); return true; } ``` `#1` flushed減1 當flushed為0時,flushedEntry賦值為null,否則flushedEntry指向後一個節點。 `#2` 釋放ByteBuf `#3` 當前節點返回物件池中,以便複用。 下面來看一下ChannelHandlerContext#flush操作過程。 ChannelHandlerContext#flush -> HeadContext#flush -> AbstractUnsafe#flush ``` public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // #1 outboundBuffer.addFlush(); // #2 flush0(); } ``` `#1` 重新整理outboundBuffer中資料節點 `#2` 寫入操作 flush -> NioSocketChannel#doWrite ``` protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount(); do { // #1 if (in.isEmpty()) { clearOpWrite(); return; } // #2 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount(); switch (nioBufferCnt) { case 0: // #3 writeSpinCount -= doWrite0(in); break; case 1: { // #4 ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { // #5 incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); // #6 in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // #7 ... } } } while (writeSpinCount > 0); incompleteWrite(writeSpinCount < 0); } ``` `#1` 通過ChannelOutboundBuffer#flushed判斷是否沒有資料可以寫,沒有資料則清除關注事件OP_WRITE,直接返回。 `#2` 獲取ChannelOutboundBuffer中ByteBuf維護的(jvm)ByteBuffer,並統計nioBufferSize,nioBufferCount。 `#3` 這時沒有ByteBuffer,但是可能有其他型別的資料(如FileRegion型別),呼叫doWrite0繼續處理,這裡不再深入 `#4` 只有一個ByteBuffer,呼叫SocketChannel#write將資料寫入Channel。 `#5` 如果寫入資料數量小於等於0,說明資料沒有被寫出去(可能是因為套接字的緩衝區滿了等原因),那麼就需要關注該Channel上的OP_WRITE事件,方便下次EventLoop將Channel輪詢出來的時候,能繼續寫資料。 `#6` 移除ChannelOutboundBuffer快取資料節點。 `#7` 有多個ByteBuffer,呼叫`SocketChannel#write(ByteBuffer[] srcs, int offset, int length)`,批量寫入,與上一種情況處理類似 回顧之前文章《事件迴圈機制實現原理》中對NioEventLoop#processSelectedKey方法的解析 ``` ... if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } ``` 這裡會呼叫forceFlush方法,再次寫入資料。 **FlushConsolidationHandler** ChannelHandlerContext#flush是很昂貴的操作,可能觸發系統呼叫,但資料又不能快取太久,使用FlushConsolidationHandler可以儘量達到寫入延遲與吞吐量之間的權衡。 FlushConsolidationHandler中維護了explicitFlushAfterFlushes變數, 在ChannelOutboundHandler#channelRead中呼叫flush,如果呼叫次數小於explicitFlushAfterFlushes, 會攔截flush操作不執行。 在channelReadComplete後呼叫flush,則不會攔截flush操作。 本文涉及ByteBuf元件,它是Netty中的記憶體緩衝區,後面有文章解析。 如果您覺得本文不錯,歡迎關注我的微信公眾號,系列文章持續更新中。您的關注是我堅持的動力! ![](https://upload-images.jianshu.io/upload_images/3804367-a3375313d1db6207.png?imageMogr2/auto-orient/strip%7CimageView2/2/