1. 程式人生 > >Netty4.0原始碼解析:位元組容器UnpooledHeapByteBuf

Netty4.0原始碼解析:位元組容器UnpooledHeapByteBuf

一、引言

Java NIO提供了ByteBuffer作為位元組容器,供Channel讀入和寫入資料。但ByteBuffer使用過於繁瑣,靈活性不夠強。Netty實現了ByteBuf來替代JDK的ByteBuffer。 ByteBuf有以下幾大優點: 1、它可以被使用者自定義的緩衝區型別擴充套件 2、通過內建的複合緩衝區型別實現了透明的零拷貝 3、容量可以按需增長 4、讀寫切換無需呼叫ByteBuffer的filp方法 5、讀和寫採用了不同的索引 6、支援方法鏈式呼叫 7、支援引用計數 8、支援池化

ByteBuf體系結構圖: 在這裡插入圖片描述

ByteBuf常用實現類為:UnpooledHeapByteBuf、UnpooledUnsafeHeapByteBuf、UnpooledDirectByteBuf、UnpooledUnsafeDirectByteBuf。

ByteBuf和JDK的ByteBuffer類似,分為堆緩衝區和直接緩衝區。堆緩衝區直接將資料儲存在JVM堆空間中,它能在沒有使用池化的情況下快速的分配和釋放。直接緩衝區是另外一種ByteBuf模式,它將緩衝區分配在堆外記憶體(非JVM執行時資料區),垃圾收集器不會管理這部分記憶體,它的優勢在於網路資料傳輸,但是它的分配和釋放都較為昂貴。

我們先從常用的UnpooledHeapByteBuf開始分析ByteBuf:

二、UnpooledHeapByteBuf

UnpooledHeapByteBuf是一個非池化的、記憶體空間分配在堆的ByteBuf。 例項化UnpooledHeapByteBuf通常是通過Unpooled靜態工廠方法buffer,buffer方法有多個過載方法,可以指定緩衝區的初始大小,最大大小:

private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
public static ByteBuf buffer() {
    return ALLOC.heapBuffer();
}
//省略其它過載方法...

Unpooled分配ByteBuf通過預設的ByteBufAllocator:UnpooledByteBufAllocator.DEFAULT進行分配。 UnpooledByteBufAllocator繼承了AbstractByteBufAllocator,實現了ByteBufAllocatorMetricProvider介面,它採用單例模式。最終通過newHeapBuffer來例項化UnpooledHeapByteBuf

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe() ?
            new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
            new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

InstrumentedUnpooledUnsafeHeapByteBuf和InstrumentedUnpooledHeapByteBuf是UnpooledByteBufAllocator的內部類,它們分別繼承了UnpooledUnsafeHeapByteBuf和UnpooledHeapByteBuf,繼承關係如下: 在這裡插入圖片描述

如果類載入器持有sun.misc.Unsafe,那麼預設例項化InstrumentedUnpooledUnsafeHeapByteBuf,對於HotSpot虛擬機器來說一般都會例項化這個類。如果沒有sun.misc.Unsafe,那麼預設例項化InstrumentedUnpooledHeapByteBuf。

InstrumentedUnpooledUnsafeHeapByteBuf和InstrumentedUnpooledHeapByteBuf重寫了UnpooledHeapByteBuf的allocateArray方法和freeArray方法。

UnpooledHeapByteBuf持有以下例項變數(包括父類):

//讀索引
int readerIndex;
//寫索引
int writerIndex;
//標記的讀索引
private int markedReaderIndex;
//標記的寫索引
private int markedWriterIndex;
//最大緩衝區容量(單位:位元組)
private int maxCapacity;
//位元組序包裝器
private SwappedByteBuf swappedBuf;
//引用計數
private volatile int refCnt;
//緩衝區分配器
private final ByteBufAllocator alloc;
//緩衝區陣列
byte[] array;
//臨時的JDK ByteBuffer物件
private ByteBuffer tmpNioBuf;
1、構造方法

構造UnpooledHeapByteBuf需要指定一個ByteBuf分配器ByteBufAllocator例項、初始緩衝區大小(單位:位元組)、最大緩衝區大小:

public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    checkNotNull(alloc, "alloc"); //alloc不能為null
    if (initialCapacity > maxCapacity)
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    this.alloc = alloc;
    setArray(allocateArray(initialCapacity)); //分配緩衝區
    setIndex(0, 0); //將讀索引和寫索引設為0
}

1、首先呼叫父類AbstractReferenceCountedByteBuf的構造方法:

protected AbstractReferenceCountedByteBuf(int maxCapacity) {
    super(maxCapacity);
    refCntUpdater.set(this, 1);
}

AbstractReferenceCountedByteBuf持有一個變數refCnt,代表引用計數,在構造時,將引用計數通過CAS將其設為1。 AbstractReferenceCountedByteBuf父類AbstractByteBuf的構造方法:

protected AbstractByteBuf(int maxCapacity) {
    if (maxCapacity < 0)
        throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");
    this.maxCapacity = maxCapacity;
}

