1. 程式人生 > >netty原始碼解解析(4.0)-25 ByteBuf記憶體池:PoolArena-PoolChunk

netty原始碼解解析(4.0)-25 ByteBuf記憶體池:PoolArena-PoolChunk

  PoolArena實現了用於高效分配和釋放記憶體,並儘可能減少記憶體碎片的記憶體池,這個記憶體管理實現使用PageRun/PoolSubpage演算法。分析程式碼之前,先熟悉一些重要的概念:

  • page: 頁,一個頁是可分配的最小的記憶體塊單元,頁的大小:pageSize = 1 << n (n <= 12)。
  • chunk: 塊,塊是多個頁的集合。chunkSize是塊中所有page的pageSize之和。
  • Tiny: <512B的記憶體塊。
  • Small: >=512B, <pageSize的記憶體塊。
  • Normal: >=pageSize, <=chunkSize的記憶體塊。
  • Huge: >chunkSize的記憶體塊。

  PoolArena維護了一個PoolChunkList組成的雙向連結串列,每個PoolChunkList內部維護了一個PoolChunk雙向連結串列。分配記憶體時,PoolArena通過在PoolChunkList找到一個合適的PoolChunk,然後從PoolChunk中分配一塊記憶體。

 

關鍵屬性

  pageSize: page的大小。必須滿足 pageSize = 1 << n (n>=12)。

  maxOrder: 完全平衡二叉樹的高度。

  chunkSize: chunk的大小。chunkSize = pageSize  * (1 << maxOrder)。

  memory: chunk的記憶體,大小必須>=chunkSize。

  offset: chunk記憶體在memory中的起始位置。 memory大小必須>=offet+chunkSize。

 

page管理

  chunk以完全平衡二叉樹的資料結構管理page, 這顆樹的節點以堆的方式儲存在陣列中, 如果這棵樹的高度maxOrder=4, 它的結構如下圖所示:

 

                                圖-1

  節點名字格式是d-i, d是節點在樹中的深度,i是節點在陣列中的索引。

  它有如下一些性質:

  1. 任意一個節點i, i的取值範圍是:  [1, 1 << (maxOrder + 1) )。i == 1節點是根節點。  
  2. 如果節點i在區間[1 << maxOrder,1 << (maxOrder +1) ), 那麼這些節點都是葉節點。
  3. 除葉節點以外的節點i, i << 1是它的左子節點,(i << 1) + 1 是它的右子節點。除根節點以外的節點i,  i >> 1是它的父節點, i ^ 1是它的另外一個兄弟節點。
  4. 對於一個節點i, 它樹中的深度d = log2(i) (d是整數)。 d相同的節點位於樹中的同一層上,他們包含相同的頁節點數,有相同的最大可分配記憶體。
  5. 任意節點i, 深度為d,  如果把同一層的節點放在一個單獨的陣列中,那麼節點i在這個資料組中的偏移量doffset=i ^ (1 << d)。
  6. 任意節點i, 深度為d, 它包含的頁節點的數量是1 << (maxOrder - d),  記憶體大小是(1 << (maxOrder - d)) * pageSize。
  7. 已知深度d, [1 << d,  1 << (d + 1) )區間內的所有節點的深度都是d。
  8. 任意節點i, 深度d,在memory中的起始位置偏移量是offset +  (1 ^ (1 << d)  * (1 << (maxOrder - d)) * pageSize。

  請記住這些性質。PoolChunk的程式碼很簡潔,可是如果不熟悉這些性質,這些簡潔的程式碼也會難以理解。

 

完全平衡二叉樹的初始化

 1     PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
 2         unpooled = false;
 3         this.arena = arena;
 4         this.memory = memory;
 5         this.pageSize = pageSize;
 6         this.pageShifts = pageShifts;
 7         this.maxOrder = maxOrder;
 8         this.chunkSize = chunkSize;
 9         this.offset = offset;
10         unusable = (byte) (maxOrder + 1);
11         log2ChunkSize = log2(chunkSize);
12         subpageOverflowMask = ~(pageSize - 1);
13         freeBytes = chunkSize;
14 
15         assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
16         maxSubpageAllocs = 1 << maxOrder;
17 
18         // Generate the memory map.
19         memoryMap = new byte[maxSubpageAllocs << 1];
20         depthMap = new byte[memoryMap.length];
21         int memoryMapIndex = 1;
22         for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
23             int depth = 1 << d;
24             for (int p = 0; p < depth; ++ p) {
25                 // in each level traverse left to right and set value to the depth of subtree
26                 memoryMap[memoryMapIndex] = (byte) d;
27                 depthMap[memoryMapIndex] = (byte) d;
28                 memoryMapIndex ++;
29             }
30         }
31 
32         subpages = newSubpageArray(maxSubpageAllocs);
33     }

  在構造方法中,19-30行初始化了兩棵完全一樣的完全平衡二叉樹(形如圖-1): memoryMap, depthMap。這兩個map都是以陣列的方式儲存二叉樹,陣列的長度都是maxSubpageAllocs << 1,  由於maxSubpageAllocs = 1 << maxOrder, 因此長度還可以表示為 1 << (maxOrder + 1)。 map陣列的0項保留,[1,  1 << maxOrder)區間中的每個項是二叉樹的一個節點,每個項的值是節點在樹中的深度。

  depthMap用來記錄每個節點在樹中的深度,初始化之後,值不會發生變化。已知一個節點在陣列中的索引id, 可以使用這個id查詢節點在樹中的深度: depthMap[id]。

  memoryMap用來記錄樹中節點被分配出去的情況,每個項的值會隨著節點分配情況變化而變化。已知一個節點在陣列中的索引id,memoryMap[id]的值會有三中情況:

  1. memoryMap[id] == depth[id]:  所有子節點都沒被分配出去。
  2. memoryMap[id] > depth[id]: 至少有一個子節點被分配出去了,  還有可以分配的子節點。
  3. memoryMap[id] == maxOrder + 1: 這個節點以及完全被分配出去了,沒有可分配的子節點了。

 

