1. 程式人生 > >Netty學習之旅------原始碼分析Netty執行緒本地分配機制與PooledByteBuf執行緒級物件池原理分析

Netty學習之旅------原始碼分析Netty執行緒本地分配機制與PooledByteBuf執行緒級物件池原理分析

    final PoolArena<byte[]> heapArena;         //使用輪叫輪詢機制,每個執行緒從heapArena[]中獲取一個,用於記憶體分配。
    final PoolArena<ByteBuffer> directArena;          //同上

    // Hold the caches for the different size classes, which are tiny, small and normal.     //針對不同大小,執行緒快取的記憶體
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;

    private int allocations;

    private final Thread thread = Thread.currentThread();            //當前執行緒
    private final Runnable freeTask = new Runnable() {               //執行緒消亡後,釋放資源,下文會重點講解。
        @Override
        public void run() {
            free0();
        }
    };

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    /*
     * @param  heapArena  執行緒使用的PoolArena.HeapArena
     * @param  directArena 執行緒使用的PoolArena.DirectArena
     * @param tinyCacheSize, tiny記憶體快取的個數。預設為512
     * @param  smallCacheSize small記憶體快取的個數,預設為256個
     * @param normalCacheSize normalCacheSize快取的個數,預設為64
     * @param maxCacheBufferCapacity 
     *         normalHeapCaches中單個快取區域的最大大小,預設為32k  也就是normalHeapCaches[length-1]中快取的最大記憶體空間
     * @param freeSweepAllocationThreshold  在本地執行緒每分配freeSweepAllocationThreshold 次記憶體後,檢測一下是否需要釋放記憶體。
     */
    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        if (maxCachedBufferCapacity < 0) {
            throw new IllegalArgumentException("maxCachedBufferCapacity: "
                    + maxCachedBufferCapacity + " (expected: >= 0)");
        }
        if (freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + maxCachedBufferCapacity + " (expected: > 0)");
        }
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        this.heapArena = heapArena;
        this.directArena = directArena;
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
            smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools);

            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);  
            smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);     //@1
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // The thread-local cache will keep a list of pooled buffers which must be returned to
        // the pool when the thread is not alive anymore.
        ThreadDeathWatcher.watch(thread, freeTask);   
    }
在方法前,已經對構造方法的入參加了說明,關注如下兩個方法。 程式碼@1,建立createNormalCaches  由於PoolThreadCache的設計理念與PoolArena一樣,本身並不涉及到具體記憶體的儲存,PoolThreadCache內部維護MemoryRegionCache[] tinySubpageHeapCaches,MemoryRegionCache[] smallSubpageHeapCaches,其陣列長度與PoolArena相同,MemoryRegionCaches[] normalHeapCaches,快取的是noraml記憶體,Netty把大於pageSize小於chunkSize的空間成為normal記憶體。normalHeapCaches[1] 是normalHeapCaches[0] 的2倍, 先重點關注
PoolThreadCache createNormalCaches 原始碼:
private static <T> NormalMemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);      //@1
            int arraySize = Math.max(1, max / area.pageSize);                             //@2

            @SuppressWarnings("unchecked")
            NormalMemoryRegionCache<T>[] cache = new NormalMemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }
引數 numCaches,為SubPageMemoryRegionCache[]陣列的長度,而cacheSize,為每一個SubPageMemoryRegionCache中快取的記憶體個數,也就是SubPageMemoryRegionCache中entries[]的長度。這裡的cacheSize,就是PooledByteBufAllocator DEFAULT_TINY_CACHE_SIZE=512,DEFAULT_SMALL_CACHE_SIZE=256,DEFAULT_NORMAL_SIZE=64,其實這裡的取名為DEFAULT_TINY_CACHE_LENGTH更加貼切。 程式碼@1,其實應該不需要與area.chunkSize做比較,因為如果超過chunkSize的記憶體,netty不會重複使用,直接在整個堆空間或堆外空間申請並釋放。這裡可能是出於程式碼的自我保護,得到normalHeapCaches中單個 Entry所持有的記憶體不超過該值。 程式碼@2,計算normalHeapCaches陣列的長度,這裡有優化的空間,用位運算:int arraySize = Math.max(1,  max >> numShiftsNormalHeap   ),其中numShiftsNormalHeap為 log2(pageSize)。這樣做的原因,也就是normalHeapCaches 陣列中的元素的大小,是以2的冪倍pageSize遞增的。cacheSize預設為64,引數值來源於PooledByteBufAllocator。接下來關注PoolThreadCache的allocateTiny方法: 1.2  PoolThreadCache allocateTiny方法
/**
     * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
     */
    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
    }

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.tinyIdx(normCapacity);
        if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }
