1. 程式人生 > >Netty原始碼分析第5章(ByteBuf)---->第10節: SocketChannel讀取資料過程

Netty原始碼分析第5章(ByteBuf)---->第10節: SocketChannel讀取資料過程

 

Netty原始碼分析第五章: ByteBuf

 

第十節: SocketChannel讀取資料過程

我們第三章分析過客戶端接入的流程, 這一小節帶大家剖析客戶端傳送資料, Server讀取資料的流程:

首先溫馨提示, 這一小節高度耦合第三章的第1, 2節的內容, 很多知識這裡並不會重複講解, 如果對之前的知識印象不深刻建議惡補第三章的第1, 2節的內容之後再學習這一小節

我們首先看NioEventLoop的processSelectedKey方法:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    
//獲取到channel中的unsafe final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //如果這個key不是合法的, 說明這個channel可能有問題 if (!k.isValid()) { //程式碼省略 } try { //如果是合法的, 拿到key的io事件 int readyOps = k.readyOps(); //連結事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //寫事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //讀事件和接受連結事件 //如果當前NioEventLoop是work執行緒的話, 這裡就是op_read事件
//如果是當前NioEventLoop是boss執行緒的話, 這裡就是op_accept事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }

 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 這裡的判斷表示輪詢到大事件是op_read或者op_accept事件

之前的章節分析過, 如果當前NioEventLoop是work執行緒的話, 那麼這裡就是op_read事件, 也就是讀事件, 表示客戶端發來了資料流

這裡會呼叫unsafe的redis()方法進行讀取

如果是work執行緒, 那麼這裡的channel是NioServerSocketChannel, 其繫結的unsafe是NioByteUnsafe, 這裡會走進NioByteUnsafe的read()方法中:

public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

首先獲取SocketChannel的config, pipeline等相關屬性

 final ByteBufAllocator allocator = config.getAllocator(); 這一步是獲取一個ByteBuf的記憶體分配器, 用於分配ByteBuf

這裡會走到DefaultChannelConfig的getAllocator方法中:

public ByteBufAllocator getAllocator() {
    return allocator;
}

這裡返回的DefualtChannelConfig的成員變數, 我們看這個成員變數:

private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

這裡呼叫ByteBufAllocator的屬性DEFAULT, 跟進去:

ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

我們看到這裡又呼叫了ByteBufUtil的靜態屬性DEFAULT_ALLOCATOR, 再跟進去:

static final ByteBufAllocator DEFAULT_ALLOCATOR;

DEFAULT_ALLOCATOR這個屬性是在static塊中初始化的

我們跟到static塊中:

