1. 程式人生 > >Netty原始碼分析 (八)----- write過程 原始碼分析

Netty原始碼分析 (八)----- write過程 原始碼分析

上一篇文章主要講了netty的read過程,本文主要分析一下write和writeAndFlush。

主要內容

本文分以下幾個部分闡述一個java物件最後是如何轉變成位元組流,寫到socket緩衝區中去的

  1. pipeline中的標準連結串列結構
  2. java物件編碼過程
  3. write:寫佇列
  4. flush:重新整理寫佇列
  5. writeAndFlush: 寫佇列並重新整理

pipeline中的標準連結串列結構

一個標準的pipeline鏈式結構如下

資料從head節點流入,先拆包,然後解碼成業務物件,最後經過業務Handler處理,呼叫write,將結果物件寫出去。而寫的過程先通過tail節點,然後通過encoder節點將物件編碼成ByteBuf,最後將該ByteBuf物件傳遞到head節點,呼叫底層的Unsafe寫到jdk底層管道

java物件編碼過程

為什麼我們在pipeline中添加了encoder節點,java物件就轉換成netty可以處理的ByteBuf,寫到管道里?

我們先看下呼叫write的code

BusinessHandler

protected void channelRead0(ChannelHandlerContext ctx, Request request) throws Exception {
    Response response = doBusiness(request);

    if (response != null) {
        ctx.channel().write(response);
    }
}

業務處理器接受到請求之後,做一些業務處理,返回一個Response,然後,response在pipeline中傳遞,落到 Encoder節點,我們來跟蹤一下 ctx.channel().write(response);

public ChannelFuture write(Object msg) {
    return this.pipeline.write(msg);
}

呼叫了Channel中的pipeline中的write方法,我們接著看

public final ChannelFuture write(Object msg) {
    return this.tail.write(msg);
}

pipeline中有屬性tail,呼叫tail中的write,由此我們知道write訊息的時候,從tail開始,接著往下看

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = this.findContextOutbound();
    Object m = this.pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        Object task;
        if (flush) {
            task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
        }

        safeExecute(executor, (Runnable)task, promise, m);
    }

}

中間我省略了幾個過載的方法,我們來看看第一行程式碼,next = this.findContextOutbound();

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;

    do {
        ctx = ctx.prev;
    } while(!ctx.outbound);

    return ctx;
}

通過 ctx = ctx.prev; 我們知道從tail開始找到pipeline中的第一個outbound的handler,然後呼叫 invokeWrite(m, promise),此時找到的第一個outbound的handler就是我們自定義的編碼器Encoder

我們接著看 next.invokeWrite(m, promise);

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (this.invokeHandler()) {
        this.invokeWrite0(msg, promise);
    } else {
        this.write(msg, promise);
    }

}
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
    } catch (Throwable var4) {
        notifyOutboundHandlerException(var4, promise);
    }

}

一路程式碼跟下來,我們可以知道是呼叫了第一個outBound型別的handler中的write方法,也就是第一個呼叫的是我們自定義編碼器Encoder的write方法

我們來看看自定義Encoder

public class Encoder extends MessageToByteEncoder<Response> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {
        out.writeByte(response.getVersion());
        out.writeInt(4 + response.getData().length);
        out.writeBytes(response.getData());
    }
}

自定義Encoder繼承 MessageToByteEncoder ,並且重寫了 encode方法,這就是編碼器的核心,我們先來看 MessageToByteEncoder

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {

我們看到 MessageToByteEncoder 繼承了 ChannelOutboundHandlerAdapter,說明了 Encoder 是一個 Outbound的handler

我們來看看 Encoder 的父類 MessageToByteEncoder中的write方法

MessageToByteEncoder

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // 判斷當前Handelr是否能處理寫入的訊息
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            // 強制換換
            I cast = (I) msg;
            // 分配一段ButeBuf
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
            // 呼叫encode,這裡就調回到  `Encoder` 這個Handelr中    
                encode(ctx, cast, buf);
            } finally {
                // 既然自定義java物件轉換成ByteBuf了,那麼這個物件就已經無用了,釋放掉
                // (當傳入的msg型別是ByteBuf的時候,就不需要自己手動釋放了)
                ReferenceCountUtil.release(cast);
            }
            // 如果buf中寫入了資料,就把buf傳到下一個節點
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
            // 否則,釋放buf,將空資料傳到下一個節點    
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            // 如果當前節點不能處理傳入的物件,直接扔給下一個節點處理
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        // 當buf在pipeline中處理完之後,釋放
        if (buf != null) {
            buf.release();
        }
    }
}

這裡,我們詳細闡述一下Encoder是如何處理傳入的java物件的

1.判斷當前Handler是否能處理寫入的訊息,如果能處理,進入下面的流程,否則,直接扔給下一個節點處理
2.將物件強制轉換成Encoder可以處理的 Response物件
3.分配一個ByteBuf
4.呼叫encoder,即進入到 Encoderencode方法,該方法是使用者程式碼,使用者將資料寫入ByteBuf
5.既然自定義java物件轉換成ByteBuf了,那麼這個物件就已經無用了,釋放掉,(當傳入的msg型別是ByteBuf的時候,就不需要自己手動釋放了)
6.如果buf中寫入了資料,就把buf傳到下一個節點,否則,釋放buf,將空資料傳到下一個節點
7.最後,當buf在pipeline中處理完之後,釋放節點

