1. 程式人生 > >一起學Netty(十八)netty原始碼學習之netty server端原始碼初讀(上)

一起學Netty(十八)netty原始碼學習之netty server端原始碼初讀(上)

server端是使用了Reactor模式對nio進行了一些封裝,Reactor模式網上有很多資料,不贅述,瞭解了這個模式開始看原始碼

netty的版本是4.0.21.Final

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.0.21.Final</version>
</dependency>


netty端server端的比較經典的程式碼如下:

(程式碼一)

public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap();
            sbs.group(bossGroup,workerGroup);
            sbs.channel(NioServerSocketChannel.class);
            sbs.localAddress(new InetSocketAddress(port));
            sbs.childHandler(new ChannelInitializer<SocketChannel>() {
                        
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast("encoder", new StringEncoder());
                            ch.pipeline().addLast(new HelloWorldServerHandler());
                        };
                        
                    }).option(ChannelOption.SO_BACKLOG, 128)   
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
             // 繫結埠,開始接收進來的連線
             ChannelFuture future = sbs.bind(port).sync();  
             
             System.out.println("Server start listen at " + port );
             future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

step1:

這是一段普通不能再普通的程式碼了,先看EventLoopGroup這個類,這個類是EventLoop的一個集合,我們先簡單的看下EventLoopGroup的一個具體實現NioEventLoopGroup

public NioEventLoopGroup() {
        this(0);
}

public NioEventLoopGroup(int nThreads) {
        this(nThreads, null);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
        super(nThreads, threadFactory, selectorProvider);
}

NioEventLoopGroup建構函式最後呼叫了它父類MultithreadEventLoopGroup的建構函式:

(程式碼二)

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

其中DEFAULT_EVENT_LOOP_THREADS設定了這個group中的個數,如果我們沒有預設傳遞引數,系統預設會是:

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
一般是你CPU *2的個數,在Reactor模式中,mainReactor角色一般只需要一個執行緒就搞定了,subReactor角色就是那個苦逼的worker了,一般boss(mainReactor)一個就夠了,subReactor就是需要多個了,所以在【程式碼一】中:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

看名字就知道哪個EventLoopGroup對應哪一個Reactor模型中的角色了

回到上面一個話題中,我們接著【程式碼二】看,它接著呼叫它的父類MultithreadEventExecutorGroup的建構函式

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }

        children = new SingleThreadEventExecutor[nThreads];
        if (isPowerOfTwo(children.length)) {
            chooser = new PowerOfTwoEventExecutorChooser();
        } else {
            chooser = new GenericEventExecutorChooser();
        }

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(threadFactory, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
這是本質的方法,所有的工作基本上都在這裡做了,因為前面我們很懶惰,啥都沒有做,因為我們構造NioEventLoopGroup的時候,沒有傳遞任何引數,netty給了一些預設的實現,首先newDefaultThreadFactory用來穿件一個預設的ThreadFactory,然後初始化了一個叫做children的例項變數,該children的定義是這樣的
private final EventExecutor[] children;
EventExecutor是一個Executor,也就是一個執行緒執行器,換而言之,children = new SingleThreadEventExecutor[nThreads]這行程式碼對children進行了初始化動作,nThreads表示個數,如果我們CPU是4核的話,4*2表示初始化8個SingleThreadEventExecutor執行緒執行器,SingleThreadEventExecutor其實就是Reactor模型中真正工作的worker了,我們先簡單的看下SingleThreadEventExecutor原始碼,下面列出2段SingleThreadEventExecutor的程式碼片段

SingleThreadEventExecutor片段一:

private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();

private final Thread thread;
private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
首先先看taskQueue這是一個Runnable的任務佇列,還有一個delayedTaskQueue延遲佇列,最後每一個SingleThreadEventExecutor都綁定了唯一的Thread,用這個thread去執行這些taskQueue和delayedTaskQueue中的任務

SingleThreadEventExecutor片段二:

thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } 
這個thread最後回去呼叫SingleThreadEventExecutor的run方法,這個後面再一起分析

回到上文children的初始化的程式碼中來,初始化好之後,開始為一個child賦值:

children[i] = newChild(threadFactory, args);
看newChild方法:
@Override
    protected EventExecutor newChild(
            ThreadFactory threadFactory, Object... args) throws Exception {
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
        super(parent, threadFactory, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
}

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
        super(parent, threadFactory, addTaskWakesUp);
 }
