1. 程式人生 > >【7】netty4原始碼分析-write

【7】netty4原始碼分析-write

轉自 http://xw-z1985.iteye.com/blog/1970844

Netty的寫操作由兩個步驟組成:

Write:將msg儲存到ChannelOutboundBuffer中
Flush:將msg從ChannelOutboundBuffer中flush到套接字的傳送緩衝區中。
本文介紹第一個步驟write

//DefaultChannelHandlerContext  
public ChannelFuture write(Object msg) {  
        return write(msg, newPromise());  
}   
public ChannelFuture write(final Object msg, final ChannelPromise promise) {  
        if (msg == null) {  
            throw new NullPointerException("msg");  
        }  
        validatePromise(promise, true);  
        write(msg, false, promise);  
        return promise;  
    }  
private void write(Object msg, boolean flush, ChannelPromise promise) {  
        DefaultChannelHandlerContext next = findContextOutbound();  
        EventExecutor executor = next.executor();  
        if (executor.inEventLoop()) {  
            next.invokeWrite(msg, promise);  
            if (flush) {  
                next.invokeFlush();  
            }  
        } else {  
            int size = channel.estimatorHandle().size(msg);  
            if (size > 0) {  
                ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();  
                // Check for null as it may be set to null if the channel is closed already  
                if (buffer != null) {  
                    buffer.incrementPendingOutboundBytes(size, false);  
                }  
            }  
            executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));  
        }  
    }  

Write是一個Outbound事件,所以會呼叫outbound處理器的write方法。下面分析headHandler的write方法。

//HeadHandler          
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  
            unsafe.write(msg, promise);  
        }  

裡面會呼叫AbstractUnsafe的write方法

// AbstractUnsafe  
  public void write(Object msg, ChannelPromise promise) {  
            if (!isActive()) {  
                // Mark the write request as failure if the channel is inactive.  
                if (isOpen()) {  
                    promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);  
                } else {  
                    promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);  
                }  
                // release message now to prevent resource-leak  
                ReferenceCountUtil.release(msg);  
            } else {  
                outboundBuffer.addMessage(msg, promise);  
            }  
        }  

outboundBuffer是AbstractUnsafe使用的一種資料結構ChannelOutboundBuffer,用來儲存待發送的訊息。該資料結構在例項化AbstractUnsafe的同時被初始化:

// ChannelOutboundBuffer  
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {  
        @Override  
        protected ChannelOutboundBuffer newObject(Handle handle) {  
            return new ChannelOutboundBuffer(handle);  
        }  
    };    
  
static ChannelOutboundBuffer newInstance(AbstractChannel channel) {  
        ChannelOutboundBuffer buffer = RECYCLER.get();  
        buffer.channel = channel;  
        buffer.totalPendingSize = 0;  
        buffer.writable = 1;  
        return buffer;  
    }  

ChannelOutboundBuffer的結構如下:

在這裡插入圖片描述

Buffer是用來儲存msg的Entry結構陣列,entry的結構如下:

在這裡插入圖片描述

ChannelOutboundBuffer例項化時,buffer陣列的大小為32,nioBuffers陣列的大小也為32.由於ChannelOutboundBuffer的例項化的代價實際上是很高的,看以下構造方法:

private ChannelOutboundBuffer(Handle handle) {  
        this.handle = handle;  
        buffer = new Entry[INITIAL_CAPACITY];  
        for (int i = 0; i < buffer.length; i++) {  
            buffer[i] = new Entry();  
        }  
        nioBuffers = new ByteBuffer[INITIAL_CAPACITY];  
    }  

所以netty使用基於thread-local的輕量級物件池Recycler對ChannelOutboundBuffer進行回收。當ChannelOutboundBuffer第一次被例項化且使用完畢後,會回收到Recycler中(見下面的recyle方法),下次需要用時,直接從Recycler中取(見下面的get方法),避免了再次例項化和垃圾回收的開銷。

