1. 程式人生 > >netty(三) NioEventLoop再學習

netty(三) NioEventLoop再學習

NioEventLoop再次學習

昨天開始學習了NioEventLoop,今天詳細看了NioEventLoop的底層實現。

NioEventLoop的繼承關係如圖所示:
這裡寫圖片描述
從圖中可以看出來,NioEventLoop的終極父類是Executor,也就是是說NioEventLoop是一個執行緒池(但是它也可以看做一個執行緒)。在程式中並沒有使用NioEventLoop,而是通過使用NioEventLoopGroup物件來最終使用NioEventLoop的。

1 NioEventLoopGroup

NioEventLoopGroup是一個執行緒池,從上面的類繼承關係可以看出來,NioEventLoopGroup的終極父類也是Executor,因為EventLoopGroup是它的父類。NioEventLoopGroup作為一個執行緒池,裡面盛放的物件就是NioEventLoop物件。也就是說NioEventLoopGroup中盛放的時一個一個的執行緒池NioEventLoop。下面看看NioEventLoopGroup底層實現。

通過new NioEventLoopGroup()會呼叫NioEventLoopGroup的構造器,在一步一步,最終呼叫NioEventLoopGroup的父類的構造器,這裡說一點,既然NioEventLoopGroup是一個執行緒池,那麼就會指定執行緒池中的執行緒數,如果直接new NioEventLoopGroup()那麼會開啟默認個執行緒,預設數是CPU核心的2倍。

會呼叫父類MultithreadEventLoopGroup的構造器,這裡會指定NioEventLoopGroup池中的執行緒數
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super
(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); } //呼叫父類MultithreadEventExecutorGroup的構造器 protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { //執行緒工廠,一定是用來建立執行緒的 threadFactory = this.newDefaultThreadFactory(); //這裡會發現NioEventLoopGroup中實際上存放的就是EventExecutor,把它看做執行緒
//以陣列的形式儲存 this.children = new SingleThreadEventExecutor[nThreads]; //例項化每個陣列元素,也就是建立每一個EventExecutor物件 this.children[i] = this.newChild(threadFactory, args); //一旦陣列中有EventExecutor建立不成功,那麼本次建立失敗 //已經建立的EventExecutor物件就要被銷燬 this.children[j].shutdownGracefully(); }

這裡需要看看newChild()方法:

//可以發現這裡會交由其子類來重寫
protected abstract EventExecutor newChild(ThreadFactory var1, Object... var2) throws Exception;

因為關注的是NioEventLoopGroup,因此看看它對該方法的重寫:

protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2]);
}

從這個方法可以看出,建立的EventExecutor實際上就是NioEventLoop物件。

從這裡就可以看出,雖然程式中沒有直接操作NioEventLoop,但是通過NioEventLoopGroup,就可以使用NioEventLoop。

2 NioEventLoop

直接看上面呼叫的NioEventLoop的構造方法:

 NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    //首先呼叫父類構造器
    super(parent, threadFactory, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    } else if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    } else {
        this.provider = selectorProvider;
        //這個和獲得Selector物件的方法很像,估計就是
        NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
        this.selectStrategy = strategy;
    }
}

然後呼叫了NioEventLoop的父類構造器:

//繼續呼叫父類構造器
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
}