static {
    String allocType = SystemPropertyUtil.get(
            "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
    allocType = allocType.toLowerCase(Locale.US).trim();

    ByteBufAllocator alloc;
    if ("unpooled".equals(allocType)) {
        alloc = UnpooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else if ("pooled".equals(allocType)) {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
    }
    DEFAULT_ALLOCATOR = alloc;
    //程式碼省略
}

首先判斷執行環境是不是安卓, 如果不是安卓, 在返回"pooled"字串儲存在allocType中

然後通過if判斷, 最後區域性變數alloc = PooledByteBufAllocator.DEFAULT, 最後將alloc賦值到成員變數DEFAULT_ALLOCATOR

我們跟到PooledByteBufAllocator的DEFAULT屬性中:

public static final PooledByteBufAllocator DEFAULT =
        new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

我們看到這裡直接通過new的方式, 建立了一個PooledByteBufAllocator物件, 也就是基於申請一塊連續記憶體進行緩衝區分配的緩衝區分配器

緩衝區分配器的知識, 我們之前小節進行了詳細的剖析, 這裡就不再贅述

回到NioByteUnsafe的read()方法中:

public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

這裡 ByteBufAllocator allocator = config.getAllocator()中的allocator , 就是PooledByteBufAllocator

 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle()  是建立一個handle, 我們之前的章節講過, handle是對RecvByteBufAllocator進行實際操作的物件

我們跟進recvBufAllocHandle:

public RecvByteBufAllocator.Handle recvBufAllocHandle() {
    //如果不存在, 則建立一個handle的例項
    if (recvHandle == null) {
        recvHandle = config().getRecvByteBufAllocator().newHandle();
    }
    return recvHandle;
}

這裡是我們之前剖析過的邏輯, 如果不存在, 則建立handle的例項, 具體建立過程我們可以回顧第三章的第二小節, 這裡就不再贅述

同樣allocHandle.reset(config)是將配置重置, 第三章的第二小節也對其進行過剖析

重置完配置之後, 進行do-while迴圈, 有關迴圈終止條件allocHandle.continueReading(), 之前小節也有過詳細剖析, 這裡也不再贅述

在do-while迴圈中, 首先看 byteBuf = allocHandle.allocate(allocator) 這一步, 這裡傳入了剛才建立的allocate物件, 也就是PooledByteBufAllocator:

這裡會跑到DefaultMaxMessagesRecvByteBufAllocator類的allocate方法中:

public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(guess());
}

這裡的guess方法, 會呼叫AdaptiveRecvByteBufAllocator的guess方法:

public int guess() {
    return nextReceiveBufferSize;
}

這裡會返回AdaptiveRecvByteBufAllocator的成員變數nextReceiveBufferSize, 也就是下次所分配緩衝區的大小, 根據我們之前學習的內容, 第一次分配的時候會分配初始大小, 也就是1024位元組

回到DefaultMaxMessagesRecvByteBufAllocator類的allocate方法中:

這樣, alloc.ioBuffer(guess())就會分配一個PooledByteBuf

我們跟到AbstractByteBufAllocator的ioBuffer方法中:

public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

這裡首先判斷是否能獲取jdk的unsafe物件, 預設為true, 所以會走到directBuffer(initialCapacity)中, 這裡最終會分配一個PooledUnsafeDirectByteBuf物件, 具體分配流程我們再之前小節做過詳細剖析

回到NioByteUnsafe的read()方法中:

分配完了ByteBuf之後, 再看這一步allocHandle.lastBytesRead(doReadBytes(byteBuf)):

首先看引數doReadBytes(byteBuf)方法, 這步是將channel中的資料讀取到我們剛分配的ByteBuf中, 並返回讀取到的位元組數

這裡會呼叫到NioSocketChannel的doReadBytes方法:

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

首先拿到繫結在channel中的handler, 因為我們已經建立了handle, 所以這裡會直接拿到

再看allocHandle.attemptedBytesRead(byteBuf.writableBytes())這步, byteBuf.writableBytes()返回byteBuf的可寫位元組數, 也就是最多能從channel中讀取多少位元組寫到ByteBuf, allocate的attemptedBytesRead會把可寫位元組數設定到DefaultMaxMessagesRecvByteBufAllocator 類的attemptedBytesRead屬性中

跟到DefaultMaxMessagesRecvByteBufAllocator中的attemptedBytesRead我們會看到:

public void attemptedBytesRead(int bytes) {
    attemptedBytesRead = bytes;
}

 

繼續看doReadBytes方法:

最後, 通過byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead())將jdk底層的channel中的資料寫入到我們建立的ByteBuf中, 並返回實際寫入的位元組數

回到NioByteUnsafe的read()方法中:

繼續看allocHandle.lastBytesRead(doReadBytes(byteBuf))這步

剛才我們剖析過doReadBytes(byteBuf)返回的是世界寫入ByteBuf的位元組數

再看lastBytesRead方法, 跟到DefaultMaxMessagesRecvByteBufAllocator的lastBytesRead方法中:

public final void lastBytesRead(int bytes) {
    lastBytesRead = bytes;
    totalBytesRead += bytes;
    if (totalBytesRead < 0) {
        totalBytesRead = Integer.MAX_VALUE;
    }
}

這裡會賦值兩個屬性, lastBytesRead代表最後讀取的位元組數, 這裡賦值為我們剛才寫入ByteBuf的位元組數, totalBytesRead表示總共讀取的位元組數, 這裡將寫入的位元組數追加

繼續看NioByteUnsafe的read()方法:

