1. 程式人生 > >Netty框架學習之(五):細說資料容器-ByteBuf

Netty框架學習之(五):細說資料容器-ByteBuf

1. 簡介

位元組是網路資料的基本單位。 Java NIO 提供了 ByteBuffer 作為位元組容器,但是這個類使用起來過於複雜,而且也有些繁瑣。Netty使用了即易於使用又具備良好效能的ByteBuf來替代ByteBuffer。

本文將對ByteBuffer做一個簡單的總結。

2. 運作方式與使用模式

2.1 運作方式

因為所有的網路通訊都涉及位元組序列的移動, 所以高效易用的資料結構明顯是必不可少的。
讓我們來看看Netty是如何高效的實現這個需求的。

ByteBuf 維護了兩個不同的索引:一個用於讀取,一個用於寫入。當你從 ByteBuf 讀取時,它的 readerIndex 將會被遞增已經被讀取的位元組數。同樣地,當你寫入 ByteBuf 時,它的writerIndex 也會被遞增。

如果讀取位元組直到 readerIndex 達到
和 writerIndex 同樣的值時你將會到達“可以讀取的”資料的末尾。 如果試圖讀取超出該點的資料將會觸發一個 IndexOutOfBoundsException。

==在ByteBuf中,名稱以 read 或者 write 開頭的 ByteBuf 方法,將會推進其對應的索引,而名稱以 set 或
者 get 開頭的操作則不會。==

對於get和set方法,需要傳入一個相對索引的位置。當操作基於相對位置的資料超過capacity時,就會引發IndexOutOfBoundsException,例如:capacity是100,當執行byteBuf.setInt(97,100)時,由於int佔4個位元組(4+97>100),就會觸發IndexOutOfBoundsException。

2.2 使用模式

使用ByteBuf的時候,可以選擇資料的存放模式,常見的模式如下:

2.2.1 堆緩衝區

最常用的 ByteBuf 模式是將資料儲存在 JVM 的堆空間中。這種模式被稱為支撐陣列
(backing array),它能在沒有使用池化的情況下提供快速的分配和釋放,非常適合於有遺留的資料需要處理的情況。

