1. 程式人生 > >Netty原始碼分析之ByteBuf(二)—記憶體分配器ByteBufAllocator

Netty原始碼分析之ByteBuf(二)—記憶體分配器ByteBufAllocator

Netty中的記憶體分配是基於ByteBufAllocator這個介面實現的,通過對它的具體實現,可以用來分配我們之前描述過的任意型別的BytebBuf例項;我們先看一下ByteBufAllocator介面中的定義的關鍵方法

一、ByteBufAllocator 構造

public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

    //根據具體實現返回基於直接記憶體或堆內記憶體的ByteBuf
    ByteBuf buffer();

    //根據具體實現返回一個給定初始容量的基於直接記憶體或堆內記憶體的ByteBuf
    ByteBuf buffer(int initialCapacity);

    //根據具體實現返回一個給定初始容量與最大量的基於直接記憶體或堆內記憶體的ByteBuf
    ByteBuf buffer(int initialCapacity, int maxCapacity);

    //返回一個用於套接字操作的ByteBuf
    ByteBuf ioBuffer();

    //返回一個用於套接字操作的給定初始量ByteBuf
    ByteBuf ioBuffer(int initialCapacity);

    //返回一個用於套接字操作的給定初始量與最大量的ByteBuf
    ByteBuf ioBuffer(int initialCapacity, int maxCapacity);

    //返回一個基於堆內記憶體的ByteBuf
    ByteBuf heapBuffer();

    //返回一個給定初始量基於堆內記憶體的ByteBuf
    ByteBuf heapBuffer(int initialCapacity);

    //返回一個給定初始量與最大量的基於堆內記憶體的ByteBuf
    ByteBuf heapBuffer(int initialCapacity, int maxCapacity);

    //返回一個基於直接記憶體的ByteBuf
    ByteBuf directBuffer();

    //返回一個給定初始量基於直接記憶體的ByteBuf
    ByteBuf directBuffer(int initialCapacity);

    //返回一個給定初始量與最大量的基於直接記憶體的ByteBuf
    ByteBuf directBuffer(int initialCapacity, int maxCapacity);

    //返回一個基於複合緩衝區的ByteBuf,基於堆還是直接記憶體看具體實現
    CompositeByteBuf compositeBuffer();

    //返回一個給定最大量的基於複合緩衝區的ByteBuf,基於堆還是直接記憶體看具體實現
    CompositeByteBuf compositeBuffer(int maxNumComponents);

    //返回一個基於堆記憶體的CompositeByteBuf
    CompositeByteBuf compositeHeapBuffer();

    //返回一個給定最大量基於堆記憶體的CompositeByteBuf
    CompositeByteBuf compositeHeapBuffer(int maxNumComponents);

    //返回一個基於直接記憶體的CompositeByteBuf
    CompositeByteBuf compositeDirectBuffer();

    //返回一個給定最大量的基於直接記憶體的CompositeByteBuf
    CompositeByteBuf compositeDirectBuffer(int maxNumComponents);

    //直接記憶體是否池化管理
    boolean isDirectBufferPooled();

    //計算bytebuf需要擴充套件時的新容量
    int calculateNewCapacity(int minNewCapacity, int maxCapacity);
 }

可以看到介面中定義的方法基本都是用於分配不同型別的記憶體,接下來我們看下基於ByteBufAllocator 介面的具體實現類。

 看下實現ByteBufAllocator 介面的類結構圖

 

 頂層抽象類AbstractByteBufAllocator下有兩大子類PooledByteBufAllocator與UnpooledByteBufAllocator,分別用於池化與非池化記憶體的構造;

二、ByteBufAllocator 使用

首先我們需要注意下ChannelOption.ALLOCATOR這個配置項,如果不進行特殊配置, 預設為PooledByteBufAllocator,預設ByteBuf型別為PooledUnsafeDirectByteBuf,這裡演示需要改為UnpooledByteBufAllocator

b.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);

在資料入站Handler中,我們通過使用不同的BufAllocator實現類來分配ByteBuf進行對比,主要關注下分配的ByteBuf的型別與是否池化

