1. 程式人生 > >Netty學習之旅----ByteBuf原始碼解讀之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf

Netty學習之旅----ByteBuf原始碼解讀之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf

前沿:     在讀原始碼的過程中我發現有如下幾個點,感覺有問題: 1)UnpooledDirectByteBuf的 capacity(int newCapacity)方法,在readerIndex大於或等於newCapacity時,此時不需要將原ByteBuffer中的資料寫入到新的ByteBuffer中,這可以理解,但為什麼要呼叫setIndex(newCapacity,newCapacity)將readerIndex,writerIndex設定為capacity呢?設定之後,該ByteBuf不能讀也不能寫。 2)setByteBuffer 方法,在第一次oldBuffer不為空的時候,此時doNotFree為true(在構造方法中設定),為什麼第一次oldBuffer不為空的時候,不釋放掉該部分記憶體,不明白。

1、首先,我們先看一下ByteBuf的類設計圖,從中更進一步瞭解ByteBuf。


ByteBuf繼承自ReferenceCounted,引用計數,也就是ByteBuf的記憶體回收使用的是引用計數器來實現。 UnpooledHeapByteBuf是非池化的堆記憶體實現,而UnpooledDirectByteBuf是非池化的堆外記憶體(直接記憶體)。非池化的ByteBuf就是利用完之後就需要銷燬,無法重用。 2、UnpooledHeapByteBuf  的繼承鏈   UnpooledHeapByteBuf  --> AbstractReferenceCountedByteBuf-->AbstractByteBuf
2.1、AbstractByteBuf原始碼分析 AbstractByteBuf定義ByteBuf的基本屬性,諸如 readerIndex,writerIndex,markedReaderIndex,markedWriterIndex,maxCapacity,我們知道ByteBuf的容量是可以自動擴容的。 AbstractByteBuf的這兩個屬性,應該引起我們的注意: 一個是SwappedByteBuf swappedBuf; 這個是大端序列與小端序列的轉換。 二個是ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);  //這個又是一個很關鍵點,下篇文章將重點分析,Netty用來解決記憶體洩漏檢測機制。非常重要。
這裡擷取一下SwappedByteBuf的原始碼,採用了典型的裝飾模式來設計。SwappedByteBuf採用 public class SwappedByteBuf extends ByteBuf {     private final ByteBuf buf;     private final ByteOrder order;     public SwappedByteBuf(ByteBuf buf) {         if (buf == null) {             throw new NullPointerException("buf");         }         this.buf = buf;         if (buf.order() == ByteOrder.BIG_ENDIAN) {             order = ByteOrder.LITTLE_ENDIAN;         } else {             order = ByteOrder.BIG_ENDIAN;         }     }     @Override     public ByteOrder order() {         return order;     }     @Override     public ByteBuf order(ByteOrder endianness) {         if (endianness == null) {             throw new NullPointerException("endianness");         }         if (endianness == order) {             return this;         }         return buf;     } } 關於其他AbstractByteBuf,該類設計使用了典型的模板模式,對ByteBuf提供的類,實現時,提供一種模板,然後再提供一個鉤子方法,供子類實現,比如_getLong方法,_setLong等方法,由於該類的實現原理不復雜,就不做進一步的原始碼解讀。 2.2 AbstractReferenceCountedByteBuf原始碼實現,該類主要是實現引用計算的常規方法,充分利用voliate記憶體可見性與CAS操作完成refCnt變數的維護。 其原始碼實現如下:
package io.netty.buffer;