public abstract class Recycler<T> {  
    private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<Stack<T>>() {  
        @Override  
        protected Stack<T> initialValue() {  
            return new Stack<T>(Recycler.this, Thread.currentThread());  
        }  
    };  
    public final T get() {  
        Stack<T> stack = threadLocal.get();  
        T o = stack.pop();  
        if (o == null) {  
            o = newObject(stack);  
        }  
        return o;  
    }  
    public final boolean recycle(T o, Handle handle) {  
        @SuppressWarnings("unchecked")  
        Stack<T> stack = (Stack<T>) handle;  
        if (stack.parent != this) {  
            return false;  
        }  
        if (Thread.currentThread() != stack.thread) {  
            return false;  
        }  
        stack.push(o);  
        return true;  
    }  
    protected abstract T newObject(Handle handle);  
    public interface Handle { }  
    static final class Stack<T> implements Handle {  
        private static final int INITIAL_CAPACITY = 256;  
        final Recycler<T> parent;  
        final Thread thread;  
        private T[] elements;  
        private int size;  
        private final Map<T, Boolean> map = new IdentityHashMap<T, Boolean>(INITIAL_CAPACITY);  
        @SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })  
        Stack(Recycler<T> parent, Thread thread) {  
            this.parent = parent;  
            this.thread = thread;  
            elements = newArray(INITIAL_CAPACITY);  
        }  
        T pop() {  
            int size = this.size;  
            if (size == 0) {  
                return null;  
            }  
            size --;  
            T ret = elements[size];  
            elements[size] = null;  
            map.remove(ret);  
            this.size = size;  
            return ret;  
        }  
  
        void push(T o) {  
            if (map.put(o, Boolean.TRUE) != null) {  
                throw new IllegalStateException("recycled already");  
            }  
  
            int size = this.size;  
            if (size == elements.length) {  
                T[] newElements = newArray(size << 1);  
                System.arraycopy(elements, 0, newElements, 0, size);  
                elements = newElements;  
            }  
  
            elements[size] = o;  
            this.size = size + 1;  
        }  
  
        @SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })  
        private static <T> T[] newArray(int length) {  
            return (T[]) new Object[length];  
        }  
    }  

下面接著分析ChannelOutboundBuffer的addMessage方法。

// ChannelOutboundBuffer  
     void addMessage(Object msg, ChannelPromise promise) {  
        int size = channel.estimatorHandle().size(msg);  
        if (size < 0) {  
            size = 0;  
        }  
        Entry e = buffer[tail++];  
        e.msg = msg;  
        e.pendingSize = size;  
        e.promise = promise;  
        e.total = total(msg);  
  
        tail &= buffer.length - 1;  
  
        if (tail == flushed) {  
            addCapacity();  
        }  
        // increment pending bytes after adding message to the unflushed arrays.  
        // See https://github.com/netty/netty/issues/1619  
        incrementPendingOutboundBytes(size, true);  
    }  

每次都會將msg作為一個Entry儲存到buffer陣列的tail位置,然後將tail自增1,自增後執行這行程式碼tail &= buffer.length – 1(譬如假設length為4,當已儲存3個msg後,tail累加到4,和3執行與的結果得到0,因此下次的訊息又重新儲存到buffer的0位置)使得buffer陣列可以迴圈儲存。如果出現tail=flushed,說明空間不夠,需要將陣列擴容到原來大小的兩倍.

incrementPendingOutboundBytes則會更新totalPendingSize,將其累加本次msg的大小。如果新的totalPendingSize超過了channel的高水位線writeBufferHighWaterMark(預設值為64 * 1024),則觸發ChannelWritabilityChanged事件。(注意:如果網路很繁忙,套接字的傳送緩衝區空間 不夠,導致Msg不能及時從buffer中flush出去,那麼不斷的對channel執行write操作,會使得對陣列不斷地進行兩倍擴容,最終導致OOM。所以最好在自己的Inbound處理器裡捕獲ChannelWritabilityChanged事件,然後呼叫channel的isWritable方法,根據結果來決定是否繼續執行write操作)。

// ChannelOutboundBuffer  
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =  
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");  
  
 void incrementPendingOutboundBytes(int size, boolean fireEvent) {  
        // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets  
        // recycled while process this method.  
        Channel channel = this.channel;  
        if (size == 0 || channel == null) {  
            return;  
        }  
  
        long oldValue = totalPendingSize;  
        long newWriteBufferSize = oldValue + size;  
        while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {  
            oldValue = totalPendingSize;  
            newWriteBufferSize = oldValue + size;  
        }  
  
        int highWaterMark = channel.config().getWriteBufferHighWaterMark();  
  
        if (newWriteBufferSize > highWaterMark) {  
            if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {  
                if (fireEvent) {  
                    channel.pipeline().fireChannelWritabilityChanged();  
                }  
            }  
        }  
    }  

需要注意的是,該方法是執行緒安全的,採用了一個技巧,使用AtomicLongFieldUpdater來對totalPendingSize進行更新,實現CAS的效果,達到併發安全讀寫。相對於synchronized同步,AtomicLongFieldUpdater的開銷是比較小的。

總結可以借鑑的幾個點:

1、輕量級物件池的使用

2、buffer陣列的迴圈儲存

3、ChannelWritabilityChanged事件的觸發

4、AtomicLongFieldUpdater的使用