1. 程式人生 > >netty原始碼解析(4.0)-28 ByteBuf記憶體池:PooledByteBufAllocator-把一切組裝起來

netty原始碼解析(4.0)-28 ByteBuf記憶體池:PooledByteBufAllocator-把一切組裝起來

 

  PooledByteBufAllocator負責初始化PoolArena(PA)和PoolThreadCache(PTC)。它提供了一系列的介面,用來建立使用堆記憶體或直接記憶體的PooledByteBuf物件,這些介面只是一張皮,內部完全使用了PA和PTC的能力。初始化過程分兩個步驟,首先初始化一系列的預設引數,然後初始化PTC物件和PA陣列。

 

預設引數和它們的值

  DEFAULT_PAGE_SIZE: PoolChunk中的page的大小-pageSize,  使用-Dio.netty.allocator.pageSize設定, 預設值:8192。

  DEFAULT_MAX_ORDER: PoolChunk中二叉樹的高度: maxOrder, 使用-Dio.netty.allocator.maxOrder設定,預設值:11。

  DEFAULT_NUM_HEAP_ARENA: 使用堆記憶體的PA陣列的長度,使用-Dio.netty.allocator.numHeapArenas設定,預設值: CPU核心數 * 2。

  DEFAULT_NUM_DIRECT_ARENA: 使用直接記憶體的PA陣列的長度,使用-Dio.netty.allocator.numHeapArenas設定,預設值: CPU核心數 * 2。

  DEFAULT_TINY_CACHE_SIZE:  PTC物件中每個用來快取Tiny記憶體的MemoryRegionCache物件中queue的長度,使用-Dio.netty.allocator.tinyCacheSize設定,預設值:512。

  DEFAULT_SMALL_CACHE_SIZE: PTC物件中每個用來快取Small記憶體的MemoryRegionCache物件中queue的長度,使用-Dio.netty.allocator.smallCacheSize設定,預設值:256。

  DEFAULT_NORMAL_CACHE_SIZE: PTC物件中每個用來快取Normal記憶體的MemoryRegionCache物件中queue的長度,使用-Dio.netty.allocator.normalCacheSize設定,預設值:64。

  DEFAULT_MAX_CACHED_BUFFER_CAPACITY: PTC物件中快取Normal記憶體的大小上限。使用-Dio.netty.allocator.maxCachedBufferCapacity設定,預設值32 * 1024。

  DEFAULT_CACHE_TRIM_INTERVAL:  PTC物件中釋放快取的記憶體閾值。當PTC分配記憶體次數大於這個值時會釋放快取的記憶體。使用-Dio.netty.allocator.cacheTrimInterval設定,預設值:8192。

  DEFAULT_USE_CACHE_FOR_ALL_THREADS: 是否對所有的執行緒使用快取。使用-Dio.netty.allocator.useCacheForAllThreads設定,預設值:true。

  DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT: 直接記憶體的對齊引數,分配直接記憶體的大小必須是它的整數倍。使用-Dio.netty.allocator.directMemoryCacheAlignment設定,預設值:0, 表示不對齊。

 

初始化PoolArena陣列

  PooledByteBufAllocator維護了兩個陣列:

PoolArena<byte[]>[] heapArenas; 
PoolArena<ByteBuffer>[] directArenas;

  heapArenas用來管理堆記憶體,directArenas用來管理直接記憶體。這兩個陣列在構造方法中初始化,構造方法的定義是:

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment)

  prefreDirect: 建立PooledByteBuf時,是否優先使用直接記憶體。

  nHeapArena: 預設使用DEFAULT_NUM_HEAP_ARENA。

  nDirectArena: 預設使用DEFAULT_NUM_DIRECT_ARENA。

  pageSize: 預設使用的DEFAULT_PAGE_SIZE。

  maxOrder: 預設使用DEFAULT_MAX_ORDER。

  tinyCacheSize:  預設使用DEFAULT_TINY_CACHE_SIZE。

  smallCacheSize: 預設使用DEFAULT_SMALL_CACHE_SIZE。

  normalCacheSize: 預設使用DEFAULT_NORMAL_CACHE_SIZE。

  useCacheForAllThreads: 預設使用DEFAULT_USE_CACHE_FOR_ALL_THREADS。

  directMemoryCacheAlignment: 預設使用DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT。

  

  這兩陣列的初始化程式碼如下:

 1       int pageShifts = validateAndCalculatePageShifts(pageSize);
 2 
 3         if (nHeapArena > 0) {
 4             heapArenas = newArenaArray(nHeapArena);
 5             List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
 6             for (int i = 0; i < heapArenas.length; i ++) {
 7                 PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
 8                         pageSize, maxOrder, pageShifts, chunkSize,
 9                         directMemoryCacheAlignment);
10                 heapArenas[i] = arena;
11                 metrics.add(arena);
12             }
13             heapArenaMetrics = Collections.unmodifiableList(metrics);
14         } else {
15             heapArenas = null;
16             heapArenaMetrics = Collections.emptyList();
17         }
18 
19         if (nDirectArena > 0) {
20             directArenas = newArenaArray(nDirectArena);
21             List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
22             for (int i = 0; i < directArenas.length; i ++) {
23                 PoolArena.DirectArena arena = new PoolArena.DirectArena(
24                         this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
25                 directArenas[i] = arena;
26                 metrics.add(arena);
27             }
28             directArenaMetrics = Collections.unmodifiableList(metrics);
29         } else {
30             directArenas = null;
31             directArenaMetrics = Collections.emptyList();
32         }

  1行,計算pageShifts,演算法是pageShifts = Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize) = 31 - Integer.numberOfLeadingZeros(pageSize)。 Integer.numberOfLeadingZeros(pageSize)是pageSize(32位整數)從最高位起連續是0的位數,因此pageShifts可以簡化為pageShifts = log2(pageSize)。

  4,20行,建立陣列,new PoolArena[size]。  

  6-12,22-17行, 初始化陣列中的PoolArena物件,分別使用PooArena的兩個內部類: HeapArena, DirectArena。

 

初始化PoolThreadCache

   PoolThreadCache使用PoolThreadLocalCache(PTLC)間接初始化,PTLC是PooledByteBufAllocator的內部內,它的定義如下:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache>

  這個類派生自io.netty.util.concurrent.FastThreadLocal<T>,  和java.lang.ThreadLocal<T>功能一樣,實現了執行緒本地儲存(TLS)的功能,不同的是FastThreadLocal<T>優化了訪問效能。PTLC覆蓋了父類的initialValue方法,這個方法負責初始化執行緒本地的PoolThreadCache物件。當第一次呼叫PTLC物件的get方法時,這個方法會被呼叫。

 1         @Override
 2         protected synchronized PoolThreadCache initialValue() {
 3             final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
 4             final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
 5 
 6             if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
 7                 return new PoolThreadCache(
 8                         heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
 9                         DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
10             }
11             // No caching for non FastThreadLocalThreads.
12             return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
13         }

  3,4行,分別從headArenas,directArenas中取出一個使用次數最少的PoolArena物件。PoolArena有一個numThreadCaches屬性,這個屬性是AtomicInteger型別的原子變數。它的作用是在用來記錄被PoolThreadCache物件使用的次數。PoolThreadCache物件建立時會在構造方法中會呼叫它的getAndIncrement方法,釋放時在free0方法中呼叫他的getAndDecrement方法。

  6行,  如果執行每個執行緒都使用快取(userCacheForAllThreads==true),或者當成執行緒物件是FastThreadLocalThread時, 在第8行建立一個執行緒專用的PTC物件。

 

PoolChunkList(PCKL)

關鍵屬性

  PoolChunkList<T> nextList

  PoolChunkList<T> prevList

  這兩個屬性表明PCKL物件是一個雙向連結串列的節點。

  PoolChunk<T> head

  這個屬性表明PCKL物件還維護的一個PCK型別的連結串列,head指向這個連結串列的頭。

  int minUsage;

  int maxUsage;

  int maxCapacity;

  minUsage是PCK連結串列中每個PCK物件記憶體的最小使用率,maxUseage是PCK的最大使用率。這兩個值是百分比,例如:minUsage=10, maxUse=50,表示PCK連結串列中只能儲存使用率在[10%,50%)的PCK物件。 maxCapacity表示PCK最大可分配的記憶體數,演算法是: maxCapacity = (int)(chunkSize * (100L - minUseage) / 100L)。

 