import io.netty.util.IllegalReferenceCountException;
import io.netty.util.internal.PlatformDependent;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
 * Abstract base class for {@link ByteBuf} implementations that count references.
 */
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;

    static {
        AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
                PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
        if (updater == null) {
            updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
        }
        refCntUpdater = updater;
    }

    private volatile int refCnt = 1;

    protected AbstractReferenceCountedByteBuf(int maxCapacity) {
        super(maxCapacity);
    }

    @Override
    public final int refCnt() {
        return refCnt;
    }

    /**
     * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
     */
    protected final void setRefCnt(int refCnt) {
        this.refCnt = refCnt;
    }

    @Override
    public ByteBuf retain() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, 1);
            }
            if (refCnt == Integer.MAX_VALUE) {
                throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
            }
            if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
                break;
            }
        }
        return this;
    }

    @Override
    public ByteBuf retain(int increment) {
        if (increment <= 0) {
            throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");
        }

        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, increment);
            }
            if (refCnt > Integer.MAX_VALUE - increment) {
                throw new IllegalReferenceCountException(refCnt, increment);
            }
            if (refCntUpdater.compareAndSet(this, refCnt, refCnt + increment)) {
                break;
            }
        }
        return this;
    }

    @Override
    public ByteBuf touch() {
        return this;
    }

    @Override
    public ByteBuf touch(Object hint) {
        return this;
    }

    @Override
    public final boolean release() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, -1);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                if (refCnt == 1) {
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

    @Override
    public final boolean release(int decrement) {
        if (decrement <= 0) {
            throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
        }

        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt < decrement) {
                throw new IllegalReferenceCountException(refCnt, -decrement);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
                if (refCnt == decrement) {
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

    /**
     * Called once {@link #refCnt()} is equals 0.
     */
    protected abstract void deallocate();
}
該類,我們只需要瞭解,當一個ByteBuf被引用的次數為0時,dealocate()方法將被呼叫,該方法就是具體回收ByteBuf的操作,由具體的子類去實現。 2.3 UnpooledHeapByteBuf與UnpooledDirectByteBuf原始碼分析: 首先該類的內部結構如下:


對於非池化的UnpooledByteBuf,內部就是使用array來儲存資料,相對簡單,所以原始碼分析,我還是側重於UnpooledDirectByteBuf。 1、容量的擴容 2、記憶體的分配 2.3.1 我們重點關注一下capacity(int newCapacity)方法
public ByteBuf capacity(int newCapacity) {
        ensureAccessible();     // @1 
        if (newCapacity < 0 || newCapacity > maxCapacity()) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }

        int readerIndex = readerIndex();
        int writerIndex = writerIndex();

        int oldCapacity = capacity;
        if (newCapacity > oldCapacity) {   // @2
            ByteBuffer oldBuffer = buffer;
            ByteBuffer newBuffer = allocateDirect(newCapacity);  //@21
            oldBuffer.position(0).limit(oldBuffer.capacity());          //@22
            newBuffer.position(0).limit(oldBuffer.capacity());        //@23
            newBuffer.put(oldBuffer);                                            //@24
            newBuffer.clear();                                                         //@25
            setByteBuffer(newBuffer);                                            //@26
        } else if (newCapacity < oldCapacity) { //@3
            ByteBuffer oldBuffer = buffer;
            ByteBuffer newBuffer = allocateDirect(newCapacity);
            if (readerIndex < newCapacity) {
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
                oldBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.put(oldBuffer);
                newBuffer.clear();
            } else {
                setIndex(newCapacity, newCapacity);
            }
            setByteBuffer(newBuffer);
        }
        return this;
    }
程式碼@1,檢測一下訪問性,可達性,就是引用數必須大於0,否則該ByteBuf的內部空間已經被回收了(堆外記憶體) 程式碼@2,擴容操作,思路新建一個快取區,然後將原先快取區的資料全部寫入到新的快取區,然後釋放舊的快取區。 程式碼@21、22,申請一個直接快取區,然後將原緩衝區的postion設定為0,將limit設定為capacity,處於釋放狀態(從快取區讀)。 程式碼@23,將新快取區的postion,limit屬性設定為0,老快取區limit。 程式碼@24,將原緩衝區寫入到新的快取區,然後將快取區置的position設定為0,limt設定為capacity,其實這裡設定position,capacity的意義不大,因為ByteBuf並不會利用內部的ByteBuffer的limit,postion屬性,而是使用readerIndex,wriateIndex。 程式碼@26,關聯新的ByteBuffer,並釋放原快取區的空間。 程式碼@3,壓縮快取區。實現思路是新建一個快取區,如果readerIndex大於新建的ByteBuffer的capacity,則無需將舊的快取區內容寫入到新的快取區中。如果readerIndex小於新capacity,那需要將readerIndex 至(  Math.min(writerIndex, newCapacity) )直接的內容寫入到新的快取,然後釋放舊的快取區。值得注意一點是,如果readerIndex > newCapcity,該ByteBuf的 readerIndex,writerIndex將會被設定為容量值,意味著如果不對readerIndex設定為0,或呼叫discardReadBytes,該快取區是不可以使用的,所以,我不明白這裡為什麼要這樣做,是為了安全?。 我們在重點關注一下setByteBuffer(newBuffer)方法,該方法還負責銷燬原先的ByteBuffer。
private void setByteBuffer(ByteBuffer buffer) {
        ByteBuffer oldBuffer = this.buffer;
        if (oldBuffer != null) {
            if (doNotFree) {
                doNotFree = false;
            } else {
                freeDirect(oldBuffer);
            }
        }

        this.buffer = buffer;
        tmpNioBuf = null;
        capacity = buffer.remaining();
    }
釋放原先的記憶體。 2.3.2 記憶體的分配。 Netty在為記憶體的分配,單獨封裝,相關類圖:
目前,先關注UnpooledByteBufAllocator,物件池的ByteBuf在後續章節中重點關注。 結合原始碼,有如下兩個方法引起了我的注意: 1、容量擴容規則(容量增長規則)calculateNewCapacity方法 2、直接記憶體的分配。newDirectBuffer方法 2.3.2.1 calculateNewCapacity
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity < 0) {
            throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
        }
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        final int threshold = 1048576 * 4; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.
        if (minNewCapacity > threshold) {               //@1
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;c   // @2
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
    }
引數:minNewCapacity,本次需要申請的最小記憶體 引數:macCapacity,最大總記憶體申請值。 程式碼@1,如果最小需要的記憶體超過設定的 threshold(闊值的話),則迴圈,每次增加threshold,然後看是否達到本次申請目標。 程式碼@2,如果需要申請的記憶體小於闊值,則以64個位元組以2的冪增長。 這裡提現了記憶體擴容時的一個優化點。 2.3.2.2 newDirectBuffer方法
@Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        ByteBuf buf;
        if (PlatformDependent.hasUnsafe()) {
            buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }
該方法中,除了見到申請一個直接記憶體外,還將該buf 變成一個可感知的物件 toLeakAwareBuffer方法,用於該物件被引用的情況,因為UnpooledDirectByteBuf是一個聚合物件,內部維護了一個java.nio.ByteBuffer的直接對外記憶體空間,在什麼是釋放UnpooledDirectByteBuf中的堆外記憶體呢?在UnpooledDirectByteBuf被java垃圾回收的時候,應該於此同時需要釋放指向的堆外記憶體,但堆外記憶體不受JVM GC的管理,所以我們只有感知到UnpooledDirectByteBuf被JVM虛擬機器回收後,手動去釋放堆外記憶體,大家想想都知道,我們可以通過JAVA提供的引用機制,來實現跟蹤垃圾回收器的收集工作,虛引用的作用來了,下一篇,我將會以這個為入口點,重點分析Netty堆外記憶體如何管理,也就是記憶體洩露檢測等方面的課題。