每種ByteBuf都有相應的分配器ByteBufAllocator,類似工廠模式。我們先學習UnpooledHeapByteBuf與其對應的分配器UnpooledByteBufAllocator

如何知道alloc分配器那是個?

可以從官方下載的TimeServer 例子來學習,本專案已有原始碼可在 TestChannelHandler.class裡斷點追蹤

從圖可以看出netty 4.1.8預設的ByteBufAllocator是PooledByteBufAllocator,可以參過啟動引數-Dio.netty.allocator.type unpooled/pooled 設定

細心的讀者可以看出分配ByteBuf只有pool跟unpool,但ByteBuf有很多型別,可能出於使用方面考慮,有時不一定設計太死板,太規範反而使學習成本很大

public final class ByteBufUtil {
static final ByteBufAllocator DEFAULT_ALLOCATOR; static {
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim(); ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
} else {
alloc = PooledByteBufAllocator.DEFAULT;
}
DEFAULT_ALLOCATOR = alloc;
}
}

AbstractReferenceCountedByteBuf是統計引用總數處理,用到Atomic*技術。

refCnt是從1開始,每引用一次加1,釋放引用減1,當refCnt變成1時執行deallocate由子類實現

public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); private volatile int refCnt = 1; @Override
public ByteBuf retain() {
return retain0(1);
} private ByteBuf retain0(int increment) {
for (;;) {
int refCnt = this.refCnt;
final int nextCnt = refCnt + increment;
if (nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
break;
}
}
return this;
} @Override
public boolean release() {
return release0(1);
} private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
} if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
}
} protected abstract void deallocate();
}

對於ByteBuf I/O 操作經常用的是 writeByte readByte兩種
由於ByteBuf支援多種bytes物件,如 OutputStream、GatheringByteChannel、ByteBuffer、ByteBuf等,
我們只拿兩三種常用的API來做分析,其它邏輯大同小異
如果讀者有印象的話,通常底層只負責流程控制,實現交給應用層/子類處理,AbstractByteBuf.class writeByte/readByte 也是這種處理方式

public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
//分配器
private final ByteBufAllocator alloc;
//資料
byte[] array;
//臨時ByteBuffer,用於內部快取
private ByteBuffer tmpNioBuf; private UnpooledHeapByteBuf(
ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
//省去部分程式碼同邊界處理
super(maxCapacity);
this.alloc = alloc;
array = initialArray;
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
}
//獲取ByteBuffer容量
@Override
public int capacity() {
ensureAccessible();
return array.length;
}
@Override
public boolean hasArray() {
return true;
}
//獲取原始資料
@Override
public byte[] array() {
ensureAccessible();
return array;
}
//擴容/縮容
@Override
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
//newCapacity引數邊界判斷
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
} int oldCapacity = array.length;
//擴容處理,直接cp到新的array
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
//減容處理
//這裡有兩種處理情況
//1.readerIndex > newCapacity 說明還有資料未處理直接將 readerIndex,writerIndex相等 newCapacity
//2.否則 writerIndex =Math.min(writerIndex,newCapacity),取最少值,然後直接複製資料 //可以看出netty處理超出readerIndex、writerIndex 限界直接丟棄資料。。。。。。 byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex = newCapacity
this.writerIndex = writerIndex;
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
//System.arraycopy(複製來源陣列, 來源組起始座標, 目標陣列, 目標陣列起始座標, 複製資料長度); } else {
this.readerIndex = newCapacity;
this.writerIndex = newCapacity;
}
setArray(newArray);
}
return this;
}
}

AbstractByteBuf.class readBytes 呼叫子類實現 getBytes方法,區別是呼叫readBytes會改變readerIndex記錄