public class BuffHandler extends ChannelInboundHandlerAdapter{
    
    PooledByteBufAllocator pdallocator = new PooledByteBufAllocator(true);//池化直接記憶體
    
    PooledByteBufAllocator pallocator = new PooledByteBufAllocator(false);//池化堆記憶體
    
    UnpooledByteBufAllocator adllocator = new UnpooledByteBufAllocator(true);//非池化直接記憶體
    
    UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);//非池化堆記憶體
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf outBuffer = (ByteBuf) msg;
        System.err.println("outBuffer is :"+outBuffer.getClass());
        System.err.println("outBuffer's model is :"+outBuffer.isDirect());
        outBuffer = ByteBufAllocator.DEFAULT.buffer();//ByteBufAllocator預設記憶體型別
        System.err.println("ByteBufAllocator.DEFAULT.buffer() is :"+outBuffer.getClass());
        System.err.println("ByteBufAllocator.DEFAULT.buffer()'s model is :"+outBuffer.isDirect());
        outBuffer = pdallocator.buffer();////池化直接記憶體
        System.err.println("PooledByteBufAllocator(true) is :"+outBuffer.getClass());
        System.err.println("PooledByteBufAllocator(true)'s model is :"+outBuffer.isDirect());
        outBuffer = pallocator.buffer();//池化隊堆記憶體
        System.err.println("PooledByteBufAllocator(false) is :"+outBuffer.getClass());
        System.err.println("PooledByteBufAllocator(false)'s model is :"+outBuffer.isDirect());
        outBuffer = adllocator.buffer();////非池化直接記憶體
        System.err.println("UnpooledByteBufAllocator(true) is :"+outBuffer.getClass());
        System.err.println("UnpooledByteBufAllocator(true)'s  model is :"+outBuffer.isDirect());
        outBuffer = allocator.buffer();//非池化堆記憶體
        System.err.println("UnpooledByteBufAllocator(false) is :"+outBuffer.getClass());
        System.err.println("UnpooledByteBufAllocator(false)'s  model is :"+outBuffer.isDirect());
        byte[] sendBytes = new byte[] {0x7E,0x01,0x02,0x7e};
        outBuffer.writeBytes(sendBytes);
        ctx.writeAndFlush(outBuffer);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

根據BufAllocator具體實現類與preferDirect引數會分配不同型別的ByteBuf,輸出結果如下:

outBuffer is :class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
outBuffer's model is :true
ByteBufAllocator.DEFAULT.buffer() is :class io.netty.buffer.PooledUnsafeDirectByteBuf
ByteBufAllocator.DEFAULT.buffer()'s model is :true
PooledByteBufAllocator(true) is :class io.netty.buffer.PooledUnsafeDirectByteBuf
PooledByteBufAllocator(true)'s model is :true
PooledByteBufAllocator(false) is :class io.netty.buffer.PooledUnsafeHeapByteBuf
PooledByteBufAllocator(false)'s model is :false
UnpooledByteBufAllocator(true) is :class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
UnpooledByteBufAllocator(true)'s  model is :true
UnpooledByteBufAllocator(false) is :class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf
UnpooledByteBufAllocator(false)'s  model is :false

 上面示例中,BufAllocator具體實現基本可以分為以下四種,下面我們對四種分配模式的具體實現進行下追蹤與分析

new PooledByteBufAllocator(true);//池化直接記憶體
new PooledByteBufAllocator(false);//池化堆記憶體
new UnpooledByteBufAllocator(true);//非池化直接記憶體
new UnpooledByteBufAllocator(false);//非池化堆記憶體

三、ByteBufAllocator 實現

ByteBufAllocator.DEFAULT.buffer()

