1. 程式人生 > >Netty原始碼分析 (一)----- NioEventLoopGroup

Netty原始碼分析 (一)----- NioEventLoopGroup

提到Netty首當其衝被提起的肯定是支援它承受高併發的執行緒模型,說到執行緒模型就不得不提到NioEventLoopGroup這個執行緒池,接下來進入正題。

執行緒模型

首先來看一段Netty的使用示例

package com.wrh.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public final class SimpleServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new SimpleServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        }
                    });

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded");
        }
    }
}

下面將分析第一、二行程式碼,看下NioEventLoopGroup類的建構函式幹了些什麼。其餘的部分將在其他博文中分析。

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

從程式碼中可以看到這裡使用了兩個執行緒池bossGroupworkerGroup,那麼為什麼需要定義兩個執行緒池呢?這就要說到Netty的執行緒模型了。

 

 

Netty的執行緒模型被稱為Reactor模型,具體如圖所示,圖上的mainReactor指的就是bossGroup

,這個執行緒池處理客戶端的連線請求,並將accept的連線註冊到subReactor的其中一個執行緒上;圖上的subReactor當然指的就是workerGroup,負責處理已建立的客戶端通道上的資料讀寫;圖上還有一塊ThreadPool是具體的處理業務邏輯的執行緒池,一般情況下可以複用subReactor,比我的專案中就是這種用法,但官方建議處理一些較為耗時的業務時還是要使用單獨的ThreadPool。

NioEventLoopGroup建構函式

NioEventLoopGroup的建構函式的程式碼如下

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, null);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, selectorProvider);
} 

NioEventLoopGroup類中的建構函式最終都是呼叫的父類MultithreadEventLoopGroup如下的建構函式:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

從上面的建構函式可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()來建立物件,即不指定執行緒個數,則netty給我們使用預設的執行緒個數,如果指定則用我們指定的執行緒個數。

預設執行緒個數相關的程式碼如下:

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

而SystemPropertyUtil.getInt函式的功能為:得到系統屬性中指定key(這裡:key=”io.netty.eventLoopThreads”)所對應的value,如果獲取不到獲取失敗則返回預設值,這裡的預設值為:cpu的核數的2倍。

結論:如果沒有設定程式啟動引數(或者說沒有指定key=”io.netty.eventLoopThreads”的屬性值),那麼預設情況下執行緒的個數為cpu的核數乘以2。

繼續看,由於MultithreadEventLoopGroup的建構函式是呼叫的是其父類MultithreadEventExecutorGroup的建構函式,因此,看下此類的建構函式

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }

    children = new SingleThreadEventExecutor[nThreads];
    //根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }
        //產生nTreads個NioEventLoop物件儲存在children陣列中
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
                //如果newChild方法執行失敗,則對前面執行new成功的幾個NioEventLoop進行shutdown處理
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
}

該建構函式幹了如下三件事:

1、產生了一個執行緒工場:threadFactory = newDefaultThreadFactory();

MultithreadEventExecutorGroup.java
protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass());//getClass()為:NioEventLoopGroup.class
}

DefaultThreadFactory.java    
public DefaultThreadFactory(Class<?> poolType) {
    this(poolType, false, Thread.NORM_PRIORITY);
}

2、根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser

private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}

3、 產生nTreads個NioEventLoop物件儲存在children陣列中 ,執行緒都是通過呼叫newChild方法來產生的。

@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

這裡傳給NioEventLoop建構函式的引數為:NioEventLoopGroup、DefaultThreadFactory、SelectorProvider。

NioEventLoop建構函式分析

既然上面提到來new一個NioEventLoop物件,下面我們就看下這個類以及其父類。

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}

繼續看父類 SingleThreadEventLoop的建構函式

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    super(parent, threadFactory, addTaskWakesUp);
}

又是直接呼叫來父類SingleThreadEventExecutor的建構函式,繼續看

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }

    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;//false

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
            //呼叫NioEventLoop類的run方法
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });

    taskQueue = newTaskQueue();
} 
protected Queue<Runnable> newTaskQueue() {
    return new LinkedBlockingQueue<Runnable>();
}

主要幹如下兩件事:

1、利用ThreadFactory建立來一個Thread,傳入了一個Runnable物件,該Runnable重寫的run程式碼比較長,不過重點僅僅是呼叫NioEventLoop類的run方法。

2、使用LinkedBlockingQueue類初始化taskQueue 。

其中,newThread方法的程式碼如下:

DefaultThreadFactory.java

@Override
public Thread newThread(Runnable r) {
    Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());

    try {
    //判斷是否是守護執行緒,並進行設定
        if (t.isDaemon()) {
            if (!daemon) {
                t.setDaemon(false);
            }
        } else {
            if (daemon) {
                t.setDaemon(true);
            }
        }
            //設定其優先順序
        if (t.getPriority() != priority) {
            t.setPriority(priority);
        }
    } catch (Exception ignored) {
        // Doesn't matter even if failed to set.
    }
    return t;
}

protected Thread newThread(Runnable r, String name) {
    return new FastThreadLocalThread(r, name);
}

FastThreadLocalThread.java

public FastThreadLocalThread(Runnable target, String name) {
    super(target, name);// FastThreadLocalThread extends Thread 
} 

到這裡,可以看到底層還是藉助於類似於Thread thread = new Thread(r)這種方式來建立執行緒。

關於NioEventLoop物件可以得到的點有,初始化了如下4個屬性。

1、NioEventLoopGroup (在父類SingleThreadEventExecutor中)

2、selector

3、provider

4、thread (在父類SingleThreadEventExecutor中)

總結

關於NioEventLoopGroup,總結如下

1、 如果不指定執行緒數,則執行緒數為:CPU的核數*2

2、根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser

3、產生nThreads個NioEventLoop物件儲存在children陣列中。

可以理解NioEventLoop就是一個執行緒,執行緒NioEventLoop中裡面有如下幾個屬性:

1、NioEventLoopGroup (在父類SingleThreadEventExecutor中)

2、selector

3、provider

4、thread (在父類SingleThreadEventExecutor中)

更通俗點就是:NioEventLoopGroup就是一個執行緒池,NioEventLoop就是一個執行緒。NioEventLoopGroup執行緒池中有N個NioEventLoop執行緒。