1. 程式人生 > >netty原始碼解解析(4.0)-24 ByteBuf基於記憶體池的記憶體管理

netty原始碼解解析(4.0)-24 ByteBuf基於記憶體池的記憶體管理

 io.netty.buffer.PooledByteBuf<T>使用記憶體池中的一塊記憶體作為自己的資料記憶體,這個塊記憶體是PoolChunk<T>的一部分。PooledByteBuf<T>是一個抽象型別,它有4個派生類:

  • PooledHeapByteBuf, PooledUnsafeHeapByteBuf 使用堆記憶體的PooledByteBuffer<byte[]>。
  • PooledDirectByteBuf, PooledUnsafeDirectByteBuf 使用直接記憶體的PooledByteBuf<ByteBuffer>。

初始化

  PooledByteBuf的初始化過程分為兩個步驟:建立例項;初始化記憶體。這兩個步驟的程式碼如下:

    protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) {
        super(maxCapacity);
        this.recyclerHandle = recyclerHandle;
    }

    void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        init0(chunk, handle, offset, length, maxLength, cache);
    }

    private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        assert handle >= 0;
        assert chunk != null;

        this.chunk = chunk;
        memory = chunk.memory;
        allocator = chunk.arena.parent;
        this.cache = cache;
        this.handle = handle;
        this.offset = offset;
        this.length = length;
        this.maxLength = maxLength;
        tmpNioBuf = null;
    }

  建立例項時呼叫的構造方法只是為maxCapacity和recyclerHandler屬性賦值,構造方法是protected,不打算暴露到外面。派生類都提供了newInstance方法建立例項,以PooledHeapByteBuf為例,它的newInstance方法實現如下:

 1     private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
 2         @Override
 3         protected PooledHeapByteBuf newObject(Handle handle) {
 4             return new PooledHeapByteBuf(handle, 0);
 5         }
 6     };
 7 
 8     static PooledHeapByteBuf newInstance(int maxCapacity) {
 9         PooledHeapByteBuf buf = RECYCLER.get();
10         buf.reuse(maxCapacity);
11         return buf;
12     }

  這裡的newInstance使用RECYCLER建立例項物件。Recycler<T>是一個輕量級的,支援迴圈使用的物件池。當物件池中沒有可用物件時,會在第4行使用構造方法建立一個新的物件。

 

  init呼叫init0初始化資料記憶體,init0方法為幾個記憶體相關的關鍵屬性賦值:

  • chunk:  PoolChunk<T>物件,這個PooledByteBuf使用的記憶體就是它的一部分。
  • memory: 記憶體物件。更準確地說,PooledByteBuf使用的記憶體是它的一部分。
  • allocator: 建立這個PooledByteBuf的PooledByteBufAllocator物件。
  • cache:  執行緒專用的記憶體快取。分配記憶體時會優先從這個快取中尋找合適的記憶體塊。
  • handle:  記憶體在chunk中node的控制代碼。chunk使用handle可以計算出它對應記憶體的起始位置offset。
  • offset:  分配記憶體的起始位置。
  • length: 分配記憶體的長度,也是這個PooledByteBuf的capacity。
  • maxLength: 這塊記憶體node的最大長度。當呼叫capacity(int newCapacity)方法增加capacity時,只要newCapacity不大於這個值,就不用從新分配記憶體。

  記憶體初始化完成之後,這個PooledByteBuf可使用的記憶體範圍是memory記憶體中[offset, offset+length)。idx方法可以把ByteBuf的索引轉換成memory的索引:

1     protected final int idx(int index) {
2         return offset + index;
3     }

 