2、檢查引數:alloc不能為null,初始容量不能大於最大容量 3、呼叫allocateArray分配緩衝區記憶體: allocateArray在UnpooledHeapByteBuf中預設實現為直接構造一個byte陣列並指定長度為initialCapacity,但在子類InstrumentedUnpooledHeapByteBuf和InstrumentedUnpooledUnsafeHeapByteBuf中重寫了這個方法:

@Override
byte[] allocateArray(int initialCapacity) {
    byte[] bytes = super.allocateArray(initialCapacity);
    ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length);
    return bytes;
}

首先呼叫父類的allocateArray方法分配記憶體byte陣列,如果該類是InstrumentedUnpooledUnsafeHeapByteBuf,那麼這個方法實現為:

@Override
byte[] allocateArray(int initialCapacity) {
    return PlatformDependent.allocateUninitializedArray(initialCapacity);
}

public static byte[] allocateUninitializedArray(int size) {
    return UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD < 0 || UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD > size ?
            new byte[size] : PlatformDependent0.allocateUninitializedArray(size);
}

UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD的取值如下:

UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD = javaVersion() >= 9 && PlatformDependent0.hasAllocateArrayMethod() ?
                tryAllocateUninitializedArray : -1;

只有JDK版本為9以上並且持有jdk.internal.misc.Unsafe類才會採用非new的方式來構造byte陣列,所以對於Java 8來說還是通過new byte[]的方式來分配記憶體。

byte陣列分配完成後,隨即會呼叫setArray方法將這個陣列賦給成員變數array:

private void setArray(byte[] initialArray) {
    array = initialArray;
    tmpNioBuf = null;
}

最後將讀指標和寫指標都設為0。

2、資料的寫入

一般通過ByteBuf的writeBytes方法往緩衝區中寫入位元組型別的資料,writeBytes有多個過載的方法,可支援通過不同型別的源往緩衝區中新增資料,比如byte陣列、ByteBuf、InputStream、Channel通道。

我們先從最簡單的byte陣列寫入緩衝區的方式開始介紹:

(1)byte陣列的寫入
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    writerIndex += length;
    return this;
}

可以分為三個步驟: 1、首先呼叫ensureWritable確保有足夠的空間可供寫入

@Override
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0)
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    ensureWritable0(minWritableBytes);
    return this;
}

final void ensureWritable0(int minWritableBytes) {
    ensureAccessible(); //檢查是否擁有許可權
    if (minWritableBytes <= writableBytes()) //如果緩衝區空間充足,那麼方法返回結束
        return;
    //如果緩衝區空間不足並且如果分配後超出了最大允許的容量,那麼丟擲異常
    if (minWritableBytes > maxCapacity - writerIndex)
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    //計算出擴容後的緩衝區大小,指定最小大小為緩衝區已有的資料位元組數+本次需要寫入的位元組數
    int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
    capacity(newCapacity); //執行擴容
}

如果將byte陣列中指定的內容全部加入到緩衝區後也不會越界,那麼空間充足,方法返回。 如果空間不足並且擴容後超出最大允許的緩衝區大小(maxCapacity),那麼丟擲IndexOutOfBoundsException異常。 如果可以擴容,那麼呼叫calculateNewCapacity計算分配後的緩衝區大小:

private int calculateNewCapacity(int minNewCapacity) {
    final int maxCapacity = this.maxCapacity;
    final int threshold = 1048576 * 4; //4MB
    if (minNewCapacity == threshold) //如果正好是4MB,那麼方法返回
        return threshold;
    if (minNewCapacity > threshold) { //如果所需分配的容量仍然大於4MB
        int newCapacity = minNewCapacity / threshold * threshold;
        //如果容量超出最大允許的容量,那麼返回maxCapacity,否則返回newCapacity+threshold
        if (newCapacity > maxCapacity - threshold)
            newCapacity = maxCapacity;
        else
            newCapacity += threshold;
        return newCapacity;
    }
    //如果小於4MB
    int newCapacity = 64;
    //從64位元組開始不斷加倍,直到大於minNewCapacity
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }
    //如果超出了最大允許的容量,那麼返回maxCapacity,否則返回newCapacity
    return Math.min(newCapacity, maxCapacity);
}

計算完成後,呼叫capacity方法進行擴容:

@Override
public ByteBuf capacity(int newCapacity) {
    checkNewCapacity(newCapacity); //檢查引數
    
    int oldCapacity = array.length;
    byte[] oldArray = array; 
    if (newCapacity > oldCapacity) { //如果是擴容操作
        byte[] newArray = allocateArray(newCapacity); //分配新的陣列
        System.arraycopy(oldArray, 0, newArray, 0, oldArray.length); //將舊陣列資料拷貝到新陣列
        setArray(newArray); //將新陣列賦值給變數array
        freeArray(oldArray); //釋放舊陣列的空間
    } else if (newCapacity < oldCapacity) { //如果是減少緩衝區大小
        byte[] newArray = allocateArray(newCapacity); //分配新的陣列
        int readerIndex = readerIndex(); //獲取讀索引
        if (readerIndex < newCapacity) { //如果讀指標小於新容量
            int writerIndex = writerIndex(); //獲取寫索引
            if (writerIndex > newCapacity) //如果寫索引大於新容量
                writerIndex(writerIndex = newCapacity); //設定新的寫索引為newCapacity
            System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else { //否則將讀指標設為newCapacity,即指向新陣列最後一個元素
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray); //將新陣列賦值給變數array
        freeArray(oldArray); //釋放舊陣列的空間
    }
    return this;
}

2、呼叫setBytes方法將byte陣列內容新增到緩衝區。

@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length); //檢查索引是否越界
    //從緩衝區寫索引開始,將陣列src的srcIndex~srcIndex+length範圍內的位元組寫入
    System.arraycopy(src, srcIndex, array, index, length);
    return this;
}

