1. 程式人生 > >netty原始碼分析(二十一)Netty資料容器ByteBuf底層資料結構深度剖析與ReferenceCounted初探

netty原始碼分析(二十一)Netty資料容器ByteBuf底層資料結構深度剖析與ReferenceCounted初探

ByteBuf
ByteBuf是Netty提供的代替jdk的ByteBuffer的一個容器,首先看一下他的具體用法:

public class ByteBufTest0 {
    public static void main(String[] args) {
        ByteBuf byteBuf = Unpooled.buffer(10);//堆緩衝區
        for(int i=0;i<byteBuf.capacity();i++){
            byteBuf.writeByte(i);
        }
//絕對方式  不會改變readerIndex
        for
(int i=0;i<byteBuf.capacity();i++){ System.out.println(byteBuf.getByte(i)); } //相對方式 會改變writerIndex for(int i=0;i<byteBuf.capacity();i++){ System.out.println(byteBuf.readByte()); } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

Netty ByteBuf內部有幾個遊標:

/* +——————-+——————+——————+
* | discardable bytes | readable bytes | writable bytes |
* | | (CONTENT) | |
* +——————-+——————+——————+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
*/
readerIndex 控制讀的遊標,writerIndex 控制寫的遊標,capacity是容量。

再看一下例子:

public class ByteBufTest1 {
    public static void main(String[] args) {

        /**
         * ByteBuf有2種維度,一種是堆內還是堆外
         * 另一種是池化還是非池化
         */
        //utf-8字元編碼,一個漢字佔3個位元組
        ByteBuf byteBuf = Unpooled.copiedBuffer("張hello world", Charset.forName("utf-8"));

        //如果是在堆上的返回true
if(byteBuf.hasArray()){ //ByteBuf內部的堆陣列 byte[] cotent = byteBuf.array(); System.out.println(new String(cotent,Charset.forName("utf-8"))); //ByteBuf實際實現類的型別 System.out.println(byteBuf); System.out.println(byteBuf.arrayOffset()); System.out.println(byteBuf.readerIndex()); System.out.println(byteBuf.writerIndex()); System.out.println(byteBuf.capacity()); int length = byteBuf.readableBytes(); for (int i=0;i<length;i++){ System.out.println((char)byteBuf.getByte(i)); } System.out.println(byteBuf.getCharSequence(0,4,Charset.forName("utf-8"))); //輸出"張h" } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

Netty ByteBuf所提供的三種緩衝區型別:
1、heap buffer。
2、direct buffer。
3、composite buffer。

Heap Buffer(堆緩衝區)
這是最常用的型別,ByteBuf將資料儲存到JVM的堆空間中,並且將實際的資料存放到byte array中來實現。
優點:由於資料是儲存在JVM的堆中,因此可以快速的創建於快速的釋放,並且它提供了直接訪問內部位元組陣列的方法。

缺點:每次讀寫資料時,都需要先將資料複製到直接緩衝區中再進行網路傳輸。

Direct Buffer(直接緩衝區)

在堆之外直接分配記憶體空間,直接緩衝區並不會佔用堆的容量空間,因為它是由作業系統在本地記憶體進行的資料分配。

優點:在使用Socket進行資料傳遞時,效能非常好,因為資料直接位於作業系統的本地記憶體中,所以不需要從JVM將資料複製到直接緩衝區,效能很好。
缺點:因為Direct Buffer是直接在作業系統記憶體中的,所以記憶體的分配與釋放要比堆空間更加複雜,而且速度要慢一些。

Netty通過提供記憶體池來解決這個問題,直接緩衝區並不支援通過位元組陣列的方式來訪問資料。
重點:對於後端的業務訊息的編解碼來說,推薦使用HeapByteBuf;對於I/O通訊執行緒在讀寫緩衝區時,推薦使用DirectByteBuf。

CompositeByteBuf:

public class ByteBufTest2 {
    public static void main(String[] args) {
        CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
        ByteBuf heapBuf = Unpooled.buffer(10);
        ByteBuf directBuf = Unpooled.directBuffer(8);
        compositeByteBuf.addComponents(heapBuf,directBuf);
       // compositeByteBuf.removeComponent(0);
        Iterator iterator = compositeByteBuf.iterator();
        while(iterator.hasNext()){
            System.out.println(iterator.next());
        }
        compositeByteBuf.forEach(System.out::println);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

Composite Buffer(符合緩衝區)

JDK的ByteBuffer和Netty的ByteBuf之間的差異對比:
1、Netty的ByteBuf採用讀寫索引分離的策略(readerIndex與writerIndex),一個初始化(裡面尚未有任何資料)的ByteBuf的readerIndex與writerIndex值都為0.
2、當讀索引與寫索引處於同一個位置時,如果我們繼續讀取,那麼就會丟擲IndexOutOfBoundException。
3、對於ByteBuf的任何讀寫操作都會分別單獨維護讀索引與寫索引,maxCapacity最大 容量預設的限制就是Integer.MAX_VALUE

JDK的ByteBuffer的缺點:
1、final byte[] bb 這是JDK的ByteBuffer物件中用於儲存資料的物件宣告,可以看到,其位元組陣列是被宣告為final的,也就是長度是固定不變的,一旦分配好後不能動態擴容與收縮,
而且當待儲存的資料位元組很大時就很有可能出現那麼就會丟擲IndexOutOfBoundException,如果要預防這個異常,那就需要在儲存事前完全確定好待儲存的位元組大小。如果ByteBuffer的空間不足,我們只有一種解決方案:
建立一個全新的ByteBuffer物件,然後再將之前的ByteBuffer中的資料複製過去,這一切都需要開發者自己來手動完成。
2、ByteBuffer只是用一個position指標來標示位置資訊,在進行讀寫切換時就需要呼叫flip方法或是rewind方法,使用起來很不方便。

Netty的ByteBuf的優點:
1、儲存位元組的陣列是動態的,其最大值預設是Integer.MAX_VALUE,這裡的動態性是體現在write方法中的,write方法在執行時會判斷buffer容量,如果不足則自動擴容。

    final void ensureWritable0(int minWritableBytes) {
        ensureAccessible();
        if (minWritableBytes <= writableBytes()) {
            return;
        }

        if (minWritableBytes > maxCapacity - writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }

        // Normalize the current capacity to the power of 2.
        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);//計算新的容量

        // Adjust to the new capacity.
        capacity(newCapacity);//自動調節容量
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

2、ByteBuf的讀寫索引是完全分開的,使用起來很方便。

ReferenceCounted

/**
* A reference-counted object that requires explicit deallocation.
* 引用計數回收物件
*


* When a new {@link ReferenceCounted} is instantiated, it starts with the reference count of {@code 1}.
* {@link #retain()} increases the reference count, and {@link #release()} decreases the reference count.
* If the reference count is decreased to {@code 0}, the object will be deallocated explicitly, and accessing
* the deallocated object will usually result in an access violation.
*


* 當一個ReferenceCounted建立的時候他的初始引用數量是1,retain方法增加一個引用數量,release方法減少一個引用數量,如果引用數量是
* 變成0,那麼物件就會被死亡回收,加入引用一個已經被定義為死亡的物件的結果通常是會出現問題的。
*


* If an object that implements {@link ReferenceCounted} is a container of other objects that implement
* {@link ReferenceCounted}, the contained objects will also be released via {@link #release()} when the container’s
* reference count becomes 0.
*


* 如果一個實現了ReferenceCounted介面的這個物件作為一個容器,他的內部的物件也是實現了ReferenceCounted介面,那麼當外邊的容器的
* count引用數量變為0的時候,容器內部的物件也會別回收。
*/

引用加1的邏輯:

    public ByteBuf retain(int increment) {
        return retain0(checkPositive(increment, "increment"));
    }

    private ByteBuf retain0(int increment) {
        for (;;) {//迴旋鎖
            int refCnt = this.refCnt;
            final int nextCnt = refCnt + increment;

            // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
            //如果refCnt 是0那麼就會出現nextCnt = increment的情況,但是這樣違背了netty的回收計數器的原則,程式就可以往下走,這是
            //不合法的,當為0的時候正常的情況是要被回收的。
            if (nextCnt <= increment) {
                throw new IllegalReferenceCountException(refCnt, increment);
            }
            //    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
           //     AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
           //首先使用的是AtomicIntegerFieldUpdater進行的cas操作(基於硬體的更新實現),其次refCnt是
           //    private volatile int refCnt = 1;即是volatile 型別的,在多執行緒的情況下保證相互之間的可見性。
            if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {//cas操作增加引用計數
                break;
            }
        }
        return this;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

此處用了迴旋鎖+cas保證操作的原子性。
AtomicIntegerFieldUpdater使用反射 更新某個類的內部的一個int型別的並且是volatitle的變數。
這裡提一下AtomicIntegerFieldUpdater:
1、更新器更新的必須int型別的變數 ,不能是其包裝型別。
2、更新器更新的必須是volatitle型別變數,確保執行緒之間共享變數時的立即可見性。
AtomicIntegerFieldUpdater.newUpdater()方法的實現是AtomicIntegerFieldUpdaterImpl:
AtomicIntegerFieldUpdaterImpl的構造器會對型別進行驗證:


            if (field.getType() != int.class)
                throw new IllegalArgumentException("Must be integer type");

            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

由此驗證我們說的第一和第二條原則。
3、變數不能是static的,必須是例項變數,因為Unsafe.objectFieldOffset()方法不支援靜態變數(cas操作本質上是通過物件例項的偏移量來直接進行賦值)
4、更新器只能修改可見範圍內的變數,因為更新器是通過反射來得到這個變數,如果變數不可見就會報錯。
實際驗證:

    public classAtomicUpdatorTest {
    public static void main(String[] args) {
        Person person = new Person();
 /*       for(int i=0;i<10;++i){
            Thread thread = new Thread(() -> {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println( person.age++);
            });
            thread.start();
            //會有嚴重的問題,會出現多次列印1之類的問題。。這個時候是AtommicIntegerUpdator登場的時候了
        }*/

//原子方式更新
        AtomicIntegerFieldUpdater<Person> atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Person.class,"age");
        for(int i=0;i<10;++i){
            Threadthread = newThread(() -> {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println( atomicIntegerFieldUpdater.getAndIncrement(person));
            });
            thread.start();
        }

    }

}

classPerson{
   volatileintage = 1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

volatile變數自身具有下列特性。
·可見性。對一個volatile變數的讀,總是能看到(任意執行緒)對這個volatile變數最後的寫
入。
·原子性:對任意單個volatile變數的讀/寫具有原子性,但類似於volatile++這種複合操作不
具有原子性。

對volatile寫和volatile讀的記憶體語義:
·執行緒A寫一個volatile變數,實質上是執行緒A向接下來將要讀這個volatile變數的某個執行緒
發出了(其對共享變數所做修改的)訊息。
·執行緒B讀一個volatile變數,實質上是執行緒B接收了之前某個執行緒發出的(在寫這個volatile
變數之前對共享變數所做修改的)訊息。
·執行緒A寫一個volatile變數,隨後執行緒B讀這個volatile變數,這個過程實質上是執行緒A通過
主記憶體向執行緒B傳送訊息。