可以看出最後還是呼叫的SingleThreadEventExecutor的建構函式,期間做了selector的處理,我們知道NIO是基於事件驅動的,所以netty也是離不開selector,所以每一個NioEventLoop(上文描述的SingleThreadEventLoop的子類)都需要繫結一個selector,這樣在netty初始化channel的時候,只需要將channel繫結selector就可以了,關於openSelector這個方法,netty也進行了大量的優化,關於openselector這個方法,大家可以參考http://budairenqin.iteye.com/blog/2215896,我這個就不重複介紹了

好了,到此為止NioEventLoopGroup的原始碼就已經分析完了,此時NioEventLoopGroup中的每一個NioEventLoop中繫結的thread還沒有start,我們接著看

step2:

回到程式碼一:

ServerBootstrap sbs = new ServerBootstrap();
sbs.group(bossGroup,workerGroup);
group方法是將我們剛剛初始化好的bossGroup,workerGroup分別進行賦值(當然需要進行賦值繫結啦,否則例項化他們幹啥用,例項化就是為了用,多說兩句,不忘初心,bossGroup和workerGroup這兩個的使命是什麼,重述一遍,bossGroup和workerGroup中維護的都是維護了兩個佇列,一個thread,boss的thread執行緒是用來接收連結的然後轉發給worker處理,worker列隊就是一個個任務,等待worker的thread做處理,但是這兩個group中的thread都沒有start呢,並且selector都沒有註冊,哈哈,不忘初心,做人也是一樣)

廢話不多說,workerGroup賦值給ServerBootstrap例項的childGroup,bossGroup賦值給AbstractBootstrap的group,具體程式碼如下:

ServerBootstrap.java

private volatile EventLoopGroup childGroup;

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
}
AbstractBootstrap.java
private volatile EventLoopGroup group;

AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
        group = bootstrap.group;
        channelFactory = bootstrap.channelFactory;
        handler = bootstrap.handler;
        localAddress = bootstrap.localAddress;
        synchronized (bootstrap.options) {
            options.putAll(bootstrap.options);
        }
        synchronized (bootstrap.attrs) {
            attrs.putAll(bootstrap.attrs);
        }
}
接著看程式碼一:
sbs.channel(NioServerSocketChannel.class);
這個channel方法則是規定了這個ServerBootstrap的channel是啥,這裡面維護了一個叫做channelFactory的東西,顧名思義,這是一個channel的工廠,我們傳遞的是NioServerSocketChannel這個類,那麼這個channelFactory工廠創建出來的channel就是NioServerSocketChannel

去掉一些handler類,我們看:

ChannelFuture future = sbs.bind(port).sync();  
我們看bind方法
public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise;
        if (regFuture.isDone()) {
            promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
        }

        return promise;
}

最後呼叫了AbstractBootstrap(這個我們剛才做過處理,把bossGroup賦值了AbstractBootstrap的group,也就是說這個AbstractBootstrap獲取到一個thread執行器了)

言歸正傳,我們看doBind方法:

(程式碼三)

final ChannelFuture regFuture = initAndRegister();

final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }

        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }
好了,我們看到了channelFactory了,那麼它new出來的channel當然就是NioServerSocketChannel

(程式碼四)