這裡使用了 ByteBufUtil   DEFAULT_ALLOCATOR,我們進入ByteBufUtil類內部看下具體實現

    static final ByteBufAllocator DEFAULT_ALLOCATOR;

    static {
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();//讀取io.netty.allocator.type配置

        //根據配置型別例項化不同型別的BufAllocator實現類
        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            //  DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
            alloc = UnpooledByteBufAllocator.DEFAULT; 
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            // DEFAULT = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }

        DEFAULT_ALLOCATOR = alloc;

        THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0);
        logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);

        MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);
        logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
    }

ByteBufUtil 的靜態構造中會根據io.netty.allocator.type配置的不同例項化不同型別的BufAllocator實現類,下面我們看下BufAllocator的兩個具體實現類PooledByteBufAllocator與UnpooledByteBufAllocator

 PooledByteBufAllocator

在PooledByteBufAllocator建構函式中,首先會進行記憶體池的詳細配置

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
        super(preferDirect);
        //宣告一個PoolThreadLocalCache用於記憶體申請
        threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

        checkPositiveOrZero(nHeapArena, "nHeapArena");
        checkPositiveOrZero(nDirectArena, "nDirectArena");

        checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
        if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
            throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
        }

        if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
            throw new IllegalArgumentException("directMemoryCacheAlignment: "
                    + directMemoryCacheAlignment + " (expected: power of two)");
        }

        int pageShifts = validateAndCalculatePageShifts(pageSize);

        if (nHeapArena > 0) {
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                        pageSize, maxOrder, pageShifts, chunkSize,
                        directMemoryCacheAlignment);
                heapArenas[i] = arena;
                metrics.add(arena);
            }
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();
        }

        if (nDirectArena > 0) {
            directArenas = newArenaArray(nDirectArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
            for (int i = 0; i < directArenas.length; i ++) {
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
                directArenas[i] = arena;
                metrics.add(arena);
            }
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        }
        metric = new PooledByteBufAllocatorMetric(this);
    }

 緊接著看下PooledByteBufAllocator具體的記憶體分配方法

newDirectBuffer 分配直接記憶體 
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;//具體的記憶體分配類PoolArena

        final ByteBuf buf;
        if (directArena != null) {//PoolArena就是Netty的記憶體池實現類。實現具體記憶體分配
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }
heapBuffer 分配堆內記憶體 
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<byte[]> heapArena = cache.heapArena;

        final ByteBuf buf;
        if (heapArena != null) {  //通過PoolArena分配堆內記憶體
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

 UnpooledByteBufAllocator

 由於採用非池化管理,UnpooledByteBufAllocator建構函式中需要指定記憶體清理策略

    public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector, boolean tryNoCleaner) {
        super(preferDirect);
        this.disableLeakDetector = disableLeakDetector;
        //記憶體清理策略,預設noCleaner需要使用 unSafe 的 freeMemory 方法釋放記憶體
        //noCleaner 使用 unSafe.freeMemory(address);
        //hasCleaner 使用 DirectByteBuffer 的 Cleaner 的 clean 方法。
        noCleaner = tryNoCleaner && PlatformDependent.hasUnsafe()
                && PlatformDependent.hasDirectBufferNoCleanerConstructor();
    }

 緊接著看下UnpooledByteBufAllocator具體的記憶體分配方法

newDirectBuffer 分配直接記憶體 
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        final ByteBuf buf;
        if (PlatformDependent.hasUnsafe()) {//判斷是否適用Unsafe操作
            buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        return disableLeakDetector ? buf : toLeakAwareBuffer(buf);//是否進行堆外記憶體洩露監控
    }

newHeapBuffer 分配堆內記憶體 
    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        return PlatformDependent.hasUnsafe() ?
                new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

 四、總結

通過以上內容我們梳理了Netty中BufAllocator的具體實現及分配記憶體的型別,從記憶體管理模式上分為池化與非池化,從記憶體分配型別上分為直接記憶體與堆內記憶體,本文我們只是初步對其進行了總結,Netty分配記憶體的具體實現及精巧設計都還未涉及,後續我們會繼續對其進行進一步的探究,希望本文對大家能有所幫助,其中如有不足與不正確的地方還望指出與海涵。

 

   關注微信公眾號,檢視更多技術文章。