/**
     * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
     */
    boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
    }

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
        if (area.isDirect()) {
            int idx = log2(normCapacity >> numShiftsNormalDirect);
            return cache(normalDirectCaches, idx);
        }
        int idx = log2(normCapacity >> numShiftsNormalHeap);      //@1 
        return cache(normalHeapCaches, idx);
    }

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            // no cache found so just return false here
            return false;
        }
        boolean allocated = cache.allocate(buf, reqCapacity);          //@2
        if (++ allocations >= freeSweepAllocationThreshold) {      
            allocations = 0;
            trim();                                                                                   //@3
        }
        return allocated;
    }
程式碼@1,根據需要申請的記憶體定位陣列的下標,根據上文講解的陣列長度計算邏輯,相應的定位演算法就顯而易見了。 程式碼@2,MeomoryRegionCache內部持有的 Entry entries[]陣列是真正持有記憶體的單元,故現在將重點轉移到MemoryRegionCache的講解中。 程式碼@3,如果分配次數達到freeSweepAllocationThreshold,進行一次嘗試釋放一次。具體程式碼見 trim()方法的講解。 1.2.2 關於PoolThreadCache allocateForTiny 之MemoryRegionCache 原始碼解讀【針對1.2程式碼@2】 1)MemoryRegionCache屬性與構造方法詳解
private final Entry<T>[] entries;            //MemoryRegionCache真正持有記憶體的地方
/*
private static final class Entry<T> {
            PoolChunk<T> chunk;      //具體的PoolChunk            
            long handle;                      //記憶體持有偏移量,高32位儲存的是bitmaIdx,低32位儲存的是memoryMapIdx
}
*/
        private final int maxUnusedCached;   //表示允許的最大的沒有使用的記憶體數量(已經被快取),預設為size的一半。
        private int head;                                    // 作用類似於ByteBuf的readerIndex,從該位置獲取一個快取的Entiry。
        private int tail;                                       // 作用類似於ByteBuf的writerIndex,從該位置增加一個加入一個新的Entity
        private int maxEntriesInUse;                // 在使用中最大的entry數量
        private int entriesInUse;                       // 目前使用中的entry數量
        @SuppressWarnings("unchecked")
        MemoryRegionCache(int size) {  // size 預設的大小為  512, 256, 64
            entries = new Entry[powerOfTwo(size)];
            for (int i = 0; i < entries.length; i++) {
                entries[i] = new Entry<T>();
            }
            maxUnusedCached = size / 2;  //允許被快取,但沒有使用的最大數量,超過該值,則會觸發記憶體釋放操作。
        }
初始狀態的MemoryRegionCache的各個屬性的值分別為:
maxUnusedCached :   256,128,32,為size的一半;head:0 ;tail:0 ; maxEntriesInUse : 0; entriesInUse : 0 2)MemoryRegionCache的allocate方法詳解
/**
         * Allocate something out of the cache if possible and remove the entry from the cache.
         */
        public boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
            Entry<T> entry = entries[head];      //@1
            if (entry.chunk == null) {                  //@2
                return false;
            }
            entriesInUse ++;                               //@3
            if (maxEntriesInUse < entriesInUse) {
                maxEntriesInUse = entriesInUse;
            }
            initBuf(entry.chunk, entry.handle, buf, reqCapacity);     //@4
            // only null out the chunk as we only use the chunk to check if the buffer is full or not.
            entry.chunk = null;    //@5
            head = nextIdx(head);    //@6
            return true;
        }