public static void main(String args[]) {
        ByteBuf heapBuf = Unpooled.copiedBuffer("heap space",
                CharsetUtil.UTF_8);
        if
(heapBuf.hasArray()) { //檢查 ByteBuf 是否有一個支撐陣列.當 hasArray()方法返回 false 時, // 嘗試訪問支撐陣列將觸發一個 UnsupportedOperationException。這個模式類似於 JDK 的 ByteBuffer 的用法 byte[] array = heapBuf.array(); int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); int length = heapBuf.readableBytes(); System.out.println(Arrays.toString(array)); System.out.println(offset); System.out.println(length); } else { System.out.println("No Heap Array"); } }

==當 hasArray()方法返回 false 時,嘗試訪問支撐陣列將觸發一個 Unsupported
OperationException。這個模式類似於 JDK 的 ByteBuffer 的用法==

2.2.2 直接緩衝區

直接緩衝區是另外一種 ByteBuf 模式.
直接緩衝區的內容並不是駐留在Java的堆上,而是在本地記憶體中。Java堆上的資料在每次呼叫本地 I/O 操作之前(或者之後)需要將緩衝區的內容復
制到一箇中間緩衝區(或者從中間緩衝區把內容複製到緩衝區),而本地記憶體避開了這個操作。 這也就解釋了為何直接緩衝區對於網路資料傳輸是理想的選擇。如果你的資料包含在一個在堆上分配的緩衝區中,那麼事實上,在通過套接字傳送它之前, JVM將會在內部把你的緩衝區複製到一個直接緩衝區中。

直接緩衝區的主要缺點是,相對於基於堆的緩衝區,它們的分配和釋放都較為昂貴。如果你正在處理遺留程式碼,你也可能會遇到另外一個缺點:因為資料不是在堆上, 所以你不得不進行一次複製。顯然,與使用堆緩衝區相比,這涉及的工作更多。因此,如果事先知道容器中的資料將會被作為陣列來訪問,你可能更願意使用堆記憶體。

直接緩衝區的使用程式碼樣例如下:

public static void main(String args[]) {
        ByteBuf directBuf = Unpooled.directBuffer(100);
        directBuf.writeBytes("direct buffer".getBytes());
        if (!directBuf.hasArray()) {//檢查 ByteBuf 是否由陣列支撐。如果不是,則這是一個直接緩衝區
            int length = directBuf.readableBytes();
            byte[] array = new byte[length];//分配一個新的陣列來儲存具有該長度的位元組資料
            directBuf.getBytes(directBuf.readerIndex(), array);//將位元組複製到該陣列
            System.out.println(Arrays.toString(array));
            System.out.println(length);
        }
    }

2.2.3 複合緩衝區

複合緩衝區為多個 ByteBuf 提供一個聚合檢視,可以根據需要新增或者刪除 ByteBuf 例項。實現該功能的類為CompositeByteBuf,它提供了一
個將多個緩衝區表示為單個合併緩衝區的虛擬表示.

CompositeByteBuf 中的 ByteBuf例項可能同時包含直接記憶體分配和非直接記憶體分配。
如果其中只有一個例項,那麼對 CompositeByteBuf 上的 hasArray()方法的呼叫將返回該元件上的 hasArray()方法的值;否則它將返回 false。

為了舉例說明,讓我們考慮一下一個由兩部分——頭部和主體——組成的將通過 HTTP 協議傳輸的訊息。這兩部分由應用程式的不同模組產生,將會在訊息被髮送的時候組裝。該應用程式可以選擇為多個訊息重用相同的訊息主體。當這種情況發生時,對於每個訊息都將會建立一個新的頭部。
因為我們不想為每個訊息都重新分配這兩個緩衝區,所以使用CompositeByteBuf 是一個完美的選擇。 它在消除了沒必要的複製的同時,暴露了通用的 ByteBuf API。

樣例程式碼如下:

/**
 * Netty 通過一個 ByteBuf 子類——CompositeByteBuf——實現了組合模式,它提供了一
 個將多個緩衝區表示為單個合併緩衝區的虛擬表示
 */
public class CompositeBuf {
    public static void main(String args[]){
        CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
        ByteBuf headerBuf = Unpooled.copiedBuffer("head", CharsetUtil.UTF_8); // can be backing or direct
        ByteBuf bodyBuf = Unpooled.copiedBuffer("body", CharsetUtil.UTF_8); // can be backing or direct
        messageBuf.addComponents(headerBuf, bodyBuf);
        System.out.println("Remove Head Before------------------");
        printCompositeBuffer(messageBuf);
        for (ByteBuf buf : messageBuf) {
            System.out.println(buf.toString(CharsetUtil.UTF_8));
        }
        messageBuf.removeComponent(0); // remove the header
        System.out.println("Remove Head After------------------");
        printCompositeBuffer(messageBuf);
        for (ByteBuf buf : messageBuf) {
            System.out.println(buf.toString(CharsetUtil.UTF_8));
        }
    }

    public static void printCompositeBuffer(CompositeByteBuf compBuf){
        int length = compBuf.readableBytes();
        byte[] array = new byte[length];
        compBuf.getBytes(compBuf.readerIndex(), array);
        System.out.println (Arrays.toString(array));
        System.out.println (length);
    }
}

3. 支援的操作

ByteBuf 提供了許多超出基本讀、寫操作的方法用於修改它的資料,本章節做一個簡單的介紹。

3.1 隨機訪問索引

如同在普通的 Java 位元組陣列中一樣, ByteBuf 的索引是從零開始的:第一個位元組的索引是0,最後一個位元組的索引總是 capacity() - 1,可以通過index進行訪問:

    public static void main(String args[]) {
        ByteBuf directBuf = Unpooled.directBuffer(100);
        directBuf.writeBytes("direct buffer".getBytes());
        System.out.println(directBuf.getByte(5));
    }

3.2 順序訪問索引

ByteBuf 同時具有讀索引和寫索引,這兩個索引將記憶體區塊分成了三塊:

這裡寫圖片描述

3.2.1 可丟棄的位元組

可丟棄位元組的分段包含了已經被讀過的位元組。通過呼叫 discardReadBytes()方法,可以丟棄它們並回收空間。這個分段的初始大小為 0,儲存在 readerIndex 中,會隨著 read 操作的執行而增加。

下圖展示的緩衝區上呼叫discardReadBytes()方法後的結果。可以看
到, 可丟棄位元組分段中的空間已經變為可寫的了。注意,在呼叫discardReadBytes()之後,對可寫分段的內容並沒有任何的保證。
這裡寫圖片描述

頻繁地呼叫 discardReadBytes()方法可以確保可寫分段的最大化,但是這將極有可能會導致記憶體複製,因為可讀位元組(圖中標記為 CONTENT 的部分)必須被移動到緩衝區的開始位置。我們建議只在有真正需要的時候才這樣做, 例如,當記憶體非常寶貴的時候。

3.2.2 可讀位元組

ByteBuf 的可讀位元組分段儲存了實際資料。新分配的、包裝的或者複製的緩衝區的預設的
readerIndex 值為 0。任何名稱以 read 或者 skip 開頭的操作都將檢索或者跳過位於當前
readerIndex 的資料,並且將它增加已讀位元組數。

如果嘗試在緩衝區的可讀位元組數已經耗盡時從中讀取資料,那麼將會引發一個 IndexOutOfBoundsException。

3.2.3 可寫位元組

可寫位元組分段是指一個擁有未定義內容的、寫入就緒的記憶體區域。新分配的緩衝區的
writerIndex 的預設值為 0。任何名稱以 write 開頭的操作都將從當前的 writerIndex 處
開始寫資料,並將它增加已經寫入的位元組數。如果寫操作的目標也是 ByteBuf,並且沒有指定源索引的值,則源緩衝區的 readerIndex 也同樣會被增加相同的大小。

如果嘗試往目標寫入超過目標容量的資料,將會引發一個IndexOutOfBoundException。

3.2.4 例子

下面是一個關於可丟棄位元組,可讀位元組,可寫位元組的綜合例子:

        ByteBuf directBuf = Unpooled.directBuffer(100);
        directBuf.writeBytes("direct buffer".getBytes());
        System.out.println("可寫位元組容量:"+directBuf.writableBytes());
        System.out.println("初始化可讀位元組:"+directBuf.readableBytes());
        System.out.println("初始化可丟棄位元組:"+directBuf.readerIndex()+"\n");
        directBuf.readBytes(2);
        System.out.println("讀取兩個位元組"+"\n");
        System.out.println("讀取後可寫位元組容量:"+directBuf.writableBytes());
        System.out.println("讀取後可讀位元組:"+directBuf.readableBytes());
        System.out.println("讀取後可丟棄位元組:"+directBuf.readerIndex()+"\n");
        directBuf.discardReadBytes();
        System.out.println("執行discardReadBytes後可寫位元組容量:"+directBuf.writableBytes());
        System.out.println("執行discardReadBytes後可讀位元組:"+directBuf.readableBytes());
        System.out.println("執行discardReadBytes後可丟棄位元組:"+directBuf.readerIndex());

輸出為:

可寫位元組容量:87
初始化可讀位元組:13
初始化可丟棄位元組:0

讀取兩個位元組

讀取後可寫位元組容量:87
讀取後可讀位元組:11
讀取後可丟棄位元組:2

執行discardReadBytes後可寫位元組容量:89
執行discardReadBytes後可讀位元組:11
執行discardReadBytes後可丟棄位元組:0

3.3 索引管理

索引管理的相關操作如下:
- 可以通過呼叫 markReaderIndex()、 markWriterIndex()、 resetWriterIndex()
和 resetReaderIndex()來標記和重置 ByteBuf 的 readerIndex 和 writerIndex。
- 可以通過呼叫 readerIndex(int)或者 writerIndex(int)來將索引移動到指定位置。
試圖將任何一個索引設定到一個無效的位置都將導致一個 IndexOutOfBoundsException。
- 可以通過呼叫 clear()方法來將 readerIndex 和 writerIndex 都設定為 0。呼叫 clear()比呼叫 discardReadBytes()輕量得多,因為它將只是重置索引而不會復
制任何的記憶體

示例如下:

        ByteBuf directBuf = Unpooled.directBuffer(100);
        directBuf.writeBytes("direct buffer".getBytes());
        System.out.println("初始化可讀位元組:"+directBuf.readableBytes());
        directBuf.markReaderIndex();
        System.out.println("執行markReaderIndex"+"\n");//標記讀索引
        directBuf.readBytes(2);
        System.out.println("讀取兩個位元組"+"\n");
        System.out.println("讀取後可讀位元組:"+directBuf.readableBytes());
        directBuf.resetReaderIndex();//恢復讀索引
        System.out.println("執行resetReaderIndex後可讀位元組:"+directBuf.readableBytes());
        directBuf.clear();
        System.out.println("執行clear後可讀位元組:"+directBuf.readableBytes());
        directBuf.readBytes(2);//可讀位元組變為0,此時再讀取會丟擲IndexOutOfBoundsException

輸出如下:

初始化可讀位元組:13
執行markReaderIndex

讀取兩個位元組

讀取後可讀位元組:11
執行resetReaderIndex後可讀位元組:13
執行clear後可讀位元組:0
Exception in thread "main" java.lang.IndexOutOfBoundsException: readerIndex(0) + length(2) exceeds writerIndex(0): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(ridx: 0, widx: 0, cap: 100)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1403)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1390)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:843)
    at com.eric.bytebuf.OperationSample.indexManage(OperationSample.java:32)
    at com.eric.bytebuf.OperationSample.main(OperationSample.java:16)