從二叉樹中分配一個記憶體大小合適的節點

1     long allocate(int normCapacity) {
2         if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
3             return allocateRun(normCapacity);
4         } else {
5             return allocateSubpage(normCapacity);
6         }
7     }

  這個方法是分配記憶體節點的入口方法,引數normCapacity必須滿足normCapacity = 1 << n。第2行判斷normCapacity和pageSize的大小關係,在前面的構造方法中,subpageOverflowMask = ~(pageSize - 1), 如果pageSize=2048,  subpageOverflowMask的0-11位是0, 12-31位是1,它的二進位制值是: 1111111111111111111100000000000,  (normCapacity & subpageOverflowMask) != 0表示,normCapacity的12-31位中至少有一位是1,此時它>=pageSize, 反之比pageSize小。

  如果normCapacity >= pageSize, 呼叫allocateRun分配一個深度d < maxOrder的節點。

  如果normaCapacity < pageSize, 呼叫allocateSubpage分配一個d == maxOrder的葉葉節點, 即一個page。

  PoolChunk分配記憶體的最小單元是一個page,不能分配比一個page更小的記憶體了。

1     private long allocateRun(int normCapacity) {
2         int d = maxOrder - (log2(normCapacity) - pageShifts);
3         int id = allocateNode(d);
4         if (id < 0) {
5             return id;
6         }
7         freeBytes -= runLength(id);
8         return id;
9     }

  第2行,計算normCapacity大小的記憶體在二叉樹的最大深度d, 只有深度<=d的節點才有可以分配到>=normCapacity的記憶體。normCapacity可以表示為normCapacity = 2k,  log2(normCapacity)就是已知normCapacity求解k。pageShifts可表示為pageSize = 2pageShifts,  pageShifts = log2(pageSize)。 normCapacity在二叉樹上的反向深度 rd = log2(mormCapacity) - pageShifts,  這個表示式比較難以理解,這樣會更加直觀一些:

  pageCount = normCapacity >> log2(pageSize)

  rd = log2(pageCount) 

  pageCount是normCapacity需要的page數量。 反向深度的含義是,d=0對應二叉樹的最大深度maxOrder,  d=1對應maxOrder -1,依次類推。因此maxOrder - rd會得到最大深度d,d <= maxOrder。

  第3行,如果能夠根據d找到一個合適的節點,就會把這個節點記錄為已經使用的狀態,然後返回這個節點的索引id, id的取值區間是[0, 1 << maxOrder)。

  第7行,重新計算剩餘記憶體數。

  rungLength方法用於計算節點id的記憶體長度:

    private int runLength(int id) {
        // represents the size in #bytes supported by node 'id' in the tree
        return 1 << log2ChunkSize - depth(id);
    }

  log2ChunkSize=log2(chunkSize)在構造方法中初始化。 有性質(6)可以得到節點id的長度 length = (1 << maxOrder - depth(id)) * pageSize,它和程式碼中表達式是等價的,推導過程如下:

  已知: 

    log2ChunkSize = log2(chunkSize)

    chunkSize = (1 << maxOrder) * pageSize

    pageSize = 2k = 1 << k

  => chunkSize = (1 << maxOrder) * 2k 

         = 2maxOrder * 2k

         = 2maxOrder + k

  => log2ChunkSize = log2(chunkSize)

            = log2(2maxOrder + k)

            = maxOrder + k 

  => log2ChunkSize - depth(id) = maxOrder + k - depth(id)

  => 1 << log2ChunkSize - depth(id) = 1 << maxOrder + k - depth(id)

                   = (1 << maxOrder - depth(id)) * (1 << k)

                   = (1 << maxOrder - depth(id)) * pageSize

  

  如果需要的記憶體>=pageSize, 就會呼叫allocateNode方法,這個方法的作用是從二叉樹中分配一個節點,返回值id是這個節點的索引。

 1     private int allocateNode(int d) {
 2         int id = 1;
 3         int initial = - (1 << d); // has last d bits = 0 and rest all = 1
 4         byte val = value(id);
 5         if (val > d) { // unusable
 6             return -1;
 7         }
 8         while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
 9             id <<= 1;
10             val = value(id);
11             if (val > d) {
12                 id ^= 1;
13                 val = value(id);
14             }
15         }
16         byte value = value(id);
17         assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
18                 value, id & initial, d);
19         setValue(id, unusable); // mark as unusable
20         updateParentsAlloc(id);
21         return id;
22     }

  allocateNode方法的功能是從memoryMap樹中深度[1, d]的節點中找出一個沒有被分配出去的節點,然後把這個節點記錄為已分配的狀態。尋找順序是自上而下,從左到到右。

  第2行,從第一個節點開始,這個節點是二叉樹的根節點。

  第3行,計算一個32位initial,它的[0, d)位都是0,[d, 31]位都是1。

  第4-6行,檢查是否可以分配一個深度<=d節點, 如果不能分配記憶體失敗,返回-1。 val == maxOrder + 1時表示這個節點的記憶體已經被分配完了,val在[0, maxOrder]區間內時,表示可以分配一個深度在[val, maxOder]區間內的節點。所以在第5行檢查到val>d時表示不能分配到記憶體了。

  8-15行,能夠執行到第8行,說明在這個chunk中,二叉樹中一定至少有一個節點滿足深度等於d, 且沒有任何子節點被分配出去的節點。迴圈,滿足 val < d或(id & initial) == 0會增加一個深度繼續尋找。也就是說如果滿足val == d 且 (id & initial) == 1時,表示找到了符合調條件的節點了。第9行,增加一個深度。 第10,11行檢查左節點。 12,13行檢查右節點。

  19行, 把選中的節點id, 設定成unusable(maxOrder+1)狀態。

  20行,更新所有父節點的值。

   這個方法展示了已知memoryMap中索引為id的值val = memoryMap[id],  找到一個深度為d的空閒節點的演算法。前面已經講過val值的三種情況,其中第2中情況的時候,表示只有節點id下面只能找到深度>=val的空閒節點,索引d<val情況下,無法找到滿足深度等於d的空閒節點。影響memoryMapy[id]值的演算法在updateParentsAlloc中實現:

 1     private void updateParentsAlloc(int id) {
 2         while (id > 1) {
 3             int parentId = id >>> 1;
 4             byte val1 = value(id);
 5             byte val2 = value(id ^ 1);
 6             byte val = val1 < val2 ? val1 : val2;
 7             setValue(parentId, val);
 8             id = parentId;
 9         }
10     }

  3行,得到id的父節點。

  4-6行,取memoryMap中,取節點id和它的兄弟節點的值中交小的一個,如果相等的話就隨意取一個。

  7行,把上一步中的取值設定到父節點上。

  8,2行,深度減1,重複這個過程直到根節點為止。

 

