Netty學習篇⑥--ByteBuf原始碼分析
什麼是ByteBuf?
ByteBuf在Netty中充當著非常重要的角色;它是在資料傳輸中負責裝載位元組資料的一個容器;其內部結構和陣列類似,初始化預設長度為256,預設最大長度為Integer.MAX_VALUE。
ByteBuf資料結構
* <pre> * +-------------------+------------------+------------------+ * | discardable bytes | readable bytes | writable bytes | * | | (CONTENT) | | * +-------------------+------------------+------------------+ * | | | | * 0 <= readerIndex <= writerIndex <= capacity * </pre>
ByteBuf位元組緩衝區主要由discardable
、readable
、writable
三種類型的位元組組成的;
ByteBuf位元組緩衝區可以操控readerIndex
、writerIndex
二個下標;這兩個下標都是單獨維護
的
名詞 | 解釋 | 方法 |
---|---|---|
discardable bytes | 丟棄的位元組;ByteBuf中已經讀取的位元組 | discardReadBytes(); |
readable bytes | 剩餘的可讀的位元組 | |
writable bytes | 已經寫入的位元組 | |
readerIndex | 位元組讀指標(陣列下標) | readerIndex() |
writerIndex | 位元組寫指標(陣列下標) | writerIndex() |
ByteBuf中主要的類-UML圖
ByteBuf怎麼建立的?
ByteBuf是通過Unpooled來進行建立;預設長度為256,可自定義指定長度,最大長度為Integer.MAX_VALUE;
ByteBuf建立的型別有哪幾種?
1. 基於記憶體管理分類
型別 | 解釋 | 對應的位元組緩衝區類 |
---|---|---|
Pooled | 池化; 簡單的理解就是pooled擁有一個pool 池空間 (poolArea),凡是建立過的位元組緩衝區都會被快取進去, 有新的連線需要位元組緩衝區會先從快取中 get ,取不到則在進行建立; |
1.PooledDirectByteBuf 3.PooledUnsafeDirectByteBuf 4.PooledUnsafeHeapByteBuf |
Unpooled | 非池化; 每次都會建立一個位元組緩衝區 |
1.UnpooledDirectByteBuf 2.UnpooledHeapByteBuf 3.UnpooledUnsafeDirectByteBuf 4.UnpooledUnsafeHeapByteBuf |
優缺點:
- 在頻繁的建立申請位元組緩衝區的情況下,池化要比非池化要好很多,池化減少了記憶體的建立和銷燬,重複使用
- 在非頻繁的情況下,非池化的效能要高於池化,不需要管理維護物件池,所以在不需要大量使用ByteBuf的情況下推薦使用非池化來建立位元組緩衝區
2. 基於記憶體分類
型別 | 解釋 | 特點 | 構造方法 |
---|---|---|---|
heapBuffer(常用) | 堆位元組緩衝區; | 底層就是JVM的堆記憶體,只是IO讀寫需要從堆記憶體拷貝到核心中(類似之前學過的IO多路複用) | buffer(128) |
directBuffer(常用) | 直接記憶體位元組緩衝區; | 直接存於作業系統核心空間(堆外記憶體) | directBuffer(256) |
優缺點:
- heapBuffer是在JVM的堆記憶體中分配一個空間,使用完畢後通過JVM回收機制進行回收,但是資料傳輸到Channel中需要從堆記憶體中拷貝到系統核心中
- directBuffer直接在堆外,系統核心中開闢一個空間,在資料傳輸上要比heapBuffer高(減少了記憶體拷貝),但是由於不受JVM管理在建立和回收上要比heapBuffer更加耗時耗能;
每一種都有自己優勢的地方,我們要根據實際的業務來靈活的運用;如果涉及到大量的檔案操作建議使用directBuffer(搬來搬去確實挺耗效能);大部分業務還是推薦使用heapBuffer(heapBuffer,普通的業務搬來搬去相比在核心申請一塊記憶體和釋放記憶體來說要更加優)。
ByteBuf是怎麼樣回收的
1. heapBuffer
heapBuffer是基於堆記憶體來進行建立的,回收自然而然通過JVM的回收機制進行回收
2. directBuffer回收記憶體的方法
可以通過DirectByteBuffer中的Cleaner來進行清除
或者依靠unsafe的釋放記憶體(freeMemory方法)也可以進行回收
原始碼分析
ByteBuf記憶體分配
ByteBuf的記憶體分配主要分為heap(堆記憶體)和direct(堆外記憶體);
1. heap堆記憶體的分配:通過UnpooledByteBufAllocator類進行記憶體分配
1.1 建立堆緩衝區
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
// 是否支援unsafe
return PlatformDependent.hasUnsafe() ?
// 建立unsafe非池化堆位元組緩衝區
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
// 建立非池化堆位元組緩衝區
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
1.2 unsafe和非unsafe都是通過例項 UnpooledHeapByteBuf 來分配記憶體
// InstrumentedUnpooledUnsafeHeapByteBuf/InstrumentedUnpooledHeapByteBuf
// 最終都是通過這個方法來建立分配記憶體;後面會講講unsafe和普通的非unsafe的區別
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 設定最大容量
super(maxCapacity);
// 檢查記憶體分配類是否為空
checkNotNull(alloc, "alloc");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// allocateArray初始化一個initialCapacity長度的位元組陣列
setArray(allocateArray(initialCapacity));
// 初始化讀寫索引為0
setIndex(0, 0);
}
// 初始化一個initialCapacity長度的位元組陣列
byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}
// 初始化讀寫索引為0
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
readerIndex, writerIndex, capacity()));
}
setIndex0(readerIndex, writerIndex);
return this;
}
final void setIndex0(int readerIndex, int writerIndex) {
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
}
從原始碼可以得知,堆記憶體ByteBuf通過判斷系統環境是否支援unsafe來判斷是建立UnsafeHeapByteBuf還是heapByteBuf; 如果支援unsafe則返回 InstrumentedUnpooledUnsafeHeapByteBuf
例項,反之則返回 InstrumentedUnpooledHeapByteBuf
例項;但它們都是分配一個byte陣列來進行儲存位元組資料。
1.3 unsafe和非unsafe建立的ByteBuf有什麼區別呢
unsafe和非unsafe建立的heapByteBuf區別在於獲取資料;非unsafe獲取資料直接是通過陣列索引來進行獲取的;而unsafe獲取資料則是通過UNSAFE操控記憶體來獲取;我們可以通過原始碼來看看
heapByteBuf獲取資料
@Override
public byte getByte(int index) {
ensureAccessible();
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index);
}
// 直接返回陣列對應索引的值
static byte getByte(byte[] memory, int index) {
return memory[index];
}
unsafeHeapByteBuf獲取資料
@Override
public byte getByte(int index) {
checkIndex(index);
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}
static byte getByte(byte[] array, int index) {
return PlatformDependent.getByte(array, index);
}
public static byte getByte(byte[] data, int index) {
return PlatformDependent0.getByte(data, index);
}
static byte getByte(byte[] data, int index) {
// 通過unsafe來獲取
return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
2. direct記憶體分配:unsafe建立和非unsafe建立
2.1 建立directBuffer
// PlatformDependent檢測執行環境的變數屬性,比如java環境,unsafe是否支援等
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
// 支援unsafe
if (PlatformDependent.hasUnsafe()) {
// 執行環境是否使用不清空的direct記憶體
buf = PlatformDependent.useDirectBufferNoCleaner() ?
new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
// 建立非unsafe例項ByteBuf
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
2.2 unsafe返回 UnpooledUnsafeDirectByteBuf
例項,非unsafe返回 UnpooledDirectByteBuf
例項
// 建立unsafe direct位元組緩衝區
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 設定最大容量
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// allocateDirect建立DirectByteBuffer(java nio)分配記憶體
setByteBuffer(allocateDirect(initialCapacity), false);
}
// 非unsafe建立direct記憶體
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}
2.3 分配 direct記憶體,返回 java nio ByteBuffer例項
protected ByteBuffer allocateDirect(int initialCapacity) {
return ByteBuffer.allocateDirect(initialCapacity);
}
/**
* Allocates a new direct byte buffer. 分配一個新的直接記憶體位元組緩衝區
*
* <p> The new buffer's position will be zero, its limit will be its
* capacity, its mark will be undefined, and each of its elements will be
* initialized to zero. Whether or not it has a
* {@link #hasArray backing array} is unspecified.
*
* @param capacity
* The new buffer's capacity, in bytes
*
* @return The new byte buffer
*
* @throws IllegalArgumentException
* If the <tt>capacity</tt> is a negative integer
*/
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
// allocateDirect建立DirectByteBuffer(java nio)
DirectByteBuffer(int cap) { // package-private
// 設定檔案描述, 位置等資訊
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);
long base = 0;
try {
// 通過unsafe類來分配記憶體
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
// 例項化cleaner,用於後續回收
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
2.4 設定ByteBuffer的屬性(unsafe和非unsafe)
unsafe設定ByteBuffer
/**
* buffer 資料位元組
* tryFree 嘗試釋放,預設為false
*/
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
if (tryFree) {
// 全域性buffer設定成舊buffer
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
// 釋放舊緩衝區的記憶體
freeDirect(oldBuffer);
}
}
}
// 將當前傳入的buffer設定成全域性buffer
this.buffer = buffer;
// 記錄記憶體地址
memoryAddress = PlatformDependent.directBufferAddress(buffer);
// 將臨時buff設定為null
tmpNioBuf = null;
// 記錄容量大小
capacity = buffer.remaining();
}
// 獲取物件在記憶體中的地址
static long directBufferAddress(ByteBuffer buffer) {
return getLong(buffer, ADDRESS_FIELD_OFFSET);
}
// 通過unsafe操控系統記憶體,獲取物件在記憶體中的地址
private static long getLong(Object object, long fieldOffset) {
return UNSAFE.getLong(object, fieldOffset);
}
非unsafe設定ByteBuffer
// 非unsafe設定屬性
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
this.buffer = buffer;
tmpNioBuf = null;
capacity = buffer.remaining();
}
根據原始碼,direct通過判斷執行系統環境是否使用useDirectBufferNoCleaner
來例項不同的ByteBufferedReader(unsafe和非unsafe),但是他們最終都是通過ByteBuffer來分配記憶體,底層都是通過在不同的ByteBuf例項中構建一個ByteBuffer來進行儲存位元組資料的(具體可以看看UnpooledDirectByteBuf
的set方法)
2.5 unsafe和非unsafe建立的directByteBuf的區別
unsafe獲取資料:UNSAFE通過索引的記憶體地址來獲取對應的值
@Override
protected byte _getByte(int index) {
// UnsafeByteBufUtil unsafe工具類獲取
return UnsafeByteBufUtil.getByte(addr(index));
}
// addr 獲取索引的記憶體地址
long addr(int index) {
return memoryAddress + index;
}
static byte getByte(long address) {
return PlatformDependent.getByte(address);
}
public static byte getByte(long address) {
return PlatformDependent0.getByte(address);
}
// 通過UNSAFE獲取記憶體地址的值
static byte getByte(long address) {
return UNSAFE.getByte(address);
}
非unsafe獲取資料: 直接通過對應索引的ByteBuffer獲取值
@Override
public byte getByte(int index) {
// 檢查授權,及ByteBuffer物件是否還有引用
ensureAccessible();
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
// 通過索引獲取值
return buffer.get(index);
}
3. ByteBuf擴容
每次我們再往位元組緩衝區中寫入資料的時候都會判斷當前容量是否還能寫入資料,當發現容量不夠時,此時ByteBuf會總動進行擴容;當然我們也可以手動更改ByteBuf的容量;詳細見程式碼分析。
public static void main(String[] args) {
// 利用非池化Unpooled類建立位元組緩衝區
ByteBuf byteBuf = Unpooled.buffer(2);
System.out.println("initCapacity: " + byteBuf.capacity());
byteBuf.writeByte(66);
byteBuf.writeByte(67);
byteBuf.readBytes(1);
System.out.println("readerIndex: " + byteBuf.readerIndex());
System.out.println("writerIndex: " + byteBuf.writerIndex());
// 丟棄已經閱讀的位元組
byteBuf.discardReadBytes();
byteBuf.writeByte(68);
byteBuf.writeByte(69);
System.out.println("readerIndex: " + byteBuf.readerIndex());
System.out.println("writerIndex: " + byteBuf.writerIndex());
System.out.println("capacity: " + byteBuf.capacity());
}
// 執行結果
initCapacity: 2
readerIndex: 1
writerIndex: 2
readerIndex: 0
writerIndex: 3
capacity: 64
上面程式碼的操作步驟:初始化ByteBuf --- 寫入資料 --- 讀取資料 --- 丟棄資料 --- 再寫入資料;
丟棄了一個位元組數的資料又寫入了2個位元組數的資料,初始化容量的緩衝區明顯不夠發生了自動擴容,擴容後的容量:64;它是怎麼進行擴容的呢?什麼時候擴容的呢?看下原始碼
public ByteBuf writeByte(int value) {
// 確保可以寫入(判斷是否容量夠不夠寫入)
ensureWritable0(1);
// 設定寫索引、存值
_setByte(writerIndex++, value);
return this;
}
// minWritableBytes預設為1 因為writeByte每次只能寫入一個位元組數
final void ensureWritable0(int minWritableBytes) {
// 檢查是否還有佔有權和是否還有引用
ensureAccessible();
// writableBytes() = capacity - writerIndex 剩餘可寫容量
if (minWritableBytes <= writableBytes()) {
return;
}
// ByteBuf雖然支援自動擴容但是也有上限(Integer.MAX_VALUE)
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// 開始進行擴容 newCapacity = writerIndex + minWritableBytes
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// 將新的容量寫入到ByteBuf
capacity(newCapacity);
}
/**
* 計算新的容量
* minNewCapacity 寫入的最小容量
* maxCapacity 最大容量及Integer.MAX_VALUE 2147483647
*/
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
if (minNewCapacity < 0) {
throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
}
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
// 4兆大小 4194304
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// 如果超過了4兆,
if (minNewCapacity > threshold) {
// 新的容量擴容為超過的倍數的容量
int newCapacity = minNewCapacity / threshold * threshold;
// 如果超過了最大的容量則直接設定為最大容量
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// 預設擴容大小為64
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
// 左移一位 newCapacity = newCapacity*2
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
從上面的原始碼可知,自動擴容在4兆
的範圍內變化的話,每次擴容都是64 * 2的N字方
(N >= 1); 一旦超過了4兆
則遞增倍數為(newCapacity / 4194304) * 4194304
即表示的是基於4兆增長的倍數。
4. ByteBuf和ByteBuffer的區別
- ByteBuf單獨維護
讀寫
兩個陣列下標,ByteBuffer只有一個下標索引,讀寫的時候需要手動設定(呼叫flip和rewind) - ByteBuffer不支援動態擴容(final型),ByteBuf支援動態擴容上限為Integer.MAX_VALUE
- ByteBuf支援建立對外記憶體(direct記憶體)儲存資料
- ByteBuf支援的Api更加豐富
雖然Netty中使用的ByteBuf來進行快取位元組資料,但是最後在Channel中還是以ByteBuffer(java nio)來進行傳輸
參考文獻:
https://www.cnblogs.com/stateis0/p/9062152.html
https://www.jianshu.com/p/1585e32cf6b4
https://blog.csdn.net/ZBylant/article/details/83037421