Netty學習之旅----原始碼分析記憶體分配與釋放原理
阿新 • • 發佈:2019-02-14
static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
buf.maxCapacity(maxCapacity);
return buf;
}
這裡有個RECYLER.get(),在這裡首先先知道是Netty關於ByteBuf的一個輕量級的物件池的實現,Netty會在本地執行緒變數中重複利用釋放掉的ByteBuf,關於本地執行緒快取機制,請看下文關於本地執行緒記憶體分配專題講解。建立一個PooledByteBuf物件後,然後進入到PoolArena.allocate方法,為建立的PooledByteBuf物件分配記憶體(PooledByteBuf內部快取區的)。private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity); //@1
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize //@2
int tableIdx;
PoolSubpage<T>[] table;
if (isTiny(normCapacity)) { // < 512 //@21
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {//@211
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else { //@22
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
//@221
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
synchronized (this) { //@3
final PoolSubpage<T> head = table[tableIdx];
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
return;
}
}
} else if (normCapacity <= chunkSize) { // @4
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
} else { //@5
// Huge allocations are never served via the cache so just call allocate // Huge
allocateHuge(buf, reqCapacity);
return;
}
allocateNormal(buf, reqCapacity, normCapacity); //@6
}
程式碼@1,先對reqCapacity進行處理,將reqCapacity轉換成一個合適的容量tiny,small記憶體,tiny記憶體為16,32,48,到512,tiny記憶體為[0,512),而small記憶體為1024,2048,4096成倍增長,直到(pageSize-1),該方法已經在原始碼分析記憶體分配前置篇中有過解讀,在此不做重複講解。
程式碼@2,申請記憶體小於pageSize
程式碼@4,申請記憶體小於chunkSize,
程式碼@5,申請記憶體大於chunkSize
程式碼@21,@22,從PoolArena中會快取將tiny,small記憶體按照記憶體大小進行組織。相同大小的tiny,small會以連結串列的形式儲存。首先嚐試從本地執行緒申請,申請失敗後,才會從全域性記憶體中進行分配。對於第一次記憶體申請,最後會走程式碼@6。由於已經對PoolArena,PoolChunk的資料結構等都講解清楚了,故該方法不難理解,故先重點關注allocateNormal方法。private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if ( q050.allocate(buf, reqCapacity, normCapacity)
|| q025.allocate(buf, reqCapacity, normCapacity)
|| q000.allocate(buf, reqCapacity, normCapacity)
|| qInit.allocate(buf, reqCapacity, normCapacity)
|| q075.allocate(buf, reqCapacity, normCapacity)
|| q100.allocate(buf, reqCapacity, normCapacity)) {
return;
} // @1
// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSiz e); //@2
long handle = c.allocate(normCapacity); //@3
assert handle > 0;
c.initBuf(buf, handle, reqCapacity); //@4
qInit.add(c); //@5
}
程式碼@1,首先從PoolArena的PoolChunkList中去選擇一個合適的PoolChunk進行分配,前文已經詳細介紹PoolArena按照PoolChunk的使用率區間來組織。PoolArena按照使用率區間有如下PoolChunkList,qInt(Integer.MIN_VALUE,25)--->q000(1,50)--->q025(25-75)-->q050(50-100)-->q075(75,100)--->q100(100-Integer.MAX_VALUE)
。然後每個PoolChunkList維護一個PoolChunk的連結串列,這裡的PoolChunk的使用率區間是一致的,當隨著使用率的增加,PoolChunk沿著上面的連結串列從一個PoolChunkList進入到下一個PoolChunkList。下面將羅列出這塊程式碼,PoolChunkList的allocate方法,只是負責選出一個PoolChunk,然後具體的記憶體分配還是有PoolChunk來分配,故不做重點講解。
程式碼@2,建立一個新的PoolChunk,建立具體的記憶體,堆PoolArena的建立有不同的子類PoolArena.HeapArena和PoolArena.DirectArena實現。
程式碼@3,從PoolChunk中分配記憶體,重點關注。根據下文的講解,原來PoolChunk的allocate方法返回只是一個long型別的資料,其高32位儲存的是待分配的PoolSubpage的bitmapIdx,低32位儲存的是PoolChunk的memoryMap的下標id。當然,目前所有的分析還是申請的記憶體小於PageSize。也就是在PoolSubpage中完成記憶體分配。
程式碼@4,將分配的記憶體與PooledByteBuf物件建立起連線。這步是真正為PooledByteBuf關聯記憶體。詳細解讀請看關注下文2.4。
程式碼@5,將建立的PoolChunk加入到PoolArena的PoolChunkList中。
2.3 PoolChunk的allocate分配方法詳解long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(normCapacity); //@2
} else {
return allocateSubpage(normCapacity); //@1
}
}
程式碼@1,分配記憶體大於pageSize,這裡返回的是memoryMap的id。
程式碼@2,分配記憶體小於PageSize,這裡返回的是一個long型別的變數,高32位表示PoolSubpage中的bitmap中的bitmapIdx,表示第多少個記憶體區域,低32位表示memoryMap的id。
2.3.1 PoolChunk allocateSubpage,分配小於pageSize的記憶體/**
* Create/ initialize a new PoolSubpage of normCapacity
* Any PoolSubpage created/ initialized here is added to subpage pool in the PoolArena that owns this PoolChunk
*
* @param normCapacity normalized capacity
* @return index in memoryMap
*/
private long allocateSubpage(int normCapacity) {
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
int id = allocateNode(d); //@1
if (id < 0) {
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
int subpageIdx = subpageIdx(id); // @2
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
subpage = new PoolSubpage<T>(this, id, runOffset(id), pageSize, normCapacity); // 程式碼@3
subpages[subpageIdx] = subpage;
} else { //@4
subpage.init(normCapacity);
}
return subpage.allocate(); //程式碼@5
}
首先講解一下整體分配思路,先根據PoolChunk內部維護的各個PoolChunk的佔用情況,返回一個可以PoolChunk的id,這裡的id就是memoryMap陣列的下標。然後從PoolChunk維護的PoolSubpage陣列物件中獲取一個與之對應的PoolSubpage,如果為空,先建立一個PoolSubpage。然後再在PoolSubpage中分配記憶體。如果已經存在PoolSubpage,則直接分配。
程式碼@1,首先,我們需要從meomryMap中尋找一個可供分配的PoolSubpage。memoryMap就是記錄整個PoolSubpage的佔用情況。
程式碼@2,根據我們現在掌握的知識應該知道,memoryMap[ 2的maxOrder]存放的是第一個PoolSubpage,memoryMap[ 2的 (maxOrder+1)的冪再減去1]存放的是最後一個PoolSubpage,所以接下來我們需要將id轉換為實際的PoolSubpage。從上面的講解,其實
id 與 2的maxOrder的偏移,,比如基數2048對應0,2049對應1,基數是2048,根據id求陣列中的下標應該清楚了吧,給出如下三個演算法實現,請看下午的關於subpageIdx方法的詳解。
程式碼@3,如果id對應的PoolSubpage沒有被初始化,則新建一個PoolSubpage。在記憶體分配前置篇的時候特意講解了PoolSubpage幾個關鍵屬性,當然包括這裡的runOffset;在這裡再重複一遍:在整個Netty記憶體管理中,其實真正持有記憶體的類是PoolChunk, 如果是直接記憶體,就是
java.nio.ByteBuffer memory;如果是堆記憶體的話,就是byte[] memory,我們以堆記憶體來講解,比較直觀,一個PoolChunk一旦建立,就會分配記憶體 byte[] memory = new byte[chunkSize];然後PoolChunk由一個一個的PoolSubpage組成,也就是PoolSubpage[] subpages;那們我們如果儲存 memoryMap[id]位置代表的PoolSubPage的記憶體呢?使用一個偏移量,相對於PoolChunk的byte[] meomry的偏移量。那怎麼計算呢?先得出id所在平衡二叉樹的深度,就能得到該層有多個節點,所謂的偏移量,就是針對同級的。當然,真正的偏移量主要還是針對PoolSubpage,整個Netty的記憶體管理,真正涉及到記憶體的只有PoolChunk,PoolSubpage維護一個偏移量,其實就是一個指標,表示PoolChunk的T
memory的(從runOffset 至 (runOffset+pageSize)被這個PoolSubpage佔用。
程式碼@5,PoolSubpage的具體分配方法。
2.3.1.1 關於程式碼@1PoolChunk allocateSubpage 的allocateNode的詳細解讀如下:
private int allocateNode(int d) {
int id = 1;
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
byte val = value(id);
if (val > d) { // unusable
return -1;
}
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0 //@1
id <<= 1; //@2
val = value(id); //@3
if (val > d) { //@4
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
setValue(id, unusable); // mark as unusable //@5
updateParentsAlloc(id); //@6
return id;
}
private byte value(int id) {
return memoryMap[id];
}
這裡再重複一次PoolChunk裡面維護一顆平衡二叉樹來對映PoolChunk每個PoolSubpage的是否已經被分配的情況,平衡二叉樹的深度為maxOrder,也就是平衡二叉數的所有葉子節點代表所有的PoolSubpage,而整棵二叉樹從根到下,從左到右被對映為一個數組memoryMap,memoryMap長度為(
2的 (maxOrder=1)),memoryMap[0]處不存放任何元素,從 id=1到 2的(maxOrder-1)冪-1,舉個更直觀的例子,比如maxOrder為10,則memoryMap[]的長度為2048,memoryMap[1] 存放平衡二叉樹的根節點,memory[2],memory[3]存放深度為1的節點,依次類推,memoryMap[2047]存放最後一個葉子節點。具體的演算法是從根節點(dept=0)開始,一直向下找,直到到達層為d。找合適的節點。
程式碼@2,找到id的左子樹(左節點)
程式碼@3,獲取該id所在二叉樹的深度。
程式碼@4,val > d 表示已經被佔用,需要找id的兄弟,也就是右節點,在平衡二叉樹,已知左節點,求右節點的下標,使用id^1即可,然後看右節點是否被佔用,如果沒有,直接返回。理解這個演算法,要知道,如果memoryMap[id]不可用,則不會繼續去查詢其子節點,也就是說,如果memoryMap[id]可用,就代表了至少有一個節點是沒有被佔用的。如果全被佔用,id的值會為
2的(maxOrder+1)的冪,再減去1,如果按照上面的例子,id=2047
程式碼@5,將memoryMap[id]=unusable,(maxOrder+1),代表PoolSubpage已經被佔用。
程式碼@6,遞迴更新父節點的佔用情況,這裡有個哲學,memoryMap[id]中存的值,代表可以至少可以分配的深度數,比如memoryMap[8] = 2,則表示id為8的節點可以勝任分配深度>=2的記憶體大小,我們知道,深度越大,節點代表的記憶體就越小。比如id=3的節點,最小可以分配的深度為2,那他的子節點id為(6,7)能分配的最小深度為3,那比如id=6的節點已被分配,那它的直接父節點id=3,此時就不能再分配深度為2的記憶體了,因為已經被佔用了一半,只能分配深度為3。updateParentsAlloc方法就是按照上述思路實現的,具體情況分析如下:
private void updateParentsAlloc(int id) {
while (id > 1) {// @1
int parentId = id >>> 1; // @2
byte val1 = value(id); //@3
byte val2 = value(id ^ 1); //@4
byte val = val1 < val2 ? val1 : val2; //@5
setValue(parentId, val);
id = parentId;
}
}
程式碼@1,迴圈呼叫,知道根節點。
程式碼@2,找到父節點
程式碼@3,@4,得到parentId兩個子節點當前可分配的深度值
程式碼@5,父節點可分配的深度值為兩個子節點可分配的深度值的最小值。
2.3.1.2 PoolChunk的allocateSubpage程式碼@2 的 subpageIdx方法:
private int subpageIdx(int memoryMapIdx) {
return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset
//我們一般簡單的實現:
//return memoryMapIdx - maxSubpageAllocs;
//或 return memoryMapIdx & ( ~( (1 << maxOrder) -1) );
}
2.3.1.3 PoolChunk的allocateSubpage程式碼@5 poolSubpage.allocate方法詳解:/**
* Returns the bitmap index of the subpage allocation.
*/
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
if (numAvail == 0 || !doNotDestroy) { //@1
return -1;
}
final int bitmapIdx = getNextAvail(); //@2
int q = bitmapIdx >>> 6; //@3
int r = bitmapIdx & 63; //@4
assert (bitmap[q] >>> r & 1) == 0;
bitmap[q] |= 1L << r; //@5
if (-- numAvail == 0) {
removeFromPool(); //@6
}
return toHandle(bitmapIdx); //@7
}
程式碼@1,如果可用記憶體塊小於0,則直接返回-1
程式碼@2,從bitmap中,獲取下一個可用的記憶體區域,PoolSubpage由大小相等的記憶體區域組成,總共maxNumElems個,每個區域大小為elemSize。如果沒有可用的區域,返回-1,否則返回0-(maxNumElms)之間的一個值。
程式碼@3;q表示在bitmap中的下標
程式碼@4,r表示 bitmapInx % 64 的餘數
程式碼@5,就是要將即將返回的PoolSubpag所代表的位設定為1, 1L<<<r,然後與bimap[q] | 1L<<r即可實現這樣的邏輯。
程式碼@6,如果該PoolSubpage所有記憶體都被分配後,從PoolSubpage連結串列中移除,這是在哪個連結串列中呢?是PoolChunk的連結串列?不是,因為PoolChunk只維護一個所有的PoolSubpage陣列,是移除PoolArena的PoolSubpage連結串列,從這裡也可以看出PoolArena維護的tinyPoolSubpages[]和smallPoolSubpages[]中的PoolSubpage都是未全部用完的PoolSubpage。
程式碼@7,返回一個long型別的變數,該變數的後32位儲存中該PoolSubpage在PoolChunk的memoryMap中的下標id,高32位儲存bitmapIdx。
關於PoolSubpage allocate 程式碼@2詳解:getNextAvail方法:
private int getNextAvail() {
int nextAvail = this.nextAvail;
if (nextAvail >= 0) { //@1
this.nextAvail = -1;
return nextAvail;
}
return findNextAvail();
}
如果是第一次分配,直接返回0,即可,因為下一個可用的就是0
如果不是第一次分配,然後需要從bitmap中獲取一個可用的PoolSubpage。進入findNextAvail()方法。
private int findNextAvail() {
final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
for (int i = 0; i < bitmapLength; i ++) { //@2
long bits = bitmap[i];
if (~bits != 0) { //@3
return findNextAvail0(i, bits); //@4
}
}
return -1;
}
程式碼@2,遍歷bitmap,根據當前的elmSize可以得知當前用來多少個long來代表總的長度。
程式碼@3,如果取反不為0,則說明用這個long代表的64個PoolSubpage未使用完,可用從這裡獲取一個,故進入到程式碼@4。
private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;
final int baseVal = i << 6; //@5
for (int j = 0; j < 64; j ++) { //遍歷,一個long總代表64個PoolSubpage
if ((bits & 1) == 0) { //@6
int val = baseVal | j; //@8
if (val < maxNumElems) {
return val;
} else {
break;
}
}
bits >>>= 1; //@7
}
return -1;
}
程式碼@5,baseVal = i << 6,也就是i * 64,如果i在bitmap陣列中的下標為0,baseVal則等於
0,如果下標為1,則baseVal=64,如果下標為2,則baseVal=128。
程式碼@6,用bits&1判斷是否等於0,也就是判斷bits的低位是否被佔用,如果為1,則表示被佔用,如果為0,表示未被佔用,如果結果為0,則可以返回該值了,如果不為0,則無符號向右移動1位,相當與去掉最後一位,繼續比較。從這裡就可以看出,一個long從低位開始被標記。我們舉例說明一下,比如現在 用兩個long型別可以標記所有的PoolSubpage,比如bitmap[0]
= (00000000 00000000 00000000 00000000 00000000 00000000 00000000 0111111),表示已經分配了7個PoolSubpage。
程式碼 toHandle方法詳解
private long toHandle(int bitmapIdx) {
return 0xb000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
該值,主要是用一個long型別的值的低32位來表示 memoryMapIdx,用高32位表示 bitmapIdx,這裡為什麼需要與0xb000000000000000L進行或運算呢?據我的目前掌握的知識看,主要是將0對映為一個數字,主要來區分是在PoolSubpage中分配的,還是記憶體大於pageSize,在記憶體釋放的時候,可以通過bitmaIdx
>> 32 & 0x3FFFFFFF 得出原來的索引。
2.2.2.1.4關於 PoolArena allocate程式碼@4 initBuf詳解:
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
int memoryMapIdx = (int) handle;//@1
int bitmapIdx = (int) (handle >>> Integer.SIZE); //@2
if (bitmapIdx == 0) {//@3
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx));
} else {//@4
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
}
}
首先這裡對入參 long handle做一個詳解:
如果需要分配的記憶體大於pageSize,則返回的高32為為0,低32位為memoryMap的下標id。具體參照下文的allocateRun方法講解。
如果需要分配的記憶體小於pageSize,則返回的高32為為bitmapIdx,低32位同樣表示memoryMap的id。
程式碼@1,從long中獲取memoryMapIdx
程式碼@2,從long中獲取bitmapIdx。
程式碼@3,如果bitmapIdx為0,偏移量就是PoolSubpage的偏移量。為什麼呢?bitmapIdx為0在記憶體申請大於pageSize和小於pageSize時都會出現:
如果申請的記憶體數小於pageSize,bitmapIdx表示該pageSubpage是第一次分配。
如果申請的記憶體數大於pageSize,表明該節點所以子節點都是第一次被分片。故偏移量就是memory[id]所代表的偏移量。
程式碼@4,如果bitmapIdx不為0,需要計算偏移量,與PoolSubpage的elemSize相關,具體請看2)PoolChunk的initBufWithSubpage方法,該方法,最終還是要呼叫PooledByteBuf的init方法,分配記憶體,這裡只是需要計算偏遠量。
1)PooledByteBuf的 int方法。
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength) {
assert handle >= 0;
assert chunk != null;
this.chunk = chunk;
this.handle = handle;
memory = chunk.memory; // @1
this.offset = offset; // @2
this.length = length; // @3
this.maxLength = maxLength; // @4
setIndex(0, 0); //@5
tmpNioBuf = null;
initThread = Thread.currentThread();//@6
}
程式碼@1,PooledByteBuf的記憶體,直接指向PoolChunk的memory;
程式碼@2,offset,在memory的起始偏移量。
程式碼@3,memory的offset + length之間的記憶體被該PooledByteBuf佔用,其他ByteBuf無法使用。
程式碼@4,maxLength的作用是什麼呢?目前我的理解是,PooledByteBuf自動擴容時,只要最終長度不超過maxLength,就可以在當前的記憶體中完成,不需要去申請新的空間,再進行記憶體負責。
程式碼@5,初始化readIndex,writeIndex。
程式碼@6,記錄該PooledByteBuf的初始化執行緒,方便本地執行緒池的使用。
2) PoolChunk的iinitBufWithSubpage方法詳解
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = (int) handle;
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
buf.init(
this, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize);
}
2.3.2 PoolChunk allocate關於程式碼@allocateRun,超過pageSize記憶體分配原始碼分析/**
* Allocate a run of pages (>=1)
*
* @param normCapacity normalized capacity
* @return index in memoryMap
*/
private long allocateRun(int normCapacity) {
int d = maxOrder - (log2(normCapacity) - pageShifts); //@1
int id = allocateNode(d); //@2
if (id < 0) {
return id;
}
freeBytes -= runLength(id); //@3
return id;
}
該方法的實現,就是要在平衡二叉樹中要找到一個可以滿足normCapacity的節點,從前文的介紹中得知,memoryMap[id]存放的是,該id能分配的最小深度代表的容量。超過pageSize的記憶體節點,肯定不是在葉子節點。如果讓我實現查詢合適id的演算法,我想應該是這樣的:
1、算成需要分配的記憶體大小是 pageSize的倍數,再直觀點就是normalCapacity 是 pageSize 的倍數n,然後算成n是2的多少次冪,比如pageSize=4,而normalCapacity是16,得出的倍數是4,4是2的2次冪,最終要得到這個值。標記為r
2、然後從平衡二叉樹,沿著最底層(高度為0,深度為maxOrder),向上找r級,即能知道這一層次的節點可以容納下normaCapacity的節點。
程式碼@1,先算出normCapacity,2的對(2的冪)然後減去pageSize(2的冪),得出思路1的r的值,然後用maxOrder-r,就表示深度最大為maxOrder-r。
程式碼@2,沿著平衡二叉樹,從根節點開始,尋找一個合適的節點。如果無法分配,返回-1。
allocateNode(id)在上文中已經講解,為了增深映像,在這裡將程式碼再次貼上:
private int allocateNode(int d) {
int id = 1;
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
byte val = value(id);
if (val > d) { // unusable
return -1;
}
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
id <<= 1;
val = value(id);
if (val > d) {
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
setValue(id, unusable); // mark as unusable
updateParentsAlloc(id);
return id;
}
2.2.2.5 關於PoolArena 2.2.2 allocate方法的程式碼@5allocateHuge方法詳解在netty記憶體管理中,如果超過chunkSize的記憶體,為大記憶體,不重複使用。
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
}
PoolArena.HeapArena:
protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
return new PoolChunk<byte[]>(this, new byte[capacity], capacity);
}
PoolChunk非池管理的PoolChunk:
/** Creates a special chunk that is not pooled. */
PoolChunk(PoolArena<T> arena, T memory, int size) {
unpooled = true;
this.arena = arena;
this.memory = memory;
memoryMap = null;
depthMap = null;
subpages = null;
subpageOverflowMask = 0;
pageSize = 0;
pageShifts = 0;
maxOrder = 0;
unusable = (byte) (maxOrder + 1);
chunkSize = size;
log2ChunkSize = log2(chunkSize);
maxSubpageAllocs = 0;
}
大記憶體的PoolChunk,不快取使用,故內部不會再細化為PoolSubpage等資料結構。
void initUnpooled(PoolChunk<T> chunk, int length) {
assert chunk != null;
this.chunk = chunk;
handle = 0;
memory = chunk.memory;
offset = 0;
this.length = maxLength = length;
setIndex(0, 0);
tmpNioBuf = null;
initThread = Thread.currentThread();
}
PooledByteBuf來初始化是,lengt,maxLenth等於需要申請的記憶體。看過tiny,small記憶體的分配後,大記憶體的分配比較簡單,就不做每行程式碼的解讀。不容易呀,Netty記憶體的分配就講解完畢了。接下來注重分析兩個方面:記憶體釋放與PooledByteBuf動態擴容。
3、原始碼分析Netty記憶體釋放
在講解Netty記憶體釋放之前,我們還是簡單的再回顧一下ReferenceCounted,Netty的ByteBuf繼承該介面,這也表明,Netty內部的記憶體管理是基於引用計數來進行記憶體的回收的。具體的實現是AbstractReferenceCounted。所以,我們在使用PooledByteBuf時,用完後要記得呼叫release方法。線上程本地分配時會再次強調,如果PooledByteBuf在用完後,沒有呼叫realse方法,是無法被執行緒共用的。記憶體的釋放入口,是ByteBuf的relase方法,也就是AbstractReferenceCountedByteBuf的release方法:
public final boolean release(int decrement) {
if (decrement <= 0) {
throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
}
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
}
}
該方法的方式是迴圈利用CAS將引用減少,直到引用為0,則呼叫釋放方法,ByteBuf的記憶體釋放,不同子類,不同的實現,故deallocate方法為抽象方法;/**
* Called once {@link #refCnt()} is equals 0.
*/
protected abstract void deallocate();
3.1 PooledByteBuf deallocate方法詳解
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
boolean sameThread = initThread == Thread.currentThread();
initThread = null;
chunk.arena.free(chunk, handle, maxLength, sameThread); //@1
recycle(); //@2
}
}
記憶體釋放的基本實現:首先將PooledByteBuf內部的handle修改為-1,將memory設定為null,,然後釋放佔用的記憶體,然後將該PooledByteBuf放入物件池。從這裡也可以看出,PooledByteBuf ,不僅ByteBuf內部持有的快取區會被回收,連PooledByteBuf這個物件本身,也會放入到物件池,供重複利用(執行緒級)3.1.1 關於PooledByteBuf deallocate 程式碼@1 PoolArena的free方法,記憶體釋放原始碼分析
/**
* @param chunk
* @param handle,記憶體分配,儲存著bitmapIdx,memoryMapIdx
* @param normaCapacity 申請的記憶體(釋放的記憶體,取值為PooledByteBuf的 maxLenth)
* @param 釋放該PooledByteBuf的執行緒釋放時建立該PooledByteBuf的執行緒
*/
void free(PoolChunk<T> chunk, long handle, int normCapacity, boolean sameThreads) {
if (chunk.unpooled) { //@1
destroyChunk(chunk);
} else {
if (sameThreads) { //@2
PoolThreadCache cache = parent.threadCache.get();
if (cache.add(this, chunk, handle, normCapacity)) {
// cached so not free it.
return;
}
}
synchronized (this) { //@3
chunk.parent.free(chunk, handle);
}
}
}
如果是非池的記憶體,也就是大記憶體,直接釋放,如果釋放的執行緒是建立該PooledByteBuf的執行緒,則放入執行緒本地變數中,重複利用,此時不需要釋放。如果不是,則釋放記憶體。
程式碼@1,我們看一下PoolArena.HeapArena和DirectArena的分別實現:
PoolArena.HeapArena
protected void destroyChunk(PoolChunk<byte[]> chunk) {
// Rely on GC.
}
PoolArena.DirectArena:
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
PlatformDependent.freeDirectBuffer(chunk.memory); //釋放堆外記憶體
}
程式碼@2,放入本地執行緒快取中,這個稍後重點關注。
程式碼@3,釋放記憶體,這裡的記憶體釋放與第一步的記憶體釋放不同,第一步的記憶體釋放,是直接將記憶體還給JVM堆、或作業系統。而這裡的是在PoolChunk中進行標記與釋放而已。chunk.parent指的是該PoolChunk的PoolChunkList物件。先重點進入到PoolChunk的free方法。
3.1.1.1 關於PoolArena.free方法 程式碼@3的講解,此處主要是呼叫PoolChunk方法進行記憶體的釋放:
/**
* Free a subpage or a run of pages
* When a subpage is freed from PoolSubpage, it might be added back to subpage pool of the owning PoolArena
* If the subpage pool in PoolArena has at least one other PoolSubpage of given elemSize, we can
* completely free the owning Page so it is available for subsequent allocations
*
* @param handle handle to free
*/
void free(long handle) {
int memoryMapIdx = (int) handle; //@1
int bitmapIdx = (int) (handle >>> Integer.SIZE); //@2
if (bitmapIdx != 0) { // free a subpage //@3
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;
if (subpage.free(bitmapIdx & 0x3FFFFFFF)) {
return;
}
}
freeBytes += runLength(memoryMapIdx); //@4
setValue(memoryMapIdx, depth(memoryMapIdx)); //@5
updateParentsFree(memoryMapIdx); //@6
}
程式碼@1,獲取在memoryMap的下標。
程式碼@2,獲取bitmapIdx
程式碼@3,如果bitmapIdx不等於0,表示該記憶體是從PoolSubpage中直接分配的(記憶體申請時,小於pageSize)。此時記憶體的釋放,直接呼叫PoolSubpage的free方法。
程式碼@4,@5,@6,如果申請的記憶體超過了pageSize,首先將freeBytes增加,然後在程式碼@5,將memoryMapIdx處的值更新為原先depth的值,代表可以重新分片深度為depth[id]的記憶體了,級聯更新memoryMap,的父節點的釋放狀態。
3.1.1.1.3,關於 PoolArena.free方法,程式碼@3, PoolSubpage.free方法詳解。
/**
* @return {@code true} if this subpage is in use.
* {@code false} if this subpage is not used by its chunk and thus it's OK to be released.
*/
boolean free(int bitmapIdx) {
if (elemSize == 0) {
return true;
}
int q = bitmapIdx >>> 6; //@1
int r = bitmapIdx & 63; //@2
assert (bitmap[q] >>> r & 1) != 0;
bitmap[q] ^= 1L << r; //@3
setNextAvail(bitmapIdx); //@4
if (numAvail ++ == 0) { //@5 start
addToPool();
return true;
}
if (numAvail != maxNumElems) {
return true;
} else {
// Subpage not in use (numAvail == maxNumElems)
if (prev == next) {
// Do not remove if this subpage is the only one left in the pool.
return true;
}
// Remove this subpage from the pool if there are other subpages left in the pool.
doNotDestroy = false;
removeFromPool();
return false;
} //@5 end
}
程式碼@1,求出bitmapIdx在bitmap陣列中的下標。
程式碼@3,就是bitmapIdx在bitmap[q]下所對應的高位1,設定為0,表示可以再次被用來分配。
程式碼@4,將bitmapIdx設定為下一個可以用nextAvail,這時在nextAvail>0,分配時候就會直接先用釋放的,保證記憶體的連續性。
程式碼@5,如果PoolSubpage有剩餘空間後,加入到PoolArena的tinyPoolSubpages[]或smallPoolSubpages[]中。如果空間全部分配後,或一個都沒分配,從PoolArena的tinyPoolSubpages[],smallPoolSubpages[]中移除。
3.1.1.6關於PoolArena.free方法,程式碼@6,級聯更新釋放狀態詳解/**
* Update method used by free
* This needs to handle the special case when both children are completely free
* in which case parent be directly allocated on request of size = child-size * 2
*
* @param id id
*/
private void updateParentsFree(int id) {
int logChild = depth(id) + 1;
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
if (val1 == logChild && val2 == logChild) {
setValue(parentId, (byte) (logChild - 1));
} else {
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
}
id = parentId;
}
}
這個演算法在理解了上文的updateParentsAllocator()方法後,不難理解,只要我們明確一點,memoryMap[id]=order,表示id代表的節點,可以勝任大於等於order深度的記憶體分配需要。
4、PooledByteBuf記憶體擴容
在講解這個問題的時候,不知道大家有沒有注意到PooledByteBuf的maxLength屬性?該屬性有和作用,請看下文分解。
PooledByteBuf擴容演算法,請看capacity(int newCapacity);@Override
public final ByteBuf capacity(int newCapacity) {
ensureAccessible();
// If the request capacity does not require reallocation, just update the length of the memory.
if (chunk.unpooled) { //@1
if (newCapacity == length) {
return this;
}
} else {
if (newCapacity > length) { //@2
if (newCapacity <= maxLength) {
length = newCapacity;
return this;
}
} else if (newCapacity < length) { //@3
if (newCapacity > maxLength >>> 1) {
if (maxLength <= 512) {
if (newCapacity > maxLength - 16) {
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
} else { // > 512 (i.e. >= 1024)
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
}
} else {
return this;
}
}
// Reallocation required.
chunk.arena.reallocate(this, newCapacity, true); //@4
return this;
}
程式碼@1,如果是非池化的,並且新的容量等於length的話,可以直接返回,否則需要重新去申請記憶體。
程式碼@2,如果需要的記憶體大於length,此時newCapacity小於等於maxLength,則不需要擴容,如果需要的大小超過maxLength,則需要重新去申請記憶體。那這裡的maxLength是什麼呢?其實,如果PooledByteBuf的記憶體是在PoolSubpage中分配,那maxLength為PooledSubpage的記憶體區域中的總容量,PoolSubpage的MemoryRegion的總大小。也就是在是同一個時刻,一個PoolSubpage的MemoryRegion只能被一個PooledByteBuf所佔用。
程式碼@3,如果需要申請的記憶體小於length,為了避免下一次需要擴容,再進一步判斷,如果是tiny記憶體的話,判斷newCapaciy與 maxLength-16的關係,如果不大於的話,本次也進行重新分配記憶體,然後維護PooledByteBuf的length,readerIndex,writerIndex。
程式碼@4,記憶體的重新分配 reallocate方法;
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = buf.length;
if (oldCapacity == newCapacity) {
return;
}
PoolChunk<T> oldChunk = buf.chunk;
long oldHandle = buf.handle;
T oldMemory = buf.memory;
int oldOffset = buf.offset;
int oldMaxLength = buf.maxLength;
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
allocate(parent.threadCache.get(), buf, newCapacity); //@1
if (newCapacity > oldCapacity) { //@2 start
memoryCopy(
oldMemory, oldOffset,
buf.memory, buf.offset, oldCapacity);
} else if (newCapacity < oldCapacity) {
if (readerIndex < newCapacity) {
if (writerIndex > newCapacity) {
writerIndex = newCapacity;
}
memoryCopy(
oldMemory, oldOffset + readerIndex,
buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
} else {
readerIndex = writerIndex = newCapacity;
}
} // @2end
buf.setIndex(readerIndex, writerIndex);
if (freeOldMemory) {
free(oldChunk, oldHandle, oldMaxLength, buf.initThread == Thread.currentThread()); //@3
}
}
記憶體重新分配的演算法如下:
首先重新申請記憶體,然後進行記憶體複製,最後釋放原先的記憶體。
PoolArena.HeapArena的memoryCopy實現程式碼:protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
if (length == 0) {
return;
}
System.arraycopy(src, srcOffset, dst, dstOffset, length);
}
總結: Netty記憶體的組織,主要由PoolArena、PoolChunk、PoolSubpage,當然再加上執行緒本地分配(下個專題單獨分析),在這裡真正持有記憶體的單元是PoolChunk,俗稱塊記憶體,PoolChunk持有2的maxOrder次冪的PoolSubpage,PoolSubpage維護一個相對於PoolChunk的記憶體偏移量,每個PoolSubpage又由大小相等的記憶體區域構成,命名為MomeryRegion,為了方便快速的位運算,記憶體的大小,pageSize等大小,都是2的冪。 Netty的另一層次的程式碼組織,我把它稱為執行時記憶體組織也是挺富有想象力的,PoolArena按照PoolChunk的內部的使用率區間,維護qint、q000、q025、q050、q075、q100的PoolChunk連結串列,並且qint,q000也是一個連結串列,這樣動態跟蹤了PoolChunk的使用,通樣的道理,PoolArena內部通用按照PoolSubpage,按照MeomryRegion的大小,又維護一個一個的佇列,PoolSubpage[] tinySubpagePools[],smallSubpagePools[]。 介紹這些資料儲存後,重點深入原始碼,分析了記憶體的分配,釋放,PooledByteBuf的動態擴容。下一個研究專題,Netty本地執行緒分配與PooledByteBuf本地物件池機制。