初始化PCKL連結串列

  PCKL連結串列有PoolArena負責維護,在PoolArena的構造方法中初始化:

 1 // io.netty.buffer.PoolArena#PoolArena(PooledByteBufAllocator parent, int pageSize,
 2 //          int maxOrder, int pageShifts, int chunkSize, int cacheAlignment)
 3 
 4         q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
 5         q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
 6         q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
 7         q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
 8         q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
 9         qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
10 
11         q100.prevList(q075);
12         q075.prevList(q050);
13         q050.prevList(q025);
14         q025.prevList(q000);
15         q000.prevList(null);
16         qInit.prevList(qInit); 

  4-9行,初始化PCKL節點。每個節點的名字q{num},其中num表示這個節點的最小使用率minUsage,如q075節點的minUsage=%75。

  11-16行,把PCKL節點組裝成一個連結串列。

  使用q(minUsage, maxUsage)表示一個節點,那麼:

  qInit = q(Integer.MIN_VALUE, 25%)

  q000 = q(1%, 50%)

  q025 = q(25%, 75%)

  q075 = q(75%, 100%)

  q100 = q(100%, Integer.MAX_VALUE)

  這個連結串列的結構如下圖所示:

  

PoolChunk(PCK)在PoolChunkList(PCKL)中移動

  一個新建立的PCK物件,它的記憶體使用率是usage=%0,被放進qInit節節點。每次從這個PCK物件中分配記憶體,都會導致它的使用率增加,當usage>=25%,即大於等於qInit的maxUsage時,會把它移動到q000中。繼續從PCK物件中分配記憶體,它的usage繼續增加,當usage大於等於它所屬PCKL的maxUsage時,把它移動到PKCL連結串列中的下一個節點,直到q100為止。下面是記憶體分配導致PCK移動的程式碼:

 1     //io.netty.buffer.PoolChunkList#allocate
 2     boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
 3         if (head == null || normCapacity > maxCapacity) {
 4             // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
 5             // be handled by the PoolChunks that are contained in this PoolChunkList.
 6             return false;
 7         }
 8 
 9         for (PoolChunk<T> cur = head;;) {
10             long handle = cur.allocate(normCapacity);
11             if (handle < 0) {
12                 cur = cur.next;
13                 if (cur == null) {
14                     return false;
15                 }
16             } else {
17                 cur.initBuf(buf, handle, reqCapacity);
18                 if (cur.usage() >= maxUsage) {
19                     remove(cur);
20                     nextList.add(cur);
21                 }
22                 return true;
23             }
24         }
25     }

  9-12行,嘗試從PCK連結串列中的所有PCK節點分配所需的記憶體。

  14行,沒有找到能分配記憶體的PCK節點。

  17行,從cur節點分配到所需的記憶體,並初始化PooledByteBuf物件。

  18-21行,如cur節點的使用率大於等於當前PCKL節點maxUsage,呼叫remove方法把cur從head連結串列中刪除,然後呼叫PCKL連結串列中的下一個節點的add方法,把cur移動到下一個節點中。

 

  如果持續地釋放記憶體,把記憶體還給PCK物件,會導致usage持續減小,當usage小於它所屬的PCKL的minUsage時,把它移動到PCKL連結串列中的前一個節點,直到q000位為止。當釋放記憶體導致PCK物件的usage等於%0,會銷燬這個PCK物件,釋放整個chunk的記憶體。下面是釋放記憶體導致PCK物件移動的程式碼:

 1     //io.netty.buffer.PoolChunkList#free
 2     boolean free(PoolChunk<T> chunk, long handle) {
 3         chunk.free(handle);
 4         if (chunk.usage() < minUsage) {
 5             remove(chunk);
 6             // Move the PoolChunk down the PoolChunkList linked-list.
 7             return move0(chunk);
 8         }
 9         return true;
10     }
11 
12     //io.netty.buffer.PoolChunkList#move0
13     private boolean move0(PoolChunk<T> chunk) {
14         if (prevList == null) {
15             // There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and
16             // all memory associated with the PoolChunk will be released.
17             assert chunk.usage() == 0;
18             return false;
19         }
20         return prevList.move(chunk);
21     }

  第3行,釋放記憶體,把記憶體返還給PCK物件。

  4-7行,如PCK的使用率小於當前PCKL的minUsage,呼叫remove方法把PCK物件從當前PCKL物件中刪除,然後呼叫move0方法把它移動到前一個PCKL節點。

  13-31行,移動PCK到前一個PCKL。

 