@Override
        public T newChannel() {
            try {
                return clazz.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
這邊要提及的是,我們建立NioServerSocketChannel的例項的時候,會觸發它的父類AbstractChannel的建構函式:
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
}
unsafe = newUnsafe()這個是我們需要關注的,unsafe看起來就很熟悉,不過此unsafe並不是juc包那個CAS的unsafe,我們看看它的抽象:

原始碼位於io.netty.channel.Channel的Unsafe

interface Unsafe {
        /**
         * Return the {@link SocketAddress} to which is bound local or
         * {@code null} if none.
         */
        SocketAddress localAddress();

        /**
         * Return the {@link SocketAddress} to which is bound remote or
         * {@code null} if none is bound yet.
         */
        SocketAddress remoteAddress();

        /**
         * Register the {@link Channel} of the {@link ChannelPromise} with the {@link EventLoop} and notify
         * the {@link ChannelFuture} once the registration was complete.
         */
        void register(EventLoop eventLoop, ChannelPromise promise);

我列出部分程式碼的意思就是這個unsafe其實是束縛於Channel的,但也提供了channel的一些方法,比如register(EventLoop eventLoop, ChannelPromise promise)這個方法,將channel註冊到eventLoop上去,這是很關鍵的,我們再回歸到AbstractChannel的newUnsafe方法上來,追蹤原始碼最終發現:
@Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
}

好了,最終NioServerSocketChannel這個channel對應的Unsafe是NioMessageUnsafe,關於AbstractChannel的newUnsafe這只是一個插曲(下文會用到),我們還是回到【程式碼三】
final Channel channel = channelFactory().newChannel();

init(channel);

init的方法,很重要,這裡面規範了channel的一些屬性,我們看看位於ServerBootstrap中的init方法

@Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();
        if (handler() != null) {
            p.addLast(handler());
        }

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
因為channel這邊有很多option,另外我們看到了childGroup(我們剛才賦值的workerGroup),childHandler也就是handlers,也就是channel配置了options,獲取到執行緒執行器了,最後我們需要關注的就是channel中的ChannelPipeline中增加了一個ServerBootstrapAcceptor(後面再分析,這邊就是說明一下,在channelPipeLine中有了ServerBootstrapAcceptor)

回到initAndRegister這個方法中來,init完了channel,最最關鍵的一步就是

ChannelFuture regFuture = group().register(channel);

還是不忘初心,在NIO的程式設計中有一個很重要的步驟,我們可以將這個步驟寫出來:

SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
InetSocketAddress s = new InetSocketAddress("localhost", 2000);
channel.connect(s);

Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

這是一段很經典的純的Java的NIO的的client程式碼。

這邊我們看channel.register這行程式碼,我們需要將channel註冊到selector上,並且傳遞自己感興趣的事件引數,這樣當有selector上有事件發生的是,就會通知註冊的channel觸發對應的事件

我們在分析netty的時候,我們知道在分析NioEventLoopGroup原始碼的時候,我們知道每一個NioEventLoopGroup都維護了一個selector,再後文中我們也分析了一個channel的建立過程,但是還沒有進行selector和selector進行register

這邊的group().register(channel)最後呼叫的是:

channel.unsafe().register(this, promise);
回到一圈,我們有看到了unsafe,這個unsafe上文已經分析過了,是NioMessageUnsafe,register是它父類AbstractUnsafe完成的動作

我們也注意下register的入參,this指代的是SingleThreadEventLoop這個例項(其實我要的不是這個例項,而是這個例項中的selector(位於NioEventLoop.java中))

最後的註冊動作是在AbstractNioChannel.java中完成的

@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
這邊我們要注意evetLoop就是我們剛才說的this,也就是SingleThreadEventLoop抽象類的具體實現,這邊獲取到了selector,javaChannel()這個方法就是
protected SelectableChannel javaChannel() {
        return ch;
    }
private final SelectableChannel ch;

估計這邊大家要問這個ch是什麼時候賦值的,請看【程式碼四】

在newInstant的時候建立NioServerSocketChannel例項的時候,進行了賦值,這邊ch其實就是NioServerSocketChannel

好了,到此為止,我們基本分析了Netty的server端的啟動過程,你也許會問server的thread貌似還沒有start吧,其實不然,你可以仔細觀察一下register0這個方法,是這麼呼叫的:

try {
                    eventLoop.execute(new OneTimeTask() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
}
這個eventLoop其實就是AbstractBootstrap的group,像註冊這種動作也是一種事件,所以需要Bootstrap的自帶的thread去做這個動作,最終會呼叫SingleThreadEventExecutor的execute的方法:
@Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

最後會呼叫startThread方法,歸於自然,歸於本心,呼叫SingleThreadEventExecutor的
private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                delayedTaskQueue.add(new ScheduledFutureTask<Void>(
                        this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
                        ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
                thread.start();
            }
        }
    }

thrad的start呼叫的方法就是我們剛才片段二種的
SingleThreadEventExecutor.this.run();

好了,先分析這麼多吧,我們進行總結一下,分析太多,我自己也記不過來

1)Netty的server端程式碼一開始初始化了兩個EventLoopGroup,其實就是初始化EventLoop,每一個EventLoop的具體實現就是維護了一個任務佇列,一個延遲任務佇列,一個thread,並且每一個EventLoop都有一個屬於自己的Executor執行器,這樣做的好處就是每一個唯一的thread去不停的迴圈呼叫,去執行任務佇列和延遲任務佇列中的task,沒有了上下文的切換們還要記得每一個EventLoop還初始化了一個selector,關於selector的建立,netty做了很大的優化,使其與CPU更加親和

(中間還忘記分析了,CPU是2的倍數的時候,Netty的優化,大家可以自己看下)

2)關於serverBootstrap的初始化,主要就是做了channel的建立,channel的執行器的繫結,option屬性的配置,繫結埠,這些配置好了之後就是channel和selector的繫結,繫結的時候,順帶啟動一些AbstractBootstrap中的thread,讓其進行無限迴圈中

3)關於細節

①serverBootstrap中在channelPipeline中偷偷地加了一個ServerBootstrapAcceptor

②serverBootstrap中的boss執行緒對應的unsafe物件是NioMessageUnsafe例項