分配一個小於pageSize的子頁subpage

  當需要分配的記憶體小於pageSize時,仍然會分配一個page,因為PoolChunk能分配的最小記憶體單元是一個page。這時候只需分配一個也節點就可以了。

 1     private long allocateSubpage(int normCapacity) {
 2         // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
 3         // This is need as we may add it back and so alter the linked-list structure.
 4         PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
 5         synchronized (head) {
 6             int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
 7             int id = allocateNode(d);
 8             if (id < 0) {
 9                 return id;
10             }
11 
12             final PoolSubpage<T>[] subpages = this.subpages;
13             final int pageSize = this.pageSize;
14 
15             freeBytes -= pageSize;
16 
17             int subpageIdx = subpageIdx(id);
18             PoolSubpage<T> subpage = subpages[subpageIdx];
19             if (subpage == null) {
20                 subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
21                 subpages[subpageIdx] = subpage;
22             } else {
23                 subpage.init(head, normCapacity);
24             }
25             return subpage.allocate();
26         }
27     }

  6-10行,分配一個深度d=maxOrder的葉節點。

  17,18行,從subpages取出一個PoolSubpage快取。subpages在構造方法中初始化,subpages = new PoolSubpage[maxSubpageAllocs], maxSubpageAllocs = 1 << maxOrder。subpages的長度就是chunk中的page數量。

  19-24行,如果快取中沒有,建立一個新的。如果有直接初始PoolSubpage。

  25行,分配一個子頁。

  關於PoolSubpage子頁面管理的功能,後面會詳細分析,這裡只涉及和PoolChunk相關的內容。

 