完整的記憶體分配釋放流程

記憶體分配

  入口方法:

  io.netty.buffer.AbstractByteBufAllocator#heapBuffer(int, int),建立使用堆記憶體的ByteBuf, 呼叫newHeapBuffer方法。

  io.netty.buffer.AbstractByteBufAllocator#directBuffer(int, int), 建立使用直接記憶體的ByteBuf,  呼叫newDirectBuffer方法。

  具體實現:

  io.netty.buffer.PooledByteBufAllocator#newHeapBuffer(int initialCapacity, int maxCapacity)。

  io.netty.buffer.PooledByteBufAllocator#newDirectBuffer(int initialCapacity, int maxCapacity)。 

  這兩個方法都是從PoolThreadCache物件中得到執行緒專用的PoolArena物件,然後呼叫PoolArena的allocate方法建立PoolByteBuf物件。

  PoolArena入口方法:

  io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int),這個方法是PoolArena分配記憶體,建立PoolByteBuf物件的入口方法。它先呼叫子類實現的newByteBuf建立一個PoolByteBuf物件,這個方法有兩個實現:

  io.netty.buffer.PoolArena.HeapArena#newByteBuf(int maxCapacity),建立使用堆記憶體的PooledByteBuf物件。

  io.netty.buffer.PoolArena.DirectArena#newByteBuf(int maxCapacity),建立使用直接記憶體PooledByteBuf物件。

  然後呼叫io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf<T>, int)方法為PoolByteBuf物件分配記憶體,這個方法是分配記憶體的核心方法,下面來重點分析一下它的程式碼:

 1      private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
 2         final int normCapacity = normalizeCapacity(reqCapacity);
 3         if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
 4             int tableIdx;
 5             PoolSubpage<T>[] table;
 6             boolean tiny = isTiny(normCapacity);
 7             if (tiny) { // < 512
 8                 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
 9                     // was able to allocate out of the cache so move on
10                     return;
11                 }
12                 tableIdx = tinyIdx(normCapacity);
13                 table = tinySubpagePools;
14             } else {
15                 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
16                     // was able to allocate out of the cache so move on
17                     return;
18                 }
19                 tableIdx = smallIdx(normCapacity);
20                 table = smallSubpagePools;
21             }
22 
23             final PoolSubpage<T> head = table[tableIdx];
24 
25             /**
26              * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
27              * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
28              */
29             synchronized (head) {
30                 final PoolSubpage<T> s = head.next;
31                 if (s != head) {
32                     assert s.doNotDestroy && s.elemSize == normCapacity;
33                     long handle = s.allocate();
34                     assert handle >= 0;
35                     s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
36                     incTinySmallAllocation(tiny);
37                     return;
38                 }
39             }
40             synchronized (this) {
41                 allocateNormal(buf, reqCapacity, normCapacity);
42             }
43 
44             incTinySmallAllocation(tiny);
45             return;
46         }
47         if (normCapacity <= chunkSize) {
48             if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
49                 // was able to allocate out of the cache so move on
50                 return;
51             }
52             synchronized (this) {
53                 allocateNormal(buf, reqCapacity, normCapacity);
54                 ++allocationsNormal;
55             }
56         } else {
57             // Huge allocations are never served via the cache so just call allocateHuge
58             allocateHuge(buf, reqCapacity);
59         }
60     }

  第2行,根據需要的記憶體大小reqCapacity,計算可以分配的標準記憶體大小normCapacity。必須滿足(1)normCapacity>=reqCapacity, (2)normCapacity是directMemoryCacheAlignment的整數倍,此外,還要根據reqCapacity的大小分3中情況:

    reqCapacity>=chunkSize:normCapacity取同時滿足(1),(2)的最小值。

    reqCapacity>=512且reqCapacity<chunkSize: (3)normCapacity>=512*2k, (4)normCapacity<=chunkSize,normCapacit取同時滿足(1),(2),(3),(4)的最小值。

    reqCapacity<412: (5)normCapacity<512, (6)normCapacity是16的整數倍,normCapacity取同時滿足(1),(2),(5),(6)的最小值。

  8-13行,分配Tiny型別的記憶體(<512)。 8-10行,如果PoolThreadCache快取物件中分配到記憶體,分配內流程結束。12-13行,如果快取中沒有,就從Tiny記憶體池中分配一塊記憶體。

  15-20行,分配Small型別的記憶體(>=512且<pageSize)。和分配Tiny記憶體的邏輯相同。

  29-27行,  使用從前兩個步驟中得到的Tiny或Small記憶體的索引,從子頁面池中分配一塊記憶體。33行,從子頁面中分配記憶體。35行,使用分配到的記憶體初始化PoolByteBuf物件,如果能到這裡,分配記憶體流程結束。

  41行,如果子頁面池中還沒有記憶體可用,呼叫allocateNormal方法從PoolChunk物件中分配一個子頁面,再從子頁面中分配所需的記憶體。

  47-55行,分配Normal型別的記憶體(>=pageSize且<chunkSize)。48,49行,從快取中分配記憶體,如果成功,分配記憶體流程結束。53行,快取中沒有可用的記憶體,呼叫allocateNormal方法從PoolChunk中分配記憶體。

  58行,如果分配的是>chunkSize的記憶體。這塊記憶體不會進入PCKL連結串列中。

  

  上面程式碼中的allocateNormal方法封裝了建立PCK物件,從PCK物件中分配記憶體,再把PCK物件放入到PCKL連結串列中的邏輯,也是十分重要的程式碼。

 1     private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
 2         if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
 3             q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
 4             q075.allocate(buf, reqCapacity, normCapacity)) {
 5             return;
 6         }
 7 
 8         // Add a new chunk.
 9         PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