如果最後一次讀取資料為0, 說明已經將channel中的資料全部讀取完畢, 將新建立的ByteBuf釋放迴圈利用, 並跳出迴圈

allocHandle.incMessagesRead(1)這步是增加訊息的讀取次數, 因為我們迴圈最多16次, 所以當增加訊息次數增加到16會結束迴圈

讀取完畢之後, 會通過pipeline.fireChannelRead(byteBuf)將傳遞channelRead事件, 有關channelRead事件, 我們在第四章也進行了詳細的剖析

這裡讀者會有疑問, 如果一次讀取不完, 就傳遞channelRead事件, 那麼server接收到的資料有可能就是不完整的, 其實關於這點, netty也做了相應的處理, 我們會在之後的章節詳細剖析netty的半包處理機制

 

迴圈結束後, 會執行到allocHandle.readComplete()這一步

我們知道第一次分配ByteBuf的初始容量是1024, 但是初始容量不一定一定滿足所有的業務場景, netty中, 將每次讀取資料的位元組數進行記錄, 然後之後次分配ByteBuf的時候, 容量會盡可能的符合業務場景所需要大小, 具體實現方式, 就是在readComplete()這一步體現的

我們跟到AdaptiveRecvByteBufAllocator的readComplete()方法中:

public void readComplete() {
    record(totalBytesRead());
}

這裡呼叫了record方法, 並且傳入了這一次所讀取的位元組總數

跟到record方法中:

private void record(int actualReadBytes) { 
    if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { 
        if (decreaseNow) { 
            index = Math.max(index - INDEX_DECREMENT, minIndex); 
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) { 
        index = Math.min(index + INDEX_INCREMENT, maxIndex); 
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

首先看判斷條件 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) 

這裡index是當前分配的緩衝區大小所在的SIZE_TABLE中的索引, 將這個索引進行縮排, 然後根據縮排後的所以找出SIZE_TABLE中所儲存的記憶體值, 再判斷是否大於等於這次讀取的最大位元組數, 如果條件成立, 說明分配的記憶體過大, 需要縮容操作, 我們看if塊中縮容相關的邏輯

首先 if (decreaseNow) 會判斷是否立刻進行收縮操作, 通常第一次不會進行收縮操作, 然後會將decreaseNow設定為true, 代表下一次直接進行收縮操作

假設需要立刻進行收縮操作, 我們看收縮操作的相關邏輯:

 index = Math.max(index - INDEX_DECREMENT, minIndex) 這一步將索引縮排一步, 但不能小於最小索引值

然後通過 nextReceiveBufferSize = SIZE_TABLE[index] 獲取設定索引之後的記憶體, 賦值在nextReceiveBufferSize, 也就是下次需要分配的大小, 下次就會根據這個大小分配ByteBuf了, 這樣就實現了縮容操作

再看 else if (actualReadBytes >= nextReceiveBufferSize) 

這裡判斷這次讀取位元組的總量比上次分配的大小還要大, 則進行擴容操作

擴容操作也很簡單, 索引步進, 然後拿到步進後的索引所對應的記憶體值, 作為下次所需要分配的大小

NioByteUnsafe的read()方法中:

經過了縮容或者擴容操作之後, 通過pipeline.fireChannelReadComplete()傳播ChannelReadComplete()事件

以上就是讀取客戶端訊息的相關流程

 

 

第五章總結

 

        本章主要剖析了ByteBuf的基本操作以及緩衝區分配等相關知識.

 

        緩衝區分配, 分為通過呼叫jdk的api的方式和分配一塊連續記憶體的方式

 

        其中, 通過分配連續記憶體的方式分配緩衝區中, 又介紹了在page級別分配的邏輯和在subpage級別分配的邏輯

 

        page級別分配時通過操作記憶體二叉樹的方式記錄分配情況

 

        subpage級別分配是通過點陣圖的方式記錄分配情況

 

        最後介紹了NioSocketChannel處理讀事件的相關邏輯

 

        總體來說, 這一章的內容難度是比較大的, 希望同學課後通過多除錯的方式進行熟練掌握