1. 程式人生 > >Netty原始碼解析 -- 零拷貝機制與ByteBuf

Netty原始碼解析 -- 零拷貝機制與ByteBuf

本文來分享Netty中的零拷貝機制以及記憶體緩衝區ByteBuf的實現。 **原始碼分析基於Netty 4.1.52** #### Netty中的零拷貝 Netty中零拷貝機制主要有以下幾種 1.檔案傳輸類DefaultFileRegion#transferTo,呼叫FileChannel#transferTo,直接將檔案緩衝區的資料傳送到目標Channel,減少使用者緩衝區的拷貝(通過linux的sendfile函式)。 使用read 和 write過程如下 ![](https://upload-images.jianshu.io/upload_images/3804367-70105a3605e483eb.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 使用sendfile ![](https://upload-images.jianshu.io/upload_images/3804367-4272d9e251db8793.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 可以看到,使用sendfile函式可以減少資料拷貝以及使用者態,核心態的切換 可參考: [作業系統和Web伺服器那點事兒](https://mp.weixin.qq.com/s?__biz=MzAxOTc0NzExNg==&mid=2665515079&idx=1&sn=ad7a5a0cb4d85fdb90ef3cb42fde0d63&chksm=80d67004b7a1f912556d87c79b76df162d0e31094caf02bf1bcbb9cb6ff96c4b4c47806e9059&mpshare=1&scene=1&srcid=1024iZjrBFKQBmsagaZyhsF4&sharer_sharetime=1599873774124&sharer_shareid=4f9ab13f2197aa52ba0a1547b29d3a69&exportkey=A8j8NlHBcrquMTpEIMQrKQM%3D&pass_ticket=hewQ611a2U7Z2Kfi8nXOBwNeW2piPGiE5QX7ZHaaYEXYIcyWelOXR1QxIdvnbU51&wx_header=0#rd) 2.Netty中提供了一些操作記憶體緩衝區的方法,如 Unpooled#wrappedBuffer方法,將byte資料,(jvm)ByteBuffer轉換為ByteBuf CompositeByteBuf#addComponents方法,合併ByteBuf ByteBuf#slice方法,提取ByteBuf中部分資料片段 ByteBuf#duplicate,複製一個記憶體緩衝區 這些方法都是基於物件引用的操作,並沒有記憶體拷貝,而是記憶體共享 3.使用堆外記憶體(jvm)ByteBuffer對Socket讀寫 如果使用JVM的堆記憶體讀取Socket資料,JVM會將Socket資料讀取到直接記憶體,再拷貝一份到堆記憶體中,寫入資料到Socket也需要將堆記憶體拷貝一份到直接記憶體中,然後才寫入Socket中。 因為作業系統進行io操作需要一個穩定的連續空間的位元組空間, 但是java堆上的位元組空間會隨著gc進行而進行移動, 如果作業系統讀取堆上的空間, 就會出錯。 使用堆外記憶體可以避免該拷貝操作。 注意,這裡從核心緩衝區拷貝到使用者緩衝區的操作並不能省略,畢竟我們需要對資料進行操作,所以還是要拷貝到使用者態的。 可參考: [知乎--Java NIO中,關於DirectBuffer,HeapBuffer的疑問](https://www.zhihu.com/question/57374068) [知乎--Java NIO direct buffer的優勢在哪兒?](https://www.zhihu.com/question/60892134) #### ByteBuf ByteBuf是用於與Channel互動的記憶體緩衝區,提供順序訪問和隨機訪問。 Netty4中將ByteBuf調整為抽象類,從而提升吞吐量。 1.ByteBuffer 先了解一下ByteBuffer,ByteBuffer是JVM提供的位元組記憶體緩衝區。ByteBuf是在ByteBuffer上進行的擴充套件,底層還是使用ByteBuffer。 ByteBuffer有兩個子類,DirectByteBuffer和HeapByteBuffer。 HeapByteBuffer使用ByteBuffer#hb(byte[])儲存資料。 DirectByteBuffer是堆外記憶體,使用的是作業系統的直接記憶體,它維護了一個引用address指向了底層資料,從而操作資料。(並沒有使用ByteBuffer#buff) Buffer核心屬性 ``` int position; //當前操作位置。 int mark; //為某一讀過的位置做標記,便於某些時候回退到該位置。 int capacity; //初始化時候的容量。 int limit; // 讀寫的限制位置,讀寫超出該位置會報錯 ``` 讀寫操作都是基於position,並以limit為限制的。mark,position,limit,capacity關係如下 ``` 0 <= mark <= position <= limit <= capacity ``` ByteBuffer提供瞭如下方法調整這些標誌位置: * clear limit = position = 0 一般在把資料寫入Buffer前呼叫 * flip limit = position position = 0 一般在從Buffer讀出資料前呼叫 * rewind position=0 limit不變 一般在把資料重寫入Buffer前呼叫。 * compacting 清除已經讀過的資料。任何未讀的資料都被移到緩衝區的起始處,新寫入的資料將放到緩衝區未讀資料的後面 ByteBuffer還提供了一些操作緩衝區的方法 * duplicate 建立新位元組緩衝區,共享當前緩衝區內容 * slice 建立新位元組緩衝區,共享當前緩衝區內容子序列。 Netty的ByteBuf使用readerIndex標誌讀位置,writerIndex標誌寫位置,比(jvm)ByteBuffer設計更優雅。 ``` +-------------------+------------------+------------------+ | discardable bytes | readable bytes | writable bytes | | | (CONTENT) | | +-------------------+------------------+------------------+ | | | | 0 <= readerIndex <= writerIndex <= capacity ``` ByteBuf提供readerIndex/writerIndex等方法獲取或設定這兩個值,非常直觀。另外,ByteBuf提供瞭如下方法操作緩衝區 * discardReadBytes 清除已經讀過的資料。未讀的資料都被移到緩衝區的起始處,新寫入的資料將放到緩衝區未讀資料的後面 * duplicate 建立新位元組緩衝區,共享當前緩衝區內容 * slice(int index, int length) 建立共享記憶體的ByteBuf,從index開始,長度為length * readSlice(int length) 建立共享記憶體的ByteBuf,從readerIndex開始,長度為length * retainedDuplicate() 建立共享記憶體的ByteBuf,並且當前ByteBuf的引用計數加1 2.介面關係 ![160.png](https://upload-images.jianshu.io/upload_images/3804367-9cbc77ade31dc5a8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) AbstractByteBuf:實現一些公共邏輯,如讀寫前檢查位置。 AbstractReferenceCountedByteBuf,新增引用計數邏輯,實現引用計數回收直接記憶體。 PooledByteBuf:實現池化ByteBuf的公共邏輯。關於Netty中的記憶體池後面有文章解析。 PooledByteBuf#memory是底層的記憶體儲存,PooledDirectByteBuf該欄位是ByteBuffer,PooledHeapByteBuf則是byte[]。 下面可以分為Unsafe,No_Unsafe兩個維度。Unsafe就是sun.misc.Unsafe。 使用Unsafe可以提高效能,但Unsafe是JDK內部的類,並非公開標準,不一定所有JDK都存在這個類, JDK以後也有可能去掉這個類,所以Netty提供了兩套實現。 3.記憶體分配 後面有文章解析Netty記憶體池,分享Netty中如何分配記憶體給ByteBuf。這裡先不深入。 4.讀寫過程 下面看一下ByteBuf與Channel如何互動資料。 前面分享Netty讀寫過程的文章說過了,NioByteUnsafe#read方法讀取資料。 NioByteUnsafe#read -> NioSocketChannel#doReadBytes -> AbstractByteBuf#writeBytes -> PooledByteBuf#setBytes ``` public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { try { return in.read(internalNioBuffer(index, length)); } catch (ClosedChannelException ignored) { return -1; } } ``` index引數就是writerIndex,internalNioBuffer方法會構造一個新的ByteBuffer,並設定ByteBuffer#position為index 直接呼叫ReadableByteChannel#read讀取資料 在《ChannelOutboundBuffer與flush操作》中已經分享過, ChannelOutboundBuffer#nioBuffers也是通過internalNioBuffer方法生成ByteBuffer, 作為引數呼叫NioSocketChannel#doWrite方法,直接將資料拷貝到Channel。 ByteBuf#internalNioBuffer -> PooledByteBuf#_internalNioBuffer ``` final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) { index = idx(index); ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer(); buffer.limit(index + length).position(index); return buffer; } ``` newInternalNioBuffer由子類實現,構建對應的DirectByteBuffer或者HeapByteBuffer,注意,這裡的記憶體是共享的。 5.引用計數 由於使用了直接記憶體,不能依賴JVM垃圾回收器釋放記憶體,Netty使用引用計數演算法釋放記憶體。 ReferenceCounted介面,代表需要顯式釋放的引用計數物件,retain方法增加引用計數,release方法減少引用計數。 AbstractReferenceCountedByteBuf實現了ReferenceCounted介面,它維護了refCnt變數作為引用計數。 構造一個AbstractReferenceCountedByteBuf時,refCnt為1。 當引用計數release到0時,呼叫deallocate()方法釋放記憶體。 PooledByteBuf#deallocate ``` protected final void deallocate() { if (handle >= 0) { final long handle = this.handle; this.handle = -1; memory = null; tmpNioBuf = null; chunk.arena.free(chunk, handle, maxLength, cache); chunk = null; recycle(); } } ``` 這裡呼叫的是PoolArena#free。 PoolArena可以理解為一個記憶體池,這裡free實際是將記憶體放回記憶體池中,由記憶體池決定是否需要銷燬底層直接記憶體。 PoolArena後面有對應文章解析。 6.記憶體銷燬 銷燬DirectByteBuf,有兩個方式 利用反射獲取Unsafe,呼叫Unsafe#freeMemory 利用反射獲取DirectByteBuffer#cleaner(sun.misc.Cleaner),通過反射呼叫cleaner#clean方法 因為Netty不確認JDK中是否存在sun.misc.Cleaner,所以它也實現了兩套機制。 PoolArenaDirect#free -> Arena#destroyChunk ``` protected void destroyChunk(P