10         long handle = c.allocate(normCapacity);
11         assert handle > 0;
12         c.initBuf(buf, handle, reqCapacity);
13         qInit.add(c);
14     }

  2-5行,依次嘗試從每個PCKL節點中分配記憶體,如果成功,分配記憶體流程結束。

  9-13行,先建立一個新的PCK物件,然後從中分配記憶體,使用記憶體初始化PooledByteBuf物件,最後把PCK物件新增PCKL連結串列頭節點qInit中。PKCL物件的add方法會和allocate一樣,根據PCK物件的記憶體使用率,把它移動到連結串列中合適的位置。

  

記憶體釋放

  io.netty.buffer.PooledByteBuf#deallocate方法呼叫io.netty.buffer.PoolArena#free方法,這個free方法負責整個記憶體釋放過程。

 1     void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
 2         if (chunk.unpooled) {
 3             int size = chunk.chunkSize();
 4             destroyChunk(chunk);
 5             activeBytesHuge.add(-size);
 6             deallocationsHuge.increment();
 7         } else {
 8             SizeClass sizeClass = sizeClass(normCapacity);
 9             if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
10                 // cached so not free it.
11                 return;
12             }
13 
14             freeChunk(chunk, handle, sizeClass);
15         }
16     }

  這段程式碼重點在8-14行。第8,9行,優先把記憶體放到快取中,這樣下次就能快速地從快取中直接取用。第14行,在不能放進快取的情況下把記憶體返回給PCK物件。

 1     void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
 2         final boolean destroyChunk;
 3         synchronized (this) {
 4             switch (sizeClass) {
 5             case Normal:
 6                 ++deallocationsNormal;
 7                 break;
 8             case Small:
 9                 ++deallocationsSmall;
10                 break;
11             case Tiny:
12                 ++deallocationsTiny;
13                 break;
14             default:
15                 throw new Error();
16             }
17             destroyChunk = !chunk.parent.free(chunk, handle);
18         }
19         if (destroyChunk) {
20             // destroyChunk not need to be called while holding the synchronized lock.
21             destroyChunk(chunk);
22         }
23     }

  第17行,掉用PCKL物件的free方法把記憶體還給PCK物件,移動PCK物件在PCKL連結串列中位置。如果此時這個PCK物件的使用率變成0,destroyChunk=true。

  第21行,呼叫destroyChunk方法銷燬掉PCK物件。