1. 程式人生 > >死磕Netty原始碼之ChannelPipeline原始碼解析(一)

死磕Netty原始碼之ChannelPipeline原始碼解析(一)

前言

ChannelPipeline資料管道是ChannelHandler資料處理器的容器,負責ChannelHandler的管理和事件的攔截與排程

原始碼分析

初始化

在Channel初始化時會呼叫到AbstractChannel的構造方法,Pipeline的初始化動作是在AbstractChannel構造方法中完成的

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

接下來我們繼續跟進newChannelPipeline(),探究一下DefaultChannelPipeline底層實現細節

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");

    tail = new
TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }

在newChannelPipeline()中呼叫了DefaultChannelPipeline的構造方法,DefaultChannelPipeline構造方法主要完成了三件事。首先將與之關聯的Channel儲存在屬性Channel中,然後例項化了兩個物件(一個是TailContext例項tail,一個是HeadContext例項head),然後將head和tail相互指向構成了一個雙向連結串列。資料結構如下圖
ChannelPipeline資料結構


DefaultChannelPipeline中的每個節點是一個ChannelHandlerContext物件(一個ChannelPipeline中可以有多個ChannelHandler例項,而每一個ChannelHandler例項與ChannelPipeline之間的橋樑就是ChannelHandlerContext例項)

新增節點

我們在初始化bootstrap的時候應該對下面這段程式碼不陌生,它是將ChannelHandler新增到pipeline的雙向連結串列中去的

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new NettyMessageDecoder(serializer));
        socketChannel.pipeline().addLast(new NettyMessageEncoder(serializer));
    }
})

下面我們來跟下addLast方法

public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }
    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }
    return this;
}

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.禁止非Sharable的handler重複新增到不同的pipeline中
        checkMultiplicity(handler);

        // 2.建立節點 => DefaultChannelHandlerContext
        newCtx = newContext(group, filterName(name, handler), handler);

        // 3.新增節點
        addLast0(newCtx);

        // 如果channel沒有與eventloop繫結
        // 則建立一個任務 這個任務會在channel被register的時候呼叫
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        // 4.回撥使用者方法
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

這裡簡單地用synchronized方法是為了防止多執行緒併發操作pipeline底層的雙向連結串列

檢查是否有重複Handler

檢查是否有重複Handler的實現在checkMultiplicity()方法中

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

ChannelHandlerAdapter使用一個成員變數added標識一個Channel是否已經新增過,如果當前要新增的Handler是非共享並且已經新增過那就丟擲異常,否則標識該Handler已經新增。由此可見一個Handler如果是Sharable的就可以無限次被新增到Pipeline中,我們客戶端程式碼如果要讓一個Handler被共用,只需要加一個@Sharable標註即。而如果Handler是Sharable的一般就通過Spring的注入的方式使用,不需要每次都新建物件

建立節點DefaultChannelHandlerContext

建立節點呼叫了newContext()方法,程式碼如下

newCtx = newContext(group, filterName(name, handler), handler);

這裡我們需要先分析filterName(name, handler)這段程式碼,這個方法用於給handler建立一個唯一性的名字

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        // 1.如果傳入的name為空 則生成
        // Netty生成的name預設為=> 簡單類名#0
        // 如果簡單類名#0已存在則將基數+1 生成name為簡單類名#1 以此遞增
        return generateName(handler);
    }
    // 2.檢查是否有重名 檢查通過則返回
    checkDuplicateName(name);
    return name;
}

檢查是否存在重名的Context,如果已存在重名的Context物件則丟擲異常