釋放記憶體

  分配記憶體成功後會返回一個long型的handle,64位的handle被分為兩部分,[0, 32)位是二叉樹中的節點索引,可以使用memoryMapIdx(handle)方法取出。[32, 64)位是PoolSubpage中子頁面的索引,可以使用bitMapIdx(handler)方法取出。釋放一個handle時,可能需要同時釋放二叉樹中的節點和PoolSubpage中子頁面,free(int handle)方法實現了這個記憶體釋放過程:

 1     void free(long handle) {
 2         int memoryMapIdx = memoryMapIdx(handle);
 3         int bitmapIdx = bitmapIdx(handle);
 4 
 5         if (bitmapIdx != 0) { // free a subpage
 6             PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
 7             assert subpage != null && subpage.doNotDestroy;
 8 
 9             // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
10             // This is need as we may add it back and so alter the linked-list structure.
11             PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
12             synchronized (head) {
13                 if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
14                     return;
15                 }
16             }
17         }
18         freeBytes += runLength(memoryMapIdx);
19         setValue(memoryMapIdx, depth(memoryMapIdx));
20         updateParentsFree(memoryMapIdx);
21     }

  2,3行,分別取出二叉樹的節點id和PoolSubpage中子頁的id。

  5-17行,釋放PoolSubpage子頁。子頁記憶體被釋放之後,subpages陣列中仍然儲存著PoolSubpages物件。13行只有subpage中所有的子頁都釋放完了才會釋放subpage持有的page。

  18-20行,釋放二叉樹中的節點。呼叫setValue把被釋放的節點memoryMap值設定成它原本的深度depth(memoryMapIdx)。 呼叫updateParentsFree, 修改memoryMap記錄,這個方法實現了updateParentsAlloc的逆過程。

  updateParentsFree釋放二叉樹節點的關鍵,如果一個節點被釋放,它的父節點在memoryMap值可能會發生變化。這個方法的實現如下:

 1     private void updateParentsFree(int id) {
 2         int logChild = depth(id) + 1;
 3         while (id > 1) {
 4             int parentId = id >>> 1;
 5             byte val1 = value(id);
 6             byte val2 = value(id ^ 1);
 7             logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
 8 
 9             if (val1 == logChild && val2 == logChild) {
10                 setValue(parentId, (byte) (logChild - 1));
11             } else {
12                 byte val = val1 < val2 ? val1 : val2;
13                 setValue(parentId, val);
14             }
15 
16             id = parentId;
17         }
18     }

  第2行,計算節點id的子節點深度logChild。

  第3行,確保id不是根節點。

  第4行,得到父節點id。

  第5,6行,得到節點id及其兄弟節點memoryMap值: val1, val2。

  第7行,把logChild變成id的深度。

  第9,10行, 如果id及其兄弟節點的指定都是depth(id),表示這兩個節都已經完全釋放,把父節點的指定還原成depth(parentId) == logChild -1 。

  第12,13行,如果id及其兄弟節點至少有一個沒有完全釋放,把較小的值設定到父節點上。

  第16行,深度上移,繼續上面的過程。

  