3、更新寫索引的值。

(2)Channel通道的資料獲取

在Netty中,客戶端傳送的資料通過Channel通道儲存在ByteBuf緩衝區中,供ChannelHandler處理。在這個過程中就會呼叫到int writeBytes(ScatteringByteChanne, int)這個方法

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0)
        writerIndex += writtenBytes;
    return writtenBytes;
}

步驟大體上和byte陣列差不多,主要體現在setBytes方法的不同

@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    try {
        return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}

private ByteBuffer internalNioBuffer() {
    ByteBuffer tmpNioBuf = this.tmpNioBuf;
    if (tmpNioBuf == null)
         his.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
    return tmpNioBuf;
}

從Channel獲取資料需要JDK的ByteBuffer的支援,所以internalNioBuffer()方法將這個ByteBuf持有的緩衝區陣列作為引數生成了一個臨時的JDK ByteBuffer物件,即ByteBuf和ByteBuffer共享一個緩衝區陣列,所有對ByteBuffer緩衝區的操作都會影響到ByteBuf的緩衝區。 然後,呼叫clear方法將ByteBuffer復位(不會清除緩衝區資料),接著呼叫position方法將索引更新到index位置,然後呼叫limit方法設定最多讀取length長度的資料。 ByteBuffer設定完畢後,通過Channel的read方法將資料寫入到ByteBuffer。

3、資料的讀取

可以呼叫ByteBuf的readBytes方法將緩衝區中的資料寫入到目標載體。readBytes方法有多個過載的方法,可以支援將緩衝區中的資料寫入到byte陣列、另外一個ByteBuf,ByteBuffer、GatheringByteChannel、OutputStream。

(1)寫入到byte陣列

我們以其中一個readBytes方法為例進行介紹:下面這個方法需要傳入三個引數:需要寫入的byte陣列、從哪個地方開始寫入以及從ByteBuf獲取多少位元組的資料。

@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
    checkReadableBytes(length);
    getBytes(readerIndex, dst, dstIndex, length);
    readerIndex += length;
    return this;
}

該方法的呼叫可以分為以下幾個步驟:

  • 呼叫checkReadableBytes方法檢查ByteBuf可讀的位元組數是否小於length引數,如果滿足這個條件,那麼說明這個ByteBuf無法獲取length長度的位元組寫入到byte陣列,此時會丟擲IndexOutOfBoundsException
  • 呼叫getBytes方法將資料寫入到byte陣列:
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
    checkDstIndex(index, length, dstIndex, dst.length);
    System.arraycopy(array, index, dst, dstIndex, length);
    return this;
}

在getBytes方法中,首先會檢查byte陣列寫入資料後是否會越界,如果會越界同樣丟擲IndexOutOfBoundsException,然後通過System.arraycopy快速複製緩衝區資料到目標數byte陣列。

(2)寫入到Channel通道

getBytes方法可以將ByteBuf的資料寫入到GatheringByteChannel中。

@Override
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
    ensureAccessible(); //檢查呼叫者的許可權
    return getBytes(index, out, length, false);
}

private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf;
    if (internal)
        tmpBuf = internalNioBuffer();
    else
        tmpBuf = ByteBuffer.wrap(array);
    return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
}

這裡建立臨時的ByteBuffer的過程和上面類似,這裡不再贅述。構建好ByteBuffer後,通過Channel的write方法將緩衝區資料寫入Channel通道。

4、UnpooledUnsafeHeapByteBuf

UnpooledUnsafeHeapByteBuf繼承了UnpooledHeapByteBuf,其構造方法為包訪問許可權,在類載入器可以載入sun.misc.Unsafe的情況下,這個類只能通過Unpooled靜態工廠方法例項化,它重寫了直接通過下標獲取緩衝區資料的方法(比如getByte(int index)/getShort(int index)/getLong(int index)等方法),還重寫了分配緩衝區的內部方法allocateArray(我們之前已經提到過)。

getByte等方法獲取緩衝區資料並不是直接通過array[i]這樣獲得,而是通過sun.misc.Unsafe的getByte方法直接通過地址獲取:

static byte getByte(byte[] data, int index) {
    return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}

對於一個位元組來說,可能通過array[i]這樣獲取速度更快,但是對於int、long等4位元組、8位元組的資料,採用sun.misc.Unsafe的方法獲取效率更高。