重新分配記憶體

  和前面講過的ByteBuf實現一樣,PooledByteBuf也需要使用capacity(int newCapacity)改變記憶體大小,也會涉及到把資料從舊記憶體中複製到新記憶體的問題。也就是說,要解決的問題是一樣的,只是具體實現的差異。

 1     @Override
 2     public final ByteBuf capacity(int newCapacity) {
 3         checkNewCapacity(newCapacity);
 4 
 5         // If the request capacity does not require reallocation, just update the length of the memory.
 6         if (chunk.unpooled) {
 7             if (newCapacity == length) {
 8                 return this;
 9             }
10         } else {
11             if (newCapacity > length) {
12                 if (newCapacity <= maxLength) {
13                     length = newCapacity;
14                     return this;
15                 }
16             } else if (newCapacity < length) {
17                 if (newCapacity > maxLength >>> 1) {
18                     if (maxLength <= 512) {
19                         if (newCapacity > maxLength - 16) {
20                             length = newCapacity;
21                             setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
22                             return this;
23                         }
24                     } else { // > 512 (i.e. >= 1024)
25                         length = newCapacity;
26                         setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
27                         return this;
28                     }
29                 }
30             } else {
31                 return this;
32             }
33         }
34 
35         // Reallocation required.
36         chunk.arena.reallocate(this, newCapacity, true);
37         return this;
38     }

  這個方法處理兩大類情況: 不重新分配記憶體;重新分配記憶體並複製ByteBuf中的資料和狀態。

  不重新分配記憶體: 

  8行: chunk不需要回收到記憶體池中,且newCapacity沒有變化。

  11-32行: chunk需要回收到記憶體池中。

    13-14行:記憶體增大,且newcapacity不大於maxLength。把容量修改成newCapacity即可。

    20-22行: 記憶體減小,  newCapacity 大於maxLength的一半,maxLength<=512, newCapacity >maxLength - 16。 把容量修改成newCapacity, 調整readerIndex, writerIndex。

    25-27行: 記憶體減小,newCapacity大於maxLength的一半,  maxLength > 512。把容量修改成newCapacity, 調整readerIndex, writerIndex。

    31行: 記憶體不變,不做任何操作。

  需要重新分配記憶體:

  36行: 任何不滿足以上情況的都要重新分配記憶體。這裡使用Arena的reallocate方法重新分配記憶體,並把舊記憶體釋放調,程式碼如下:

 1     //io.netty.buffer.PoolArena#reallocate, 
 2     void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
 3         if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
 4             throw new IllegalArgumentException("newCapacity: " + newCapacity);
 5         }
 6 
 7         int oldCapacity = buf.length;
 8         if (oldCapacity == newCapacity) {
 9             return;
10         }
11 
12         PoolChunk<T> oldChunk = buf.chunk;
13         long oldHandle = buf.handle;
14         T oldMemory = buf.memory;
15         int oldOffset = buf.offset;
16         int oldMaxLength = buf.maxLength;
17         int readerIndex = buf.readerIndex();
18         int writerIndex = buf.writerIndex();
19 
20         allocate(parent.threadCache(), buf, newCapacity);
21         if (newCapacity > oldCapacity) {
22             memoryCopy(
23                     oldMemory, oldOffset,
24                     buf.memory, buf.offset, oldCapacity);
25         } else if (newCapacity < oldCapacity) {
26             if (readerIndex < newCapacity) {
27                 if (writerIndex > newCapacity) {
28                     writerIndex = newCapacity;
29                 }
30                 memoryCopy(
31                         oldMemory, oldOffset + readerIndex,
32                         buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
33             } else {
34                 readerIndex = writerIndex = newCapacity;
35             }
36         }
37 
38         buf.setIndex(readerIndex, writerIndex);
39 
40         if (freeOldMemory) {
41             free(oldChunk, oldHandle, oldMaxLength, buf.cache);
42         }
43     }

  7-9行: 記憶體大小沒變化,返回。

  12-18行: 記錄下舊記憶體的資訊,readerIndex, writerIndex。

  20行: 為PooledByteBuf分配新記憶體。

  21-38行: 把舊記憶體中資料複製到新記憶體,並把readerIndex,writerIndex設定在正確。

  41行: 釋放就記憶體。

 

釋放記憶體

  記憶體釋放程式碼在deallocate中:

 1     @Override
 2     protected final void deallocate() {
 3         if (handle >= 0) {
 4             final long handle = this.handle;
 5             this.handle = -1;
 6             memory = null;
 7             tmpNioBuf = null;
 8             chunk.arena.free(chunk, handle, maxLength, cache);
 9             chunk = null;
10             recycle();
11         }
12     }

  關鍵是第8行程式碼,使用PoolArena的free方法釋放記憶體。然後是recycle把當前PooledByteBuf物件放到RECYCLER中迴圈使用。

 

PooledByteBufAllocator建立記憶體管理模組

  在前面分析PooledByteBuf記憶體初始化,重新分配及釋放時,看到了記憶體管理的三個核心模組: PoolArena(chunk.arena),  PoolChunk(chunk),  PoolThreadCache(cache)。PooledByteBuf的記憶體管理能力都是使用這三模組實現的,它本身沒有實現記憶體管理演算法。當需要為PooledByteBuf分配一塊記憶體時,先從一個執行緒專用的PoolThreadCache中得到一個PoolArena,  使用PoolArena的allocate找到一個滿足要求記憶體塊PoolChunk,  從這個記憶體塊中分配一塊連續的記憶體handle,計算出這塊記憶體起始位置的偏移量offset, 最後呼叫PooledByteBuf的init方法初始化記憶體完成記憶體分配。 釋放記憶體呼叫PoolArena的free方法。在記憶體分配時,會優先從PoolThreadCache中尋找合適的記憶體塊。在記憶體釋放時會把記憶體塊暫時放在PoolThreadCache中,等使用頻率過低時才會還給PoolChunk。這三個模組中PoolArena,  PoolThreadCache由PooledByteBufAllocator建立,PoolChunk由PoolArean維護。

  PooledByteBufAllocator維護了相關的幾個屬性:

  PoolArena<byte[]>[] heapArenas

  PoolArena<ByteBuffer>[] directArenas

  PoolThreadLocalCache threadCache

  headArenas和directArenas分別維護了多個PoolArena, 他們分別用來分配堆記憶體和直接記憶體。 如果使用得當,可以讓每個執行緒持有一個專用的PoolArena,  避免執行緒間資料同步的開銷。PoolThreadLocalCache會為每個執行緒建立一個專用的PoolThreadCache例項,這個例項分別持有一個heapArena和directArena。

  接下來的的幾個章節會詳細分析PoolArena和PoolThreadCache的實現程式碼。

&n