3.4 查詢操作

在ByteBuf中有多種可以用來確定指定值的索引的方法,常見方法如下:
- indexOf(int,int,byte)
- byteBuf.forEachByte(ByteBufProcessor.FIND_CR):ByteBufProcessor針對一些常見的值定義了許多便利的方法,例如:ByteBufProcessor.FIND_NUL,ByteBufProcessor.FIND_CR等

簡單的例子:

ByteBuf directBuf = Unpooled.directBuffer(100);
directBuf.writeBytes("direct buffer".getBytes());
System.out.println(directBuf.indexOf(0,directBuf.readableBytes(), (byte) 'u'));

3.5 派生緩衝區

派生緩衝區為 ByteBuf 提供了以專門的方式來呈現其內容的檢視。這類檢視是通過以下方
法被建立的:
- duplicate();
- slice();
- slice(int, int);
- Unpooled.unmodifiableBuffer(…);
- order(ByteOrder);
- readSlice(int)。

上述這些方法都將返回一個新的 ByteBuf 例項,它具有自己的讀索引、寫索引和標記
索引。 但是其內部儲存和原始物件是共享的。該種方式建立成本很低廉,但是這也意味著,如果你修改了它的內容,也同時修改了其對應的源例項,所以要小心。

如果需要一個現有緩衝區的真實副本,請使用 copy()或者 copy(int, int)方
法。不同於派生緩衝區,由這個呼叫所返回的 ByteBuf 擁有獨立的資料副本。