//呼叫SingleThreadEventExecutor構造器
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {

    //建立SingleThreadEventExecutor執行緒池中的執行緒
    this.thread = threadFactory.newThread(new Runnable() {
        public void run() {
            呼叫run方法
            SingleThreadEventExecutor.this.run();
        }
    }

}

//由子類重寫
protected abstract void run();

解釋:SingleThreadEventExecutor是一個執行緒池,因此需要制定裡面的執行緒,從名字可以看出來,SingleThreadEventExecutor執行緒池中只有一個執行緒,也就是它的thread屬性。SingleThreadEventExecutor的子類NioEventLoop又重寫了run()方法,因此在SingleThreadEventExecutor執行緒池中的執行緒執行的時候回撥用NioEventLoop的run()方法。然後把建立的NioEventLoop物件賦值給MultithreadEventExecutorGroup中的children[i].

至此可以明白了,建立NioEventLoopGroup時,實際上就建立了若干個NioEventLoop執行緒池物件,並將其放入到NioEventLoopGroup中,交由其統一管理。

又因為NioEventLoop現場池中只有一個執行緒,因此,可以把NioEventLoop看做一個執行緒。NioEventLoop負責的事情就是連線和IO操作。具體就是在NioEventLoop的run方法中實現的。

3 NioEventLoop的職責

建立完NioEventLoopGroup之後,實際上就建立了NioEventLoop,然後將NioEventLoopGroup繫結到啟動類上Bootstrap,伺服器端是繫結到ServerBootstrap啟動類上。

//繫結
bootstrap.group(group,worker);

//啟動,監控,在這裡會啟動NioEventLoop中的執行緒,監聽,或者處理IO
bootstrap.bind()
        //同步等待
        .sync();

看看bind()方法,然後進行一系列的呼叫:

public ChannelFuture bind() {    
    return this.doBind(localAddress);
}

//doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
    AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
}

//doBind0
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    /**
    *這裡會執行eventLoop().execute(),往執行緒池中提交任務
    *eventLoop()會返回NioEventLoop物件
    *那麼就會執行NioEventLoop的execute()方法
    */
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }

        }
    });
}

//NioEventLoop從SingleThreadEventExecutor繼承過來的execute()
public void execute(Runnable task) {
    //這裡啟動NioEventLoop中的執行緒
    this.startThread();
}

然後將會執行run()方法,NioEventLoop的重寫的run方法也會被執行:

//NioEventLoop重寫的run
 protected void run() {
    while(true) {
        while(true) {

            //實際呼叫select方法阻塞,等待IO或者網路連線
            this.select(this.wakenUp.getAndSet(false));

            //處理相關事件   
            this.processSelectedKeys();

        }
    }
}

下面就是經過一系列呼叫執行的方法:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    //這就和利用原生java NIO jdk程式設計非常像了,獲得channel上發生動作的集合,遍歷這個集合,
    //不停的處理這些channel上的動作
    if (!selectedKeys.isEmpty()) {
        Iterator i = selectedKeys.iterator();

        while(true) {
            SelectionKey k = (SelectionKey)i.next();
            Object a = k.attachment();
            i.remove();
            if (a instanceof AbstractNioChannel) {
                this.processSelectedKey(k, (AbstractNioChannel)a);
            } else {
                NioTask<SelectableChannel> task = (NioTask)a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (this.needsToSelectAgain) {
                this.selectAgain();
                selectedKeys = this.selector.selectedKeys();
                if (selectedKeys.isEmpty()) {
                    break;
                }

                i = selectedKeys.iterator();
            }
        }

    }
}

ok,至此完畢。可算是NioEventLoop搞懂了。

4 總結

在程式編寫的時候不需要直接操作NioEventLoop,而是通過NioEventLoopGroup來,間接建立NioEventLoop。NioEventLoopGroup是一個執行緒池,裡面存放若干個NioEventLoop物件。

NioEventLoop是一個執行緒池物件,但是它裡面只有一個執行緒,因此也可以將其看做一個執行緒,它主要做的事情就是,開啟它裡面的唯一的那個執行緒,如果是負責監聽連線的話,那麼將ServerSocketChannel註冊在這個執行緒上,並宣告對Accept感興趣,這個執行緒就會通過Selector呼叫select()阻塞,直至有連線請求過來,那麼就開始處理;如果是負責IO的話,那麼將SocketChannel註冊在這個執行緒上,並宣告對Read/Write感興趣,這個執行緒也會通過Selector呼叫select()阻塞,直至有IO操作發生,然後喚醒處理IO。

5 參考資料