總結一點就是,Encoder節點分配一個ByteBuf,呼叫encode方法,將java物件根據自定義協議寫入到ByteBuf,然後再把ByteBuf傳入到下一個節點,在我們的例子中,最終會傳入到head節點,因為head節點是一個OutBount型別的handler

HeadContext

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

這裡的msg就是前面在Encoder節點中,載有java物件資料的自定義ByteBuf物件,進入下一節

write:寫佇列

我們來看看channel中unsafe的write方法,先來看看其中的一個屬性

AbstractUnsafe

protected abstract class AbstractUnsafe implements Unsafe {
    private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);

我們來看看 ChannelOutboundBuffer 這個類

public final class ChannelOutboundBuffer {
    private final Channel channel;
    private ChannelOutboundBuffer.Entry flushedEntry;
    private ChannelOutboundBuffer.Entry unflushedEntry;
    private ChannelOutboundBuffer.Entry tailEntry;

ChannelOutboundBuffer內部維護了一個Entry連結串列,並使用Entry封裝msg。其中的屬性我們下面會詳細講

我們回到正題,接著看 unsafe.write(msg, promise);

AbstractUnsafe

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

1.呼叫 filterOutboundMessage() 方法,將待寫入的物件過濾,把非ByteBuf物件和FileRegion過濾,把所有的非直接記憶體轉換成直接記憶體DirectBuffer

@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }

        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        return msg;
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

2.接下來,估算出需要寫入的ByteBuf的size
3.最後,呼叫 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下來,我們需要重點看一下這個方法幹了什麼事情

ChannelOutboundBuffer

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // 建立一個待寫出的訊息節點
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    incrementPendingOutboundBytes(size, false);
}

想要理解上面這段程式碼,必須得掌握寫快取中的幾個訊息指標,如下圖

ChannelOutboundBuffer 裡面的資料結構是一個單鏈表結構,每個節點是一個 EntryEntry 裡面包含了待寫出ByteBuf 以及訊息回撥 promise,下面分別是三個指標的作用

1.flushedEntry 指標表示第一個被寫到作業系統Socket緩衝區中的節點
2.unFlushedEntry 指標表示第一個未被寫入到作業系統Socket緩衝區中的節點
3.tailEntry指標表示ChannelOutboundBuffer緩衝區的最後一個節點

初次呼叫 addMessage 之後,各個指標的情況為

fushedEntry指向空,unFushedEntry和 tailEntry 都指向新加入的節點

第二次呼叫 addMessage之後,各個指標的情況為

第n次呼叫 addMessage之後,各個指標的情況為

可以看到,呼叫n次addMessage,flushedEntry指標一直指向NULL,表示現在還未有節點需要寫出到Socket緩衝區,而unFushedEntry之後有n個節點,表示當前還有n個節點尚未寫出到Socket緩衝區中去

flush:重新整理寫佇列

不管呼叫channel.flush(),還是ctx.flush(),最終都會落地到pipeline中的head節點

HeadContext

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

之後進入到AbstractUnsafe

AbstractUnsafe

public final void flush() {
   assertEventLoop();

   ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
   if (outboundBuffer == null) {
       return;
   }

   outboundBuffer.addFlush();
   flush0();
}

flush方法中,先呼叫 outboundBuffer.addFlush();

ChannelOutboundBuffer

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}

可以結合前面的圖來看,首先拿到 unflushedEntry 指標,然後將 flushedEntry 指向unflushedEntry所指向的節點,呼叫完畢之後,三個指標的情況如下所示

 

相當於所有的節點都即將開始推送出去

接下來,呼叫 flush0();

AbstractUnsafe

protected void flush0() {
    doWrite(outboundBuffer);
}

發現這裡的核心程式碼就一個 doWrite,繼續跟