3.6 讀/寫操作

正如前文所提到過的,有兩種類別的讀/寫操作:
- get()和 set()操作,從給定的索引開始,並且保持索引不變;
- read()和 write()操作, 從給定的索引開始,並且會根據已經訪問過的位元組數對索
引進行調整。

4. ByteBufHolder

我們經常發現,除了實際的資料負載之外,我們還需要儲存各種屬性值。 HTTP 響應便是一
個很好的例子,除了表示為位元組的內容,還包括狀態碼、 cookie 等。
為了處理這種常見的用例, Netty 提供了 ByteBufHolder。 ByteBufHolder 也為 Netty 的高階特性提供了支援,如緩衝區池化,其中可以從池中借用 ByteBuf, 並且在需要時自動釋放。

ByteBufHolder 只有幾種用於訪問底層資料和引用計數的方法:
- content():返回由這個 ByteBufHolder 所持有的 ByteBuf
- copy():返回這個 ByteBufHolder 的一個深拷貝,包括一個其所包含的 ByteBuf 的非共享拷貝
- duplicate():返回這個 ByteBufHolder 的一個淺拷貝,包括一個其所包含的 ByteBuf 的共享拷貝.

系統預設自帶了一系列的ByteBufHolder,以MemoryFileUpload為例,該類通過封裝將filename,contentType,contentTransferEncoding屬性與對應的file進行關聯。

5. ByteBuf分配

本章節主要對ByteBuf的分配方式進行總結

5.1 使用ByteBufAllocator介面分配

