1. 程式人生 > >Netty原始碼分析:1.4伺服器啟動流程

Netty原始碼分析:1.4伺服器啟動流程

第一章節是主要是伺服器啟動的程式碼分析。章節目錄有:
|———1.1初始化NioEventLoopGroup
|———1.2初始化NioEventLoop
|———1.3初始化NioServerSocketChannel
|———1.4伺服器啟動流程
為什麼先從初始化開始瞭解伺服器啟動?
因為在我看伺服器啟動的相關原始碼的時候,有很多地方都是初始化的時候已經建立好的。所以我就從初始化的原始碼開始看起。這是我第一次看原始碼的筆記,仍有很多理解錯誤的地方和不解的地方。歡迎討論。

本篇目錄

  • 啟動伺服器程式碼
  • 程式碼分析

啟動伺服器程式碼

  • 1 建立NioEventLoopGroup物件。上文已經介紹了NioEventLoopGroup的初始化已經內部執行緒NioEventLoop的初始化話。
  • 2 建立ServerBootstrap物件,該類初始化主要是建立了幾個LinkedHashMap來儲存設定。例如childOptions或者childAttrs
 public void bind() {

         //1 
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        try
{ // 引導繫結和啟動伺服器 //2 ServerBootstrap b = new ServerBootstrap(); //3 b.group(boss, work); // 建立NioEventLoopGroup物件來處理事件,如接受新連線、接收資料、寫資料等等 //4 b.channel(NioServerSocketChannel.class); // 設定childHandler執行所有的連線請求
//5 b.childHandler(new ChildChannelHandler()); //6 b.option(ChannelOption.SO_BACKLOG, 100); //繫結埠 //7 ChannelFuture future = b.bind(8080).sync(); //8 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //9 boss.shutdownGracefully(); work.shutdownGracefully(); } }

3 Group

賦值兩個執行緒池。
1 首先第一個boss執行緒池是賦值到父類AbstractBootstrap的group變數中。
2 work執行緒池就賦值在ServerBootstrap的childGroup變數中。

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        //1
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

4 channel()

  • 首先ReflectiveChannelFactory類是一個工廠類,裡面只有一個方法newChannel()用來將傳入的class進行無參構造生成物件的。
  • 然後channelFactory()是將ReflectiveChannelFactory賦予ServerBootstrap的父類的channelFactory引數
    此時channel 還沒被建立,直到bind()方法中呼叫 initAndRegister()方法的時候才會用該工廠類生成一個channel。下文會講。
 public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

5 childHandler()

在呼叫這個“childHandler()“這個方法的時候,你需要自己“寫一個方法繼承ChannelInitializer類“。而“ChannelInitializer“類也是繼承“ChannelInboundHandlerAdapter“的。所以“childHandler()“方法的引數就是我們自己寫的類。然後賦值到childHandler引數。
public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }

6 Option()

將一些配置儲存到options變數中,該變數是一個LinkedHashMap
public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }

7 bind()