使用分配的記憶體初始化PooledByteBuf

  使用allocate分配記憶體得到一個handle之後,需要呼叫PooledByteBuf的init方法使用handle對應的記憶體初始化。初始化的關鍵是計算出handle對應的記憶體在memory中的偏移量和長度。前面講的lenthRun可以計算出記憶體的長度,剩下的就是計算記憶體偏移量方法runOffset。PoolChunk的initBuf方法用來初始化一個PooledByteBuf物件:

 1     void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
 2         int memoryMapIdx = memoryMapIdx(handle);
 3         int bitmapIdx = bitmapIdx(handle);
 4         if (bitmapIdx == 0) {
 5             byte val = value(memoryMapIdx);
 6             assert val == unusable : String.valueOf(val);
 7             buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
 8                      arena.parent.threadCache());
 9         } else {
10             initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
11         }
12     }

  第2,3行,在分析free程式碼中解釋過。

  第4-8行,表示這塊記憶體是二叉樹中的一個節點,直接使用init方法初始化。runOffset的演算法是 (memoryMapIdx ^ 1 << depth(memoryMapIdx)) * runLength(memoryMapIdx), 根據性質(5)可知,memoryMapIdx ^ depth(memoryMapIdx) 是節點memoryMepIdx在深度為depth(memoryMapIdx)層上的偏移量doffset,  即這一層前面還有doffset個節點,根據性質(4)可知每個節點的記憶體大小是runLength(memoryMapIdx),所以doffset * runLength(memoryMapIdx)是節點memoryMapIdx在chunk記憶體上的偏移量。最後還要再加上一個offset,它是chuk在memory上的偏移量。

  第10行,表示這塊記憶體是一個subpage,使用initBufWithSubpage初始化。

 1     void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
 2         initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity);
 3     }
 4 
 5     private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
 6         assert bitmapIdx != 0;
 7 
 8         int memoryMapIdx = memoryMapIdx(handle);
 9 
10         PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
11         assert subpage.doNotDestroy;
12         assert reqCapacity <= subpage.elemSize;
13 
14         buf.init(
15             this, handle,
16             runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
17                 reqCapacity, subpage.elemSize, arena.parent.threadCache());
18     }

  關鍵部分在第二個過載方法。的第14-17行。這個計算記憶體偏移量的演算法是runOffst(memoryMapIdx) + offset + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize,它可以拆分成兩部分:

  memoryMapIdx表示的page在記憶體中的偏移量pageOffset = runOffset(memoryMapIdx) + offset

  子頁面subpage在page中的偏移量: subpOffset = (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize

  其中subpOffset是個陌生的東西,會在後面PoolSubpage相關章節詳細分析。

&n