為了降低分配和釋放記憶體的開銷, Netty 通過 interface ByteBufAllocator 實現了
(ByteBuf 的)池化,它可以用來分配我們所描述過的任意型別的 ByteBuf 例項(基於堆緩衝區的,基於直接緩衝區的,基於複合緩衝區的)。

可以通過 Channel(每個都可以有一個不同的 ByteBufAllocator 例項)或者繫結到
ChannelHandler 的 ChannelHandlerContext可以獲取到一個 ByteBufAllocator 的引用,Netty提供了兩種ByteBufAllocator的實現:
- PooledByteBufAllocator:池化了ByteBuf的例項以提高效能並最大限度地減少記憶體碎片
- UnpooledByteBufAllocator:不池化ByteBuf例項,並且在每次它被呼叫時都會返回一個新的例項

雖然Netty預設使用了PooledByteBufAllocator,但這可以很容易地通過ChannelConfig.setAllocator(ByteBufAllocator) 或者在引導應用程式時指定一個不同型別的ByteBufAllocator分配器。

ByteBufAllocator使用示例程式碼如下:

        ByteBufAllocator allocator= new PooledByteBufAllocator();
        ByteBuf directByteBuf = allocator.directBuffer();
        directByteBuf.writeBytes("Get Instance from ByteBufAllocator".getBytes());
        byte[] dst=new byte[directByteBuf.readableBytes()];
        directByteBuf.readBytes(dst);
        System.out.println(new String(dst));

5.2 使用Unpooled緩衝區分配

可能某些情況下,你未能獲取一個到 ByteBufAllocator 的引用。對於這種情況, Netty 提供了一個簡單的稱為 Unpooled 的工具類, 它提供了靜態的輔助方法來建立未池化的 ByteBuf例項.提供的方法如下:
- buffer:返回一個未池化的基於堆記憶體儲存的ByteBuf
- directBuffer:返回一個未池化的基於直接記憶體儲存的 ByteBuf
- wrappedBuffer:返回一個包裝了給定資料的 ByteBuf
- copiedBuffer:返回一個複製了給定資料的 ByteBuf

簡單示例如下:

        ByteBuf directBuf = Unpooled.directBuffer(100);
        directBuf.writeBytes("direct buffer".getBytes());
        System.out.println(directBuf.indexOf(0,directBuf.readableBytes(), (byte) 'u'));

==Unpooled 類還使得 ByteBuf 同樣可用於那些並不需要 Netty 的其他元件的非網路專案,
使得其能得益於高效能的可擴充套件的緩衝區 API。==

5.3 使用ByteBufUtil工具類

ByteBufUtil 提供了用於操作 ByteBuf 的靜態的輔助方法。常用方法如hexdump(),equals()等

6. 引用計數

引用計數是一種通過在某個物件所持有的資源不再被其他物件引用時釋放該物件所持有的資源來優化記憶體使用和效能的技術。很多地方都用了該計數,例如JVM的垃圾回收機制。 Netty 在第 4 版中為 ByteBuf 和 ByteBufHolder 引入了引用計數技術,它們都實現了 interface ReferenceCounted

用計數背後的想法並不是特別的複雜;它主要涉及跟蹤到某個特定物件的活動引用的數
量。一個 ReferenceCounted 實現的例項將通常以活動的引用計數為 1 作為開始。只要引用計數大於 0,就能保證物件不會被釋放。當活動引用的數量減少到 0 時,該例項就會被釋放。注意,雖然釋放的確切語義可能是特定於實現的,但是至少已經釋放的物件應該不可再用了。

引用計數對於池化實現(如 PooledByteBufAllocator)來說是至關重要的,它降低了
記憶體分配的開銷。

7. ByteBuf的優點

本文最後來總結下ByteBuf的優點,首先從上文可以看出Netty 的資料處理 主要通過ByteBuf 和ByteBufHolder,ByteBuf優點可總結為:
- 它可以被使用者自定義的緩衝區型別擴充套件;
- 通過內建的複合緩衝區型別實現了透明的零拷貝;
- 容量可以按需增長(類似於 JDK 的 StringBuilder);
- 在讀和寫這兩種模式之間切換不需要呼叫 ByteBuffer 的 flip()方法;
- 讀和寫使用了不同的索引;
- 支援方法的鏈式呼叫;
- 支援引用計數;
- 支援池化。