AbstractNioByteChannel

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    boolean setOpWrite = false;
    for (;;) {
        // 拿到第一個需要flush的節點的資料
        Object msg = in.current();

        if (msg instanceof ByteBuf) {
            // 強轉為ByteBuf,若發現沒有資料可讀,直接刪除該節點
            ByteBuf buf = (ByteBuf) msg;

            boolean done = false;
            long flushedAmount = 0;
            // 拿到自旋鎖迭代次數
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            // 自旋,將當前節點寫出
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            // 寫完之後,將當前節點刪除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}

這裡略微有點複雜,我們分析一下

1.第一步,呼叫current()先拿到第一個需要flush的節點的資料

 ChannelOutBoundBuffer

public Object current() {
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;
    }

    return entry.msg;
}

2.第二步,拿到自旋鎖的迭代次數

if (writeSpinCount == -1) {
    writeSpinCount = config().getWriteSpinCount();
}

3.自旋的方式將ByteBuf寫出到jdk nio的Channel

for (int i = writeSpinCount - 1; i >= 0; i --) {
    int localFlushedAmount = doWriteBytes(buf);
    if (localFlushedAmount == 0) {
        setOpWrite = true;
        break;
    }

    flushedAmount += localFlushedAmount;
    if (!buf.isReadable()) {
        done = true;
        break;
    }
}

doWriteBytes 方法跟進去

protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

我們發現,出現了 javaChannel(),表明已經進入到了jdk nio Channel的領域,我們來看看 buf.readBytes(javaChannel(), expectedWrittenBytes);

public int readBytes(GatheringByteChannel out, int length) throws IOException {
    this.checkReadableBytes(length);
    int readBytes = this.getBytes(this.readerIndex, out, length);
    this.readerIndex += readBytes;
    return readBytes;
}

我們來看關鍵程式碼 this.getBytes(this.readerIndex, out, length)

private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
    this.checkIndex(index, length);
    if (length == 0) {
        return 0;
    } else {
        ByteBuffer tmpBuf;
        if (internal) {
            tmpBuf = this.internalNioBuffer();
        } else {
            tmpBuf = ((ByteBuffer)this.memory).duplicate();
        }

        index = this.idx(index);
        tmpBuf.clear().position(index).limit(index + length);
        //將tmpBuf中的資料寫到out中
        return out.write(tmpBuf);
    }
}

我們來看看out.write(tmpBuf)

public int write(ByteBuffer src) throws IOException {
    ensureOpen();
    if (!writable)
        throw new NonWritableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.write(fd, src, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

和read實現一樣,SocketChannelImpl的write方法通過IOUtil的write實現:關鍵程式碼 n = IOUtil.write(fd, src, -1, nd);

static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    //如果是DirectBuffer,直接寫,將堆外快取中的資料拷貝到核心快取中進行傳送
    if (var1 instanceof DirectBuffer) {
        return writeFromNativeBuffer(var0, var1, var2, var4);
    } else {
        //非DirectBuffer
        //獲取已經讀取到的位置
        int var5 = var1.position();
        //獲取可以讀到的位置
        int var6 = var1.limit();

        assert var5 <= var6;
        //申請一個原buffer可讀大小的DirectByteBuffer
        int var7 = var5 <= var6 ? var6 - var5 : 0;
        ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);

        int var10;
        try {

            var8.put(var1);
            var8.flip();
            var1.position(var5);
            //通過DirectBuffer寫,將堆外快取的資料拷貝到核心快取中進行傳送
            int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
            if (var9 > 0) {
                var1.position(var5 + var9);
            }

            var10 = var9;
        } finally {
            //回收分配的DirectByteBuffer
            Util.offerFirstTemporaryDirectBuffer(var8);
        }

        return var10;
    }
}

程式碼邏輯我們就不再講了,程式碼註釋已經很清楚了,這裡我們關注一點,我們可以看看我們前面的一個方法 filterOutboundMessage(),將待寫入的物件過濾,把非ByteBuf物件和FileRegion過濾,把所有的非直接記憶體轉換成直接記憶體DirectBuffer

說明到了這一步所有的 var1 意境是直接記憶體DirectBuffer,就不需要走到else,就不需要write兩次了

4.刪除該節點

節點的資料已經寫入完畢,接下來就需要刪除該節點

ChannelOutBoundBuffer

public boolean remove() {
    Entry e = flushedEntry;
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    removeEntry(e);

    if (!e.cancelled) {
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
    }

    // recycle the entry
    e.recycle();

    return true;
}

首先拿到當前被flush掉的節點(flushedEntry所指),然後拿到該節點的回撥物件 ChannelPromise, 呼叫 removeEntry()方法移除該節點

private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        flushedEntry = null;
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        flushedEntry = e.next;
    }
}

這裡的remove是邏輯移除,只是將flushedEntry指標移到下個節點,呼叫完畢之後,節點圖示如下

writeAndFlush: 寫佇列並重新整理

理解了write和flush這兩個過程,writeAndFlush 也就不難了

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);

    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } 
}

可以看到,最終,通過一個boolean變數,表示是呼叫 invokeWriteAndFlush,還是 invokeWriteinvokeWrite便是我們上文中的write過程

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    invokeWrite0(msg, promise);
    invokeFlush0();
}

可以看到,最終呼叫的底層方法和單獨呼叫 write 和 flush 是一樣的

private void invokeWrite(Object msg, ChannelPromise promise) {
        invokeWrite0(msg, promise);
}

private void invokeFlush(Object msg, ChannelPromise promise) {
        invokeFlush0(msg, promise);
}

由此看來,invokeWriteAndFlush基本等價於write方法之後再來一次flush

總結

1.pipeline中的編碼器原理是建立一個ByteBuf,將java物件轉換為ByteBuf,然後再把ByteBuf繼續向前傳遞
2.呼叫write方法並沒有將資料寫到Socket緩衝區中,而是寫到了一個單向連結串列的資料結構中,flush才是真正的寫出
3.writeAndFlush等價於先將資料寫到netty的緩衝區,再將netty緩衝區中的資料寫到Socket緩衝區中,寫的過程與併發程式設計類似,用自旋鎖保證寫成功
4.netty中的緩衝區中的ByteBuf為DirectByteBuf