1. 程式人生 > >Netty原始碼分析之ChannelPipeline(二)—ChannelHandler的新增與刪除

Netty原始碼分析之ChannelPipeline(二)—ChannelHandler的新增與刪除

上篇文章中,我們對Netty中ChannelPipeline的構造與初始化進行了分析與總結,本篇文章我們將對ChannelHandler的新增與刪除操作進行具體的的程式碼分析;

一、ChannelHandler的新增

下面是Netty官方的一段demo原始碼,可以看到在服務端初始化時執行了向ChannelPipeline中新增自定義channelHandler的操作。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc()));
                            }
                            // p.addLast(new LoggingHandler(LogLevel.INFO));
                            // 向ChannelPipeline中新增自定義channelHandler
                            p.addLast(serverHandler);
                        }
                    });

我們可以看到上面的程式碼中呼叫ChannelPipeline的addLast方法實現了channelHandler的新增,下面我們就分析下addLast方法的具體原始碼實現

首先看下addLast方方法的具體原始碼實現

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //判斷handler是否被重複新增
            checkMultiplicity(handler);

            //建立ChannelHandlerContext節點  filterName檢查名稱是否重複
            newCtx = newContext(group, filterName(name, handler), handler);

            //雙向連結串列中增加ChannelHandlerContext
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {//判斷是否在同一執行緒中
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

分析addLast方法程式碼可以看到,ChannelHandler的新增基本可以分為四步

1、驗證ChannelHandler是否重複新增

我們看下checkMultiplicity方法的具體實現

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {//如果該handler非共享且已經被新增
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;//新增過之後,修改標識
        }
    }

2、建立一個HandlerContext物件

我們之前說過netty會把一個channelhandler封裝成一個ChannelHandlerContext物件,如下面程式碼所示
newCtx = newContext(group, filterName(name, handler), handler);
封裝物件時,我們可以給要新增的channelhandler起一個名字,filterName方法可以判斷該handler的命名是否重複
    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);//返回一個預設名稱
        }
        checkDuplicateName(name);
        return name;
    }
checkDuplicateName方法會遍歷連結串列中節點如果查詢到有重複的name則會丟擲異常
    private void checkDuplicateName(String name) {
        if (context0(name) != null) { //遍歷節點,查詢是否有重複name
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }
    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }

3、向連結串列中新增新增context

前面進行了一系列判斷後,通過addLast0方法我們把ChannelHandlerContext新增到pipeline中的雙向連結串列中

    //相當於在tail節點前面插入一個節點,也就是addLast
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;//拿到tail節點的前置節點
        newCtx.prev = prev;//把當前context的前置節點置為 prev
        newCtx.next = tail;//把當前context的後置節點置為tail
        prev.next = newCtx;//把prev節點的後置節點置為context
        tail.prev = newCtx;//把tail節點的前置節點置為context
    }

addLast0內部實現很簡單,就是在tail節點前面插入一個節點,也就是把該ChannelHandlerContext放在連結串列的最後。

4、呼叫回撥方法,通知新增成功

這一步就是當ChannelHandler新增到Pipeline中時呼叫,通過callHandlerAdded()回撥方法通知ChannelHandler新增成功,執行handlerAdded()方法;

首先判斷當前執行緒與eventloop執行緒是否一致,不一致的話封裝成task提交給eventloop執行緒,是同一執行緒直接執行callHandlerAdded0方法,我們看下方法具體實現

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded();//呼叫callHandlerAdded回撥方法
        } catch (Throwable t) {
            boolean removed = false;
            try {
                remove0(ctx);//如果出現異常的話,把該ctx刪除
                ctx.callHandlerRemoved();//呼叫callHandlerRemoved回撥方法
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }
    final void callHandlerAdded() throws Exception {
        // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
        // any pipeline events ctx.handler() will miss them because the state will not allow it.
        if (setAddComplete()) {//在新增handler之前,保證狀態為可新增狀態
            handler().handlerAdded(this);
        }
    }

通過上面程式碼實現,我們就可以通過重寫ChannelHandler的handlerAdded方法,執行一些當ChannelHandler新增到Pipeline中時需要觸發的功能邏輯。

二、ChannelHandler的刪除

ChannelHandler的刪除主要是通過ChannelPipeline的remove方法來實現的

我們先看下remove方法原始碼具體實現

    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));//刪除handler
        return this;
    }
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        //不能刪除頭節點和尾節點
        assert ctx != head && ctx != tail;

        //加鎖,保證執行緒安全
        synchronized (this) {
            remove0(ctx);//在連結串列中刪除context物件

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we remove the context from the pipeline and add a task that will call
            // ChannelHandler.handlerRemoved(...) once the channel is registered.
            //這裡主要判斷下該pipline對應channel是否已經註冊到eventloop上
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            
            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {//判斷是否在同一執行緒中
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        //呼叫回撥方法,通知handler已刪除
        callHandlerRemoved0(ctx);
        return ctx;
    }

刪除操作整個流程與新增類似

1、獲取ChannelHandlerContext物件

    //根據傳入的handler拿到其包裝的ChannelHandlerContext物件
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        //根據context方法從連結串列中獲取該handler的ChannelHandlerContext物件
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }

        AbstractChannelHandlerContext ctx = head.next;
        //遍歷連結串列拿到該handler封裝的ChannelHandlerContext物件
        for (;;) {

            if (ctx == null) {
                return null;
            }

            if (ctx.handler() == handler) {
                return ctx;
            }

            ctx = ctx.next;
        }
    }

2、判斷是否是首尾節點

首先判斷刪除的節點既不是頭節點也不是尾節點

   //不能刪除頭節點和尾節點
   assert ctx != head && ctx != tail;

3、執行刪除操作

然後通過remove0方法刪除指定Context節點

    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;//獲取當前節點的前置節點
        AbstractChannelHandlerContext next = ctx.next;//獲取當前節點的後置節點
        prev.next = next;//把prev後置節點設定為next
        next.prev = prev;//把next前置節點設定為prev
    }

 4、呼叫回撥方法,通知刪除成功

同樣會判斷當前執行緒與eventloop執行緒是否一致,不一致的話封裝成task提交給eventloop執行緒

 EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {//判斷是否在同一執行緒中
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
 //呼叫回撥方法,通知handler已刪除
 callHandlerRemoved0(ctx);

呼叫callHandlerRemoved()回撥方法,通知觸發handlerRemoved刪除成功。

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            ctx.callHandlerRemoved();//呼叫ctx中的回撥方法
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }
    final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);//通知handler刪除成功事件
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

三、總結

通過上面的內容,我們梳理了channelHandler被封裝成ChannelHandlerContext物件後,基於ChannelPipeline的雙向連結串列的新增和刪除操作,整個流程還是比較簡單清晰的。其中如有不足與不正確的地方還望指出與海涵

 

關注微信公眾號,檢視更多技術文章。