![image.png](https://upload-images.jianshu.io/upload_images/6212571-f500986ea414273c.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 之前的一些方法都只是對ServerBootstrap的配置,說白了就是用來set引數的。 bind()則是開始啟動伺服器了。 - 1 進行對“group“和“channelFactory“兩個引數進行非空驗證
public ChannelFuture bind(SocketAddress localAddress) {
        validate(); //1 
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

doBind()

  • 1 建立一個channel 並且初始化和註冊。關鍵部分。程式碼分析看下面
  • 2 判斷channel是否註冊成功。如果已經註冊成功那麼就進行doBind0()方法。如果還沒成功,那就新增一個監聽器,等返回成功的時候就進行doBind0()方法。解析看下文。
 private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); //1
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        //2
        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else { 
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {

                        promise.setFailure(cause);
                    } else {

                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

doBind0()

  • 1 該方法是執行在channel已經與selector註冊後的。給執行緒新增一個任務。該任務是繫結埠的。

到這個方法。伺服器啟動就已經完成了

 private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        //1
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

下文都是對initAndRegister()方法的程式碼解析

initAndRegister()

  • 1 利用工廠建立一個channel物件。初始化內容看上一篇文章初始化NioServerSocketChannel
  • 2 對channel進行一個初始化,看下文。
  • 3 對channel進行註冊,看下文
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
           //1
            channel = channelFactory.newChannel();
            //2
            init(channel);
        } catch (Throwable t) {

        }
        //3
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

init()

  • 1 對channel 的config變數裡新增options屬性
  • 2 對channel 的config變數裡新增attrs屬性
  • 3 獲取channel 物件的pipeline管道,然後在管道里面新增一個handler,該handler作用有:新增bootstrap裡的handler。對channel 新增一個任務。
  • 4 新增bootstrap裡的handler。對channel 新增一個任務
void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        //1 
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        //2
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        //3
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        //4
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

config().group().register(channel)

  • config().group()是獲取到執行緒池,該執行緒池是ServerBootstrap的父類AbstractBootstrap儲存的boss執行緒池。是不是恍然大悟,這就是把channel放到boss執行緒池裡的一個執行緒裡面去執行任務啊。
  • 1 是MultithreadEventLoopGroup類的方法 也就是NioEventLoopGroup的父類。 該方法就是從執行緒池裡獲取一個EventLoop.然後執行EventLoop裡的register()方法。
 @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
@Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel); //1
    }

繼續看下去。
- 2 是SingleThreadEventLoop裡的方法。`DefaultChannelPromise() 方法是給channel 與該執行緒 新增一個監聽器。

@Override
    public ChannelFuture register(Channel channel) {
       //2
        return register(new DefaultChannelPromise(channel, this));
    }

繼續往下看
- 3 繼續是SingleThreadEventLoop裡的方法。promise是監聽器物件,promise.channel()獲取到channel。unsafe()方法是實現底層的register,read或者write操作

 @Override
    public ChannelFuture register(final ChannelPromise promise) {
        //3
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

繼續往下分析,該register()方法是AbstractChannel類的。也就是初始化NioServerSocketChannel的時候,建立pipeline和unsafe的類。
- 4 進行引數驗證
- 5 將該eventLoop執行緒賦值於channel引數。
- 6 eventLoop.inEventLoop() 判斷,如果現在的執行緒是EventLoop()的 執行緒,那麼執行任務,如果不是那麼就用執行器執行任務。在這裡debug,會返回false,會呼叫execute()方法。
下面繼續探討
- 7 下面探討下register0()方法

 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            //4
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            //5
            AbstractChannel.this.eventLoop = eventLoop;
            //6
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                        //7
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {

                }
            }
        }

首先在研究register0()之前有一個小知識。

在版本4.1.x 的時候,在初始化eventLoop的時候,還沒有建立執行緒,而是儲存了Executor這個變數。這個變數在4.0.x版本的時候是沒有的。那麼4.1.x版本在什麼時候建立執行緒呢?在eventLoop呼叫execute()方法的時候建立執行緒。下文可以看到

  • 1 繼續判斷是否現在的執行緒是EventLoop物件的執行緒,肯定返回false,因為EventLoop執行緒裡的thread變數是null嘛。
 @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

[圖片上傳失敗…(image-1a4a0b-1523959190717)]

  • 2 在startThread()方法中,才建立一個執行緒。
  • 3 在任務佇列裡新增一個任務。
public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        //1
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
             //2
            startThread();
            //3
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

register0(ChannelPromise promise)

  • 1 判斷EventLoop執行緒是否還存活。
  • 2 這個是記錄是否註冊過的。neverRegistered預設是true;
  • 3 這個是核心程式碼,到這裡才進行將channel與selector註冊在一起。
 private void register0(ChannelPromise promise) {
            try {
                //1
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                //2
                boolean firstRegistration = neverRegistered;
                doRegister();//3
                neverRegistered = false;
                registered = true;


                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();

                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {

                        beginRead();
                    }
                }
            } catch (Throwable t) {

            }
        }