netty原始碼分析(二十一)Netty資料容器ByteBuf底層資料結構深度剖析與ReferenceCounted初探
ByteBuf
ByteBuf是Netty提供的代替jdk的ByteBuffer的一個容器,首先看一下他的具體用法:
public class ByteBufTest0 {
public static void main(String[] args) {
ByteBuf byteBuf = Unpooled.buffer(10);//堆緩衝區
for(int i=0;i<byteBuf.capacity();i++){
byteBuf.writeByte(i);
}
//絕對方式 不會改變readerIndex
for (int i=0;i<byteBuf.capacity();i++){
System.out.println(byteBuf.getByte(i));
}
//相對方式 會改變writerIndex
for(int i=0;i<byteBuf.capacity();i++){
System.out.println(byteBuf.readByte());
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
Netty ByteBuf內部有幾個遊標:
/* +——————-+——————+——————+
* | discardable bytes | readable bytes | writable bytes |
* | | (CONTENT) | |
* +——————-+——————+——————+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
*/
readerIndex 控制讀的遊標,writerIndex 控制寫的遊標,capacity是容量。
再看一下例子:
public class ByteBufTest1 {
public static void main(String[] args) {
/**
* ByteBuf有2種維度,一種是堆內還是堆外
* 另一種是池化還是非池化
*/
//utf-8字元編碼,一個漢字佔3個位元組
ByteBuf byteBuf = Unpooled.copiedBuffer("張hello world", Charset.forName("utf-8"));
//如果是在堆上的返回true
if(byteBuf.hasArray()){
//ByteBuf內部的堆陣列
byte[] cotent = byteBuf.array();
System.out.println(new String(cotent,Charset.forName("utf-8")));
//ByteBuf實際實現類的型別
System.out.println(byteBuf);
System.out.println(byteBuf.arrayOffset());
System.out.println(byteBuf.readerIndex());
System.out.println(byteBuf.writerIndex());
System.out.println(byteBuf.capacity());
int length = byteBuf.readableBytes();
for (int i=0;i<length;i++){
System.out.println((char)byteBuf.getByte(i));
}
System.out.println(byteBuf.getCharSequence(0,4,Charset.forName("utf-8")));
//輸出"張h"
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
Netty ByteBuf所提供的三種緩衝區型別:
1、heap buffer。
2、direct buffer。
3、composite buffer。
Heap Buffer(堆緩衝區)
這是最常用的型別,ByteBuf將資料儲存到JVM的堆空間中,並且將實際的資料存放到byte array中來實現。
優點:由於資料是儲存在JVM的堆中,因此可以快速的創建於快速的釋放,並且它提供了直接訪問內部位元組陣列的方法。
缺點:每次讀寫資料時,都需要先將資料複製到直接緩衝區中再進行網路傳輸。
Direct Buffer(直接緩衝區)
在堆之外直接分配記憶體空間,直接緩衝區並不會佔用堆的容量空間,因為它是由作業系統在本地記憶體進行的資料分配。
優點:在使用Socket進行資料傳遞時,效能非常好,因為資料直接位於作業系統的本地記憶體中,所以不需要從JVM將資料複製到直接緩衝區,效能很好。
缺點:因為Direct Buffer是直接在作業系統記憶體中的,所以記憶體的分配與釋放要比堆空間更加複雜,而且速度要慢一些。
Netty通過提供記憶體池來解決這個問題,直接緩衝區並不支援通過位元組陣列的方式來訪問資料。
重點:對於後端的業務訊息的編解碼來說,推薦使用HeapByteBuf;對於I/O通訊執行緒在讀寫緩衝區時,推薦使用DirectByteBuf。
CompositeByteBuf:
public class ByteBufTest2 {
public static void main(String[] args) {
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(10);
ByteBuf directBuf = Unpooled.directBuffer(8);
compositeByteBuf.addComponents(heapBuf,directBuf);
// compositeByteBuf.removeComponent(0);
Iterator iterator = compositeByteBuf.iterator();
while(iterator.hasNext()){
System.out.println(iterator.next());
}
compositeByteBuf.forEach(System.out::println);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
Composite Buffer(符合緩衝區)
JDK的ByteBuffer和Netty的ByteBuf之間的差異對比:
1、Netty的ByteBuf採用讀寫索引分離的策略(readerIndex與writerIndex),一個初始化(裡面尚未有任何資料)的ByteBuf的readerIndex與writerIndex值都為0.
2、當讀索引與寫索引處於同一個位置時,如果我們繼續讀取,那麼就會丟擲IndexOutOfBoundException。
3、對於ByteBuf的任何讀寫操作都會分別單獨維護讀索引與寫索引,maxCapacity最大 容量預設的限制就是Integer.MAX_VALUE
JDK的ByteBuffer的缺點:
1、final byte[] bb 這是JDK的ByteBuffer物件中用於儲存資料的物件宣告,可以看到,其位元組陣列是被宣告為final的,也就是長度是固定不變的,一旦分配好後不能動態擴容與收縮,
而且當待儲存的資料位元組很大時就很有可能出現那麼就會丟擲IndexOutOfBoundException,如果要預防這個異常,那就需要在儲存事前完全確定好待儲存的位元組大小。如果ByteBuffer的空間不足,我們只有一種解決方案:
建立一個全新的ByteBuffer物件,然後再將之前的ByteBuffer中的資料複製過去,這一切都需要開發者自己來手動完成。
2、ByteBuffer只是用一個position指標來標示位置資訊,在進行讀寫切換時就需要呼叫flip方法或是rewind方法,使用起來很不方便。
Netty的ByteBuf的優點:
1、儲存位元組的陣列是動態的,其最大值預設是Integer.MAX_VALUE,這裡的動態性是體現在write方法中的,write方法在執行時會判斷buffer容量,如果不足則自動擴容。
final void ensureWritable0(int minWritableBytes) {
ensureAccessible();
if (minWritableBytes <= writableBytes()) {
return;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);//計算新的容量
// Adjust to the new capacity.
capacity(newCapacity);//自動調節容量
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
2、ByteBuf的讀寫索引是完全分開的,使用起來很方便。
ReferenceCounted
/**
* A reference-counted object that requires explicit deallocation.
* 引用計數回收物件
*
* When a new {@link ReferenceCounted} is instantiated, it starts with the reference count of {@code 1}.
* {@link #retain()} increases the reference count, and {@link #release()} decreases the reference count.
* If the reference count is decreased to {@code 0}, the object will be deallocated explicitly, and accessing
* the deallocated object will usually result in an access violation.
*
* 當一個ReferenceCounted建立的時候他的初始引用數量是1,retain方法增加一個引用數量,release方法減少一個引用數量,如果引用數量是
* 變成0,那麼物件就會被死亡回收,加入引用一個已經被定義為死亡的物件的結果通常是會出現問題的。
*
* If an object that implements {@link ReferenceCounted} is a container of other objects that implement
* {@link ReferenceCounted}, the contained objects will also be released via {@link #release()} when the container’s
* reference count becomes 0.
*
* 如果一個實現了ReferenceCounted介面的這個物件作為一個容器,他的內部的物件也是實現了ReferenceCounted介面,那麼當外邊的容器的
* count引用數量變為0的時候,容器內部的物件也會別回收。
*/
引用加1的邏輯:
public ByteBuf retain(int increment) {
return retain0(checkPositive(increment, "increment"));
}
private ByteBuf retain0(int increment) {
for (;;) {//迴旋鎖
int refCnt = this.refCnt;
final int nextCnt = refCnt + increment;
// Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
//如果refCnt 是0那麼就會出現nextCnt = increment的情況,但是這樣違背了netty的回收計數器的原則,程式就可以往下走,這是
//不合法的,當為0的時候正常的情況是要被回收的。
if (nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
// private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
// AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
//首先使用的是AtomicIntegerFieldUpdater進行的cas操作(基於硬體的更新實現),其次refCnt是
// private volatile int refCnt = 1;即是volatile 型別的,在多執行緒的情況下保證相互之間的可見性。
if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {//cas操作增加引用計數
break;
}
}
return this;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
此處用了迴旋鎖+cas保證操作的原子性。
AtomicIntegerFieldUpdater使用反射 更新某個類的內部的一個int型別的並且是volatitle的變數。
這裡提一下AtomicIntegerFieldUpdater:
1、更新器更新的必須int型別的變數 ,不能是其包裝型別。
2、更新器更新的必須是volatitle型別變數,確保執行緒之間共享變數時的立即可見性。
AtomicIntegerFieldUpdater.newUpdater()方法的實現是AtomicIntegerFieldUpdaterImpl:
AtomicIntegerFieldUpdaterImpl的構造器會對型別進行驗證:
if (field.getType() != int.class)
throw new IllegalArgumentException("Must be integer type");
if (!Modifier.isVolatile(modifiers))
throw new IllegalArgumentException("Must be volatile type");
- 1
- 2
- 3
- 4
- 5
- 6
- 7
由此驗證我們說的第一和第二條原則。
3、變數不能是static的,必須是例項變數,因為Unsafe.objectFieldOffset()方法不支援靜態變數(cas操作本質上是通過物件例項的偏移量來直接進行賦值)
4、更新器只能修改可見範圍內的變數,因為更新器是通過反射來得到這個變數,如果變數不可見就會報錯。
實際驗證:
public classAtomicUpdatorTest {
public static void main(String[] args) {
Person person = new Person();
/* for(int i=0;i<10;++i){
Thread thread = new Thread(() -> {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( person.age++);
});
thread.start();
//會有嚴重的問題,會出現多次列印1之類的問題。。這個時候是AtommicIntegerUpdator登場的時候了
}*/
//原子方式更新
AtomicIntegerFieldUpdater<Person> atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Person.class,"age");
for(int i=0;i<10;++i){
Threadthread = newThread(() -> {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( atomicIntegerFieldUpdater.getAndIncrement(person));
});
thread.start();
}
}
}
classPerson{
volatileintage = 1;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
volatile變數自身具有下列特性。
·可見性。對一個volatile變數的讀,總是能看到(任意執行緒)對這個volatile變數最後的寫
入。
·原子性:對任意單個volatile變數的讀/寫具有原子性,但類似於volatile++這種複合操作不
具有原子性。
對volatile寫和volatile讀的記憶體語義:
·執行緒A寫一個volatile變數,實質上是執行緒A向接下來將要讀這個volatile變數的某個執行緒
發出了(其對共享變數所做修改的)訊息。
·執行緒B讀一個volatile變數,實質上是執行緒B接收了之前某個執行緒發出的(在寫這個volatile
變數之前對共享變數所做修改的)訊息。
·執行緒A寫一個volatile變數,隨後執行緒B讀這個volatile變數,這個過程實質上是執行緒A通過
主記憶體向執行緒B傳送訊息。