1. 程式人生 > >Netty 零拷貝(三)Netty 對零拷貝的改進

Netty 零拷貝(三)Netty 對零拷貝的改進

Netty 零拷貝(三)Netty 對零拷貝的改進

Netty 系列目錄 (https://www.cnblogs.com/binarylei/p/10117436.html)

Netty 的“零拷貝”主要體現以下幾個方面:

  1. Netty 的接收和傳送 ByteBuffer 採用 DIRECT BUFFERS,使用堆外直接記憶體進行 Socket 讀寫,不需要進行位元組緩衝區的二次拷貝。如果使用傳統的堆記憶體(HEAP BUFFERS)進行 Socket 讀寫,JVM 會將堆記憶體 Buffer 拷貝一份到直接記憶體中,然後才寫入 Socket 中。相比於堆外直接記憶體,訊息在傳送過程中多了一次緩衝區的記憶體拷貝。

  2. Netty 提供了 CompositeByteBuf 類,它可以將多個 ByteBuf 合併為一個邏輯上的 ByteBuf,避免了傳統通過記憶體拷貝的方式將幾個小 Buffer 合併成一個大的 Buffer。

  3. 通過 FileRegion 包裝的 FileChannel.tranferTo 方法 實現檔案傳輸,可以直接將檔案緩衝區的資料傳送到目標 Channel,避免了傳統通過迴圈 write 方式導致的記憶體拷貝問題。

  4. 通過 wrap 操作,我們可以將 byte[] 陣列、ByteBuf、ByteBuffer 等包裝成一個 Netty ByteBuf 物件,進而避免了拷貝操作

一、直接緩衝區的應用

Netty 中使用直接緩衝區來讀寫資料,首先看一下 read 方法中緩衝區的建立方法。

// AbstractNioByteChannel.NioByteUnsafe#read
public final void read() {
    final ChannelConfig config = config();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    ByteBuf byteBuf = allocHandle.allocate(allocator);
    // 省略...
}

Netty 中接收緩衝區 ByteBuffer 由 ChannelConfig 分配,而 ChannelConfig 建立 ByteBufAllocator 預設使用 Direct Buffer,這就避免了讀寫資料的二次記憶體拷貝問題,從而實現了讀寫 Socket 的零拷貝功能。

// ByteBufAllocator 用於分配緩衝區
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

// RecvByteBufAllocator 可伸縮的緩衝區,根據每次接收的資料大小自動分配緩衝區大小
private volatile RecvByteBufAllocator rcvBufAllocator;
public DefaultChannelConfig(Channel channel) {
    this(channel, new AdaptiveRecvByteBufAllocator());
}

AdaptiveRecvByteBufAllocator 繼承自 DefaultMaxMessagesRecvByteBufAllocator,在 allocate 分配緩衝區。

// DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.allocate
public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(guess());
}

// AbstractByteBufAllocator
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

PlatformDependent.hasUnsafe() 通過判斷能否載入到 sun.misc.Unsafe 類就使用直接緩衝區,正常情況下返回 true

private static final boolean HAS_UNSAFE = hasUnsafe0();
public static boolean hasUnsafe() {
    return HAS_UNSAFE;
}

// 預設如果能通過反射獲取 sun.misc.Unsafe 例項則使用直接緩衝區,因為直接緩衝區底層就是使用 Unsafe 類
private static boolean hasUnsafe0() {
    // 1. 載入 android.app.Application 類則返回 true
    if (isAndroid()) {
        return false;
    }
    
    // 2. -Dio.netty.noUnsafe=true
    if (PlatformDependent0.isExplicitNoUnsafe()) {
        return false;
    }

    // 3. 通過反射獲取 sun.misc.Unsafe 類。Unsafe.class.getDeclaredField("theUnsafe")
    try {
        boolean hasUnsafe = PlatformDependent0.hasUnsafe();
        logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
        return hasUnsafe;
    } catch (Throwable t) {
        logger.trace("Could not determine if Unsafe is available", t);
        return false;
    }
}

我們再分析一直 ByteBufAllocator allocator = ByteBufAllocator.DEFAULT 中的預設 ByteBufAllocator。

ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

// ByteBufUtil,主要判斷是否將記憶體池化,因為直接緩衝區的分配和銷燬開銷比較大
static final ByteBufAllocator DEFAULT_ALLOCATOR;
static {
    String allocType = SystemPropertyUtil.get(
            "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
    allocType = allocType.toLowerCase(Locale.US).trim();

    ByteBufAllocator alloc;
    if ("unpooled".equals(allocType)) {
        alloc = UnpooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else if ("pooled".equals(allocType)) {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
    }

    DEFAULT_ALLOCATOR = alloc;
}

// 除了 HAS_UNSAFE 外還需要判斷 io.netty.noPreferDirect 屬性
public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());            
public static final UnpooledByteBufAllocator DEFAULT =
        new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
public static boolean directBufferPreferred() {
    return DIRECT_BUFFER_PREFERRED;
}
private static final boolean DIRECT_BUFFER_PREFERRED =
        HAS_UNSAFE && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);

二、CompositeByteBuf

//定義兩個ByteBuf型別的 body 和 header 
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(true, header, body);

addComponents 方法將 header 與 body 合併為一個邏輯上的 ByteBuf, 這兩個 ByteBuf 在 CompositeByteBuf 內部都是單獨存在的, CompositeByteBuf 只是邏輯上是一個整體。

三、通過 FileRegion 實現零拷貝

利用 nio 提供的 transferTo 實現零拷貝。

srcFileChannel.transferTo(position, count, destFileChannel);

四、通過 wrap / slice 實現零拷貝

// wrappedBuffer 和 slice 都是共享同一記憶體,並沒有拷貝
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);

ByteBuf header = byteBuf.slice(0, 5);
ByteBuf body = byteBuf.slice(5, 10);

每天用心記錄一點點。內容也許不重要,但習慣很重要!