public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf readBytes(ByteBuffer dst) {
int length = dst.remaining();
//checkReadableBytes(length);
if (readerIndex > (writerIndex - length)) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, length, writerIndex, this));
}
//呼叫子類實現
getBytes(readerIndex, dst);
//記錄已讀長度
readerIndex += length;
return this;
}
@Override
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
} //這裡如果index不為負的話只需要 capacity - (index + length) < 0 判斷就可以
//用到 | 運算 如果 index為-1的話 index | length 還是負數 第二個 | (index + length)運算有可能 index + length相加為負
public static boolean isOutOfBounds(int index, int length, int capacity) {
return (index | length | (index + length) | (capacity - (index + length))) < 0;
}
}
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
//支援ByteBuffer讀取
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
//checkIndex(index, dst.remaining());
if (isOutOfBounds(index, dst.remaining(), capacity())) {
throw new IndexOutOfBoundsException(String.format(
"index: %d, length: %d (expected: range(0, %d))", index, dst.remaining(), capacity()));
}
dst.put(array, index, dst.remaining());
return this;
}
//支援ByteBuf讀取
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
checkDstIndex(index, length, dstIndex, dst.capacity());
//是unsafe型別,要呼叫jdk unsafe方法複製
if (dst.hasMemoryAddress()) {
PlatformDependent.copyMemory(array, index, dst.memoryAddress() + dstIndex, length);
} else if (dst.hasArray()) { //如果是陣列即 heap型別,直接複製過去
getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
} else {
dst.setBytes(dstIndex, array, index, length);
}
return this;
} //支援陣列讀取
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
checkDstIndex(index, length, dstIndex, dst.length);
System.arraycopy(array, index, dst, dstIndex, length);
return this;
}
}

AbstractByteBuf.class writeBytes 呼叫子類實現 setBytes方法,區別是呼叫writeBytes會改變writerIndex記錄

public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf writeBytes(ByteBuf src) {
writeBytes(src, src.readableBytes());
return this;
} @Override
public ByteBuf writeBytes(ByteBuf src, int length) {
if (length > src.readableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
}
writeBytes(src, src.readerIndex(), length);
//讀取src資料到this.ByteBuf 所以要更改src readerIndex
src.readerIndex(src.readerIndex() + length);
return this;
}
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureAccessible();
//是否擴容處理
ensureWritable(length);
//呼叫子類實現
setBytes(writerIndex, src, srcIndex, length);
//記錄已寫長度
writerIndex += length;
return this;
} private void ensureWritable0(int minWritableBytes) {
if (minWritableBytes <= writableBytes()) {
return;
}
//寫入資料長度大於最大空間剩餘長度拋異常
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
} //通過分配器計算,引數1寫完後的writerIndex記錄,引數2最大容量長度
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); //子類實現
capacity(newCapacity);
}
//////////////////////////////AbstractByteBufAllocator.class//////////////////////////////////////
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
if (minNewCapacity < 0) {
throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
}
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
} final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
} //如果新容量大於4M,不走雙倍擴大演算法,數值範圍取 minNewCapacity <= maxCapacity
if (minNewCapacity > threshold) {
// 除以threshold再乘以threshold得出的結果是 threshold的倍數,可以理解是去掉餘數
int newCapacity = minNewCapacity / threshold * threshold;
//如果剩餘容量不夠4M直接給maxCapacity,否則自增4M
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
} //newCapacity <<= 1 意思是 newCapacity*2,雙倍自增
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
} return Math.min(newCapacity, maxCapacity);
}
}
//setBytes邏輯跟getBytes一樣
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
checkSrcIndex(index, length, srcIndex, src.capacity());
if (src.hasMemoryAddress()) {
PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length);
} else if (src.hasArray()) {
setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
} else {
src.getBytes(srcIndex, array, index, length);
}
return this;
} @Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkSrcIndex(index, length, srcIndex, src.length);
System.arraycopy(src, srcIndex, array, index, length);
return this;
}
}

總結:

1.writeBytes跟setBytes、readBytes跟getBytes區別是前者有記錄,後者沒有,而後者是子類的實現

2.擴容演算法是兩種策略:

  2.1.大於4M時不走double自增,數值範圍取 minNewCapacity <= maxCapacity

  2.2.少於4M時從64開始double自增

3.更改容量也是每個子類實現,要考慮兩種情況

  3.1.大於當前容量

  3.2.小於當前容量,當小於的時候要考慮 readerIndex、writerIndex邊界,當超過 readerIndex、writerIndex邊界heap的策略是丟去原來的資料

4.heap是繼承 AbstractReferenceCountedByteBuf的,當refCnt記錄為1時釋放資料