1. 程式人生 > >Netty 進階:write流程全解析

Netty 進階:write流程全解析

1. ChannelOutboundBuffer

1.1 ChannelOutboundBuffer概述

在分析write前,有必要介紹一下ChannelOutboundBuffer,顧名思義該類是用來快取寫向Socket的資料。每個 ChannelSocket 的 Unsafe 都有一個繫結的 ChannelOutboundBuffer , Netty 向站外輸出資料的過程統一通過 ChannelOutboundBuffer 類進行封裝,目的是為了提高網路的吞吐量,在外面呼叫 write 的時候,資料並沒有寫到 Socket,而是寫到了 ChannelOutboundBuffer 這裡,當呼叫 flush 的時候,才真正的向 Socket 寫入 下面看它定義的程式碼:

protected abstract class AbstractUnsafe implements Unsafe {
	private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
}

它被定義在AbstractUnsafe類中,AbstractUnsafe是AbstractChannel的內部類。在ChannelOutboundBuffer的構造方法中傳入了AbstractChannel.this物件,就是把自己傳入。可以猜到ChannelOutboundBuffer內部有一個Channel的屬性。而且每一個Channel例項化時,都會呼叫父類AbstractChannel的建構函式,程式碼如下。

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();//新建unsafe
    pipeline = newChannelPipeline();
}

可見每一個Channel都有一個unsafe屬性,因此可以說每一個Channe都關聯了一個ChannelOutboundBuffer。應用關係如下:

/**
 * 表示箭頭的起點持有終點的應用
 * 
 * Channel → unsafe
 *   ↑         ↓
 * ChannelOutboundBuffer
 *
 */

當我們清楚了這層關係之後,下面來進入ChannelOutboundBuffer內部看看它的具體實現,先看它的幾個重要屬性:

//所繫結的Channel
private final Channel channel;

// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
// 表示要lush到socket的Entry起點
private Entry flushedEntry;
// 表示不flush到socket的Entry起點
private Entry unflushedEntry;
// 連結串列的尾節點
private Entry tailEntry;
// 還可以寫入的個數
private int flushed;
private int nioBufferCount;
private long nioBufferSize;
private boolean inFail;

ChannelOutboundBuffer中維護了節點元素為Entry的單向連結串列。Entry為待發送資料的一層封裝,Entry 裡面包含了待寫出ByteBuf 以及訊息回撥 promise,實際待發送資料儲存在Entry的Object msg中。列表的結果如下圖所示: 在這裡插入圖片描述 看起來像是兩個頭結點的列表,為了flushedEntry是用了虛線。當 addFlush 方法的時候會將 unflushedEntry 賦值給 flushedEntry。表示即將從這裡開始重新整理到socket。

1.2 AddMessage

當儲存資料的結構定義好了,那麼如何新增資料?addMessage方法就是新增資料到此列表。

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

先是將資料封裝為一個Entry物件,第一次呼叫三者都為空,則將新的entry賦值給tailEntry和unflushedEntry,並且將flushedEntry置為空。如果不是第一次呼叫,則將新的entry新增到tailEntry的後面,並且將tailEntry指向新的entry。另外兩個指標不變。最後一行是通過CAS增加buffer的總位元組數。

另外這裡有一個點很重要Entry.newInstance:

static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
    Entry entry = RECYCLER.get();
    entry.msg = msg;
    entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
    entry.total = total;
    entry.promise = promise;
    return entry;
}

Recycler是一個Netty實現的基於執行緒區域性堆疊的輕量級物件池,先不管它內部的實現,只需要知道不是每次新增資料都需要new一個Entry物件,而是通過物件池技術複用,只是取出來之後將資料以及promise賦值給Entry。這樣做的目的不多說。必然能降低開銷。

1.3 addFlush

addMessage的作用是將資料新增到ChannelOutboundBuffer維護的列表中,但是addFlush並不是消費列表中元素,它的作用是確定將要flush的元素。更準確的說是更改flushedEntry和unflushedEntry的值。

 public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }

首先拿到未重新整理的頭節點。將這個 unflushedEntry 賦值給 flushedEntry,迴圈嘗試設定這些節點,能做取消操作了,如果嘗試失敗了,就將這個節點取消。同時將 totalPendingSize相應的減小。設定之後,promise 呼叫 cancel 方法就會返回 false。 至於真正的將資料flush到socket是在具體的SocketChannel中實現。

write&flush

清楚了上面的內容之後,開始分析write,一般我們呼叫channel.write、ctx.write方法,其實都是ChannelOutboundInvoker介面宣告的方法。文件中講的很到位

Request to write a message via this ChannelHandlerContext through the ChannelPipeline. This method will not request to actual flush, so be sure to call flush() once you want to request to flush all pending data to the actual transport.

請求通過ChannelPipeline通過此ChannelHandlerContext寫入訊息。 此方法不會請求實際重新整理,因此一旦要請求將所有待處理資料重新整理到實際傳輸,請務必呼叫flush()。

也就是說ChannelOutboundInvoker.write會呼叫pipeline的write方法,其實在pipeline的文章中提到過,write也是一種可以在pipeline上傳播的出站事件。想必它一定是從tail節點開始,下面看pipeline的write方法:

 @Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

然而tail節點並沒有實現write方法,因此實際呼叫的是它的父類AbstractChannelHandlerContext#write:

 @Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {

    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    write(msg, false, promise);

    return promise;
}

前面是對promise的校驗,接著繼續呼叫過載方法:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

pipeline.touch(msg, next)是關於物件應用計數,實現物件複用,不是本文的重點,繼續往下看。findContextOutbound()返回 的是下一個outboundHandler,最終會流到head節點