private void checkDuplicateName(String name) {
    // 判斷是否已有重名的Context
    if (context0(name) != null) {
        // 如果已存在重名的Context則丟擲異常
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

private AbstractChannelHandlerContext context0(String name) {
    // 從頭結點開始遍歷所有的Context
    // 只要發現某個Context的名字與待新增的name相同 就返回該Context
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}

在處理完name之後就進入到建立context的過程,程式碼如下

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    ...
}

由前面的呼叫鏈得知group為null,因此此處的childExecutor(group)也返回null

DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

在DefaultChannelHandlerContext構造方法中標記了Handler是否是Inboud型別或OutBound型別並呼叫了父類的構造方法,最後儲存了Handler的引用,判斷Handler是Inbound還是OutBound的程式碼如下

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;MessageToMessageCodec
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

呼叫父類建構函式,初始化成員變數

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    // 此處exeutor為null
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // 初次ordered為true
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

如果一個Handler實現了兩類介面,那麼它既是一個inBound型別的Handler,又是一個outBound型別的Handler,如下圖
ChannelDuplexHandler類圖
常用的將decode操作和encode操作合併到一起的codec,一般會繼承MessageToMessageCodec,而MessageToMessageCodec就是繼承ChannelDuplexHandler

新增Context節點到Pipeline的雙向連結串列中

在建立完Context之後呼叫addLast0()將Context新增至Pipeline的雙向連結串列中

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

新增節點其實就是個雙向連結串列的插入操作,具體步驟如下圖所示
Pipeline新增節點過程

操作完畢,該Context就加入到Pipeline中
Pipeline新增節點完成

至此Pipeline新增節點的操作就完成了,其他的addXXX方法原理類似

Context節點新增完畢回撥Handler方法

回撥使用者方法邏輯在callHandlerAdded0()方法中完成,程式碼如下

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    } catch (Throwable t) {
        ...
    }
}

Handler可以重寫handlerAdded方法在Handler被新增成功後 執行部分操作

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 節點被新增完畢之後回撥到此
    }
}

接下來,設定該節點的狀態

final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}

用cas修改節點的狀態至REMOVE_COMPLETE(說明該節點已經被移除)或者ADD_COMPLETE(說明節點已被新增)

刪除節點

Netty有個最大的特性之一就是Handler可插拔做到動態編織Pipeline,比如在首次建立連線的時候需要通過進行許可權認證,在認證通過之後就可以將此Context移除,下次Pipeline在傳播事件的時候就就不會呼叫到許可權認證處理器

下面是許可權認證Handler最簡單的實現,第一個資料包傳來的是認證資訊,如果校驗通過就刪除此Handler,否則直接關閉連線

public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(authDataPacket)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        // ...
    }
}

重點就在ctx.pipeline().remove(this)這段程式碼

public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    return this;
}

remove操作可以分為如下三個步驟

1.找到待刪除的節點
2.調整雙向連結串列指標刪除節點
3.Context節點刪除完畢回撥Handler方法

找到待刪除的節點

private AbstractChannelHandlerContext getContextOrDie(String name) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
    if (ctx == null) {
        throw new NoSuchElementException(name);
    } else {
        return ctx;
    }
}

public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }

        ctx = ctx.next;
    }
}

通過遍歷連結串列方式根據Handler找到對應的Context節點(判斷依據 => Context的Handler和當前Handler相同)

調整雙向連結串列指標刪除節點

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    // 頭結點和為節點不能刪除
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 調整雙向連結串列指標刪除節點
        remove0(ctx);

        // 如果channel沒有與eventloop繫結
        // 則建立一個任務 這個任務會在channel被register的時候呼叫
        if (!registered) {
            callHandlerCallbackLater(ctx, false);
            return ctx;
        }

        // 回撥使用者函式
        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    callHandlerRemoved0(ctx);
    return ctx;
}

刪除節點是通過remove0方法實現的,程式碼如下

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next;
    next.prev = prev;
}

刪除節點其實就是個雙向連結串列的刪除操作,具體步驟如下圖所示
Pipeline刪除節點過程

操作完畢,該Context就從Pipeline中移除
Pipeline刪除節點完成

Context節點刪除完畢回撥Handler方法

回撥使用者方法邏輯在callHandlerRemoved0()方法中完成,程式碼如下

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        try {
            ctx.handler().handlerRemoved(ctx);
        } finally {
            ctx.setRemoved();
        }
    } catch (Throwable t) {
        fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
    }
}

Handler可以重寫handlerRemoved方法在Handler被刪除成功後 執行部分操作

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 節點被刪除完畢之後回撥到此
    }
}

最後將該節點的狀態設定為removed

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}

至此Pipeline新增節點的刪除就完成了,其他的removeXXX方法原理類似