程式碼@1,從entries陣列中獲取一個entry,head指標表示下一個快取的Entry。 程式碼@2,如果entry.chunk為空,則表示執行緒裡暫未快取記憶體,返回false,表示從本地執行緒中分配失敗。 程式碼@3,每分配出一個Entry,則entriesInUse加1,表示正在使用的entry個數。 程式碼@5,用entry中的記憶體初始化ByteBuf。 程式碼@6,head指標加一,如果超過entries的length,則重新從0開始,其實也就是  (head + 1) % (entires.length - 1),這裡使用的是位運算。如果成功分配,則返回true, 結束本次記憶體的分配。 1.2.3 關於PoolThreadCache allocateForTiny 之程式碼@3,trim方法詳解: 該方法的目的是在本地執行緒分配達到一定次數後,檢測一下從本地執行緒快取分配的效率,如果總是分配不到,就是雖然本地有快取一定的記憶體,但每次分配都沒有找到合適記憶體供分配,此時需要釋記憶體回全域性分配池,避免浪費記憶體。
void trim() {
        trim(tinySubPageDirectCaches);
        trim(smallSubPageDirectCaches);
        trim(normalDirectCaches);
        trim(tinySubPageHeapCaches);
        trim(smallSubPageHeapCaches);
        trim(normalHeapCaches);
    }
private static void trim(MemoryRegionCache<?>[] caches) {
        if (caches == null) {
            return;
        }
        for (MemoryRegionCache<?> c: caches) {
            trim(c);
        }
    }
    private static void trim(MemoryRegionCache<?> cache) {
        if (cache == null) {
            return;
        }
        cache.trim();
    }
trim的具體實現是MemoryRegionCache,現在進入到MemoryRegionCache詳解:
/**
         * Free up cached {@link PoolChunk}s if not allocated frequently enough.
         */
        private void trim() {
            int free = size() - maxEntriesInUse;        //@1
            entriesInUse = 0;
            maxEntriesInUse = 0;                             //@2

            if (free <= maxUnusedCached) {           //@3
                return;
            }

            int i = head;
            for (; free > 0; free--) {
                if (!freeEntry(entries[i])) {
                    // all freed
                    break;
                }
                i = nextIdx(i);
            }

            // Update head to point to te correct entry
            // See https://github.com/netty/netty/issues/2924
            head = i;
        }
在進行該方法的實現邏輯之前,我先提供一張草圖,形象的反映head,tail等說明:



程式碼@1,size()方法返回的是  (tail-head) & (length-1),表示當前快取了但未被使用的個數。maxEntriesInUse的值,其實就是entiryesInUse的值。 程式碼@2,程式碼@3,如果快取的並且未使用的個數如果小於允許的值(maxUnusedCached)值是放棄本次記憶體釋放,否則,需要將head到tail這部分的記憶體全部釋放,返回給全域性記憶體分配池。這裡我可能沒有理解透徹,如果是我實現的話,entriesInUse該值不會設定為空,而是直接釋放掉 tail-head這部分的記憶體就好,釋放演算法在記憶體分配與釋放篇已經做過詳細解讀,這裡不重複講解:
@SuppressWarnings({ "unchecked", "rawtypes" })
        private static boolean freeEntry(Entry entry) {
            PoolChunk chunk = entry.chunk;
            if (chunk == null) {
                return false;
            }
            // need to synchronize on the area from which it was allocated before.
            synchronized (chunk.arena) {
                chunk.parent.free(chunk, entry.handle);
            }
            entry.chunk = null;
            return true;
        }
掃描一下MemoryRegionCache類,還有一個方法我們未曾分析過,就是add方法,預設一開始MemoryRegionCache類中的Entry[] entries中的PoolChunk與handle都是空的,只有通過該add方法,將執行緒用過的記憶體快取起來才能重複使用。我們要養成這樣一個習慣,一個ByteBuf用過後,需要呼叫realse方法將其釋放,具體到池化的PooledByteBuf,呼叫其realse方法,並不會將記憶體直接返還給JVM堆,而是放入到記憶體池,供重複使用,由於引入了執行緒本地快取,所以在呼叫PooledByteBuf的release方法時,並不會將它立馬返回給記憶體池(PoolArena),而是放入到本地執行緒快取中。