1. 程式人生 > >Netty Hello World 入門原始碼分析

Netty Hello World 入門原始碼分析

第一節簡單提了什麼是網路程式設計,Netty 做了什麼,Netty 都有哪些功能元件。這一節就具體進入 Netty 的世界,我們從用 Netty 的功能實現基本的網路通訊開始分析 各個元件的使用。

1. 一個簡單的傳送接收訊息的例子

話不多說,先來實現一個傳送接收訊息的例子。本例項基於 SpringBoot 工程搭建。

專案類檔案如下:

客戶端和服務端的主要程式碼分為3個部分:啟動器,ChannelInitializer,eventHandler。

相關程式碼已經上傳 GitHub,請參閱:點我 (๑¯ ³ ¯๑)

Server端:

package com.rickiyang.learn.helloWorld;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description: server 端
 */
@Slf4j
public class HwServer {

    private int port;

    public HwServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer());

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server start fail",e);
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        HwServer server = new HwServer(7788);
        server.start();
    }
}

server initializer:

package com.rickiyang.learn.helloWorld;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class HwServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("server channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info(ctx.channel().remoteAddress()+"===>server: "+msg.toString());
        ctx.write("hi, received your msg");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

}

server handler:

package com.rickiyang.learn.helloWorld;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class HwServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("server channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info(ctx.channel().remoteAddress()+"===>server: "+msg.toString());
        ctx.write("hi, received your msg");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

}

客戶端程式碼:

client:

package com.rickiyang.learn.helloWorld;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class HwClient {

    private  int port;
    private  String address;

    public HwClient(int port, String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientChannelInitializer());
        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();
            future.channel().writeAndFlush("Hello world, i'm online");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("client start fail",e);
        }finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        HwClient client = new HwClient(7788,"127.0.0.1");
        client.start();
    }
}

client initializer :

package com.rickiyang.learn.helloWorld;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Created by Administrator on 2017/3/11.
 */
public class ClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 客戶端的邏輯
        pipeline.addLast("handler", new HwClientHandler());
    }
}

client handler :

package com.rickiyang.learn.helloWorld;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class HwClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server say : " + msg.toString());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("client channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client is close");
    }


}

程式碼很簡單,主要功能就是啟動 服務端 和 客戶端, 然後實現一個簡單的 handler ,在handler 中獲取訊息並列印,本地先啟動服務端 main 函式,再啟動客戶端即可。

2. 從 EventLoopGroup 開始說起

觀看客戶端 和 服務端 的啟動類,我們看到都有相同的特性:

建立 EventLoopGroup 去監聽 channel,然後使用定義的 handler處理對應的事件。

Netty 是一個非同步事件驅動的 NIO 框架,所有IO操作都是非同步非阻塞的。Netty 實際上是使用 Threads(多執行緒)處理 I/O 事件。

EventLoopGroup 是個啥東西呢?我們回想一下 Reactor 模型,主要的操作是使用 Selector 監聽 channel 上的事件,Reactor 模型有三種結構,首先是單執行緒模型:

這種模型顯而易見始終只有一個 Acceptor 執行緒在處理客戶端連線事件和服務端產生的讀寫事件,好處是始終只有一個執行緒在工作不會產生併發帶來的一系列問題。但是不足之處也顯而易見:

  1. 一個執行緒來處理對於現在的多核系統來說有點浪費資源;
  2. 雖然是使用非同步非阻塞I/O處理,但是面對大併發的請求場景,很有可能會負載過重,堆積事件,這樣客戶端就會有超時發生,然後重複傳送請求,必然會造成系統超載;
  3. 單執行緒如果掛掉了系統就停止了,這種場景如何處理。

所以這種單執行緒模型對於當今系統的發展是沒有適用場景的。接著又演變出多執行緒的 Reactor 模型。

在多執行緒模型下,Acceptor 是一個單獨的執行緒專門處理 Client 的請求連線事件,所有的 I/O 操作都由一個特定的 NIO 執行緒池負責,每個客戶端都與一個特定的 NIO 執行緒池繫結,因此這個客戶端連線中的所有 I/O 操作都是在同一個執行緒中完成的。客戶端連線是很多的,但是 NIO 執行緒很少,所以 一個 NIO 執行緒可以同時繫結到多個客戶端連線中。

從上面的模型找缺點的話,很顯然能發現還是有單點的問題:處理客戶端連線請求的執行緒仍舊只有一個,如果這個執行緒掛了,整個系統將不可用。所以這種超級併發的情況也要考慮啊,系統不能有單點,接著改:

現在的系統就沒有單點問題了,但是也增加了複雜性。

我們剛才在說 EventLoopGroup ,為啥突然又轉到了 Reactor 模型上去了呢? 前面說過,Netty 是基於 NIO 程式設計的,NIO 又是基於 Reactor 模型的,自然 Netty 的程式設計模型也是 Reactor 。而 EventLoopGroup 其實就是來設定 Reactor 模型的型別根據不同的引數方式。

我們來看 Server 啟動類中的寫法:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ServerChannelInitializer());

這裡建立了兩個 group,下面 bootstrap set group的時候 set了兩個執行緒池,即 Acceptor 使用一個執行緒池,一個 Reactor 執行緒池。但是可以看到兩個執行緒池都沒有設定大小,進去 NioEventLoopGroup 的構造方法可以看到 預設值是 0,即初始化為0,不開啟執行緒,當有事件進來的時候會開啟一個執行緒來處理。那麼如果將 workGroup 設定為多個執行緒的時候,上面這種寫法就是 Reactor 的多執行緒模型。

我們再來看另一種寫法:

EventLoopGroup bossGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ServerChannelInitializer());

這裡的 group() 方法與上面的區別在於只有一個引數,進入方法內部看看:

@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}

/**
     * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
     * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
     * {@link Channel}'s.
     */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

很明顯它呼叫了帶兩個 EventLoopGroup 引數的 group方法,即兩個 group 執行緒池使用的是同一個。這就是 Reactor 的單執行緒模型。

還有一個 Reactor 的主從多執行緒模型,這個在 Netty 中是沒有實現的,即你將 bossGroup 的執行緒設定為大於1,這個不會改變 Acceptor 的時候事件處理方式,因為在服務端啟動的時候 ServerSocketChannel 只會繫結到 bossGroup 中的一個執行緒,即使你設定了多個,啟動的時候只會使用一個。Netty 官方認為處理連線請求的時候沒有必要使用多執行緒的方式。

現在我們瞭解到 EventLoopGroup 的作用是初始化執行緒池的,那就一起看看它是怎麼實現的吧。

首先看一下 EventLoopGroup 的類結構圖:

可以看到它繼承了 ScheduledExecutorService,即 EventLoopGroup 有執行緒池排程的能力。上面在程式碼中我們使用的是 EventLoopGroup 的子類 NioEventLoopGroup, 還有一個OioEventLoopGroup也可以使用。繼續看 NioEventLoopGroup 的類結構:

可以看到繼承關係為:NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup。

先看一下我們使用的預設無參構造方法:

public NioEventLoopGroup() {
    this(0);
}

這裡預設設定為 0 ,但是後面的邏輯會判斷如果為 0,那麼會將 執行緒數設定為當前 CPU * 2。

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

這個建構函式將 執行緒執行的 Executor 設定為空,後面會判斷為空重新構造一個 Executor。

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
    int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

上面第一個構造方法是使用了 JDK 的 NIO 生成一個 Selector,第二個是生成一個 Selector 預設策略。接著進入第三個構造方法,這裡使用了父類 MultithreadEventLoopGroup 的 構造方法,還 set 了一個執行緒拒絕策略。

跟進父類的構造方法:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

這裡對執行緒數進行了判斷,如果是 0 則賦預設值,這裡的預設值就是當前核心數 * 2。

接下來又呼叫了它的父類 MultithreadEventExecutorGroup 的構造器:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

再點選進入,終於能看到一段實質性的程式碼了,太不容易:

/**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    //上面我們有個建構函式傳 executor == null。在這裡判斷如果為空,則建立一個新的ThreadPerTaskExecutor
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 這裡就是建立指定大小的執行緒池,執行緒池中的每一個元素都是一個 EventLoop
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            //存入了一個 NIOEventLoop 類例項
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                //這裡如果建立失敗,首先嚐試優雅停止執行緒,下面會判斷執行緒未正常停止的情況繼續判斷
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    //例項化執行緒工廠執行器選擇器: 根據children獲取選擇器
    chooser = chooserFactory.newChooser(children);

    //為每一個 EventLoop 執行緒新增一個 執行緒終止的監聽器
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
    // 到這裡 eventLoop 就建立完畢,接著做了一個操作:把 eventLoop 新增進一個新的不可變的 set集合中,即宣告只讀屬性
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

這段程式碼的實質就是來建立 EventLoop 例項,並 set 執行緒執行。

  1. 首先校驗 executor 是否為空,如果是空,則新建一個 ThreadPerTaskExecutor 物件。這個 executor 是用來執行執行緒池中的所有的執行緒,也就是所有的 NioEventLoop,其實從 NioEventLoop 構造器中也可以知道,NioEventLoop 構造器中都傳入了executor這個引數
  2. 接著建立了一個指定大小的執行緒池,這裡的執行緒池就是用來執行我們的 EventLoop,即監聽事件。
  3. 下面就開始往執行緒池中放東西了,for 迴圈的開始是一個 newChild(executor, args)方法,這個方法主要實現的功能就是 new 出一個 NioEventLoop 例項,具體可以參考 NioEventLoopGroup 中的方法:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
  1. eventLoop 建立完畢,下面的操作就是一個安全性保護,這裡就可以使用了。當有 I/O 事件來,就從執行緒池中取出一個執行緒來執行,那麼怎麼取就是根據 chooser 選擇器的策略來執行, 呼叫選擇器的 next()方法。

這裡執行緒的初始化就結束了,所以這麼多的轉跳只做了一個事情:建立 Selector 的執行緒池。

3. NioEventLoop 做了什麼

上面分析了 EventLoopGroup 的作用是定義了一個執行緒池,建立 EventLoop,而EventLoop 的作用不言而喻,按照 Reactor 模型來理解,大概就是兩件事:監聽連線請求,將事件分發給 handler 處理。下面我們就詳細分析一下 NioEventLoop 的程式碼。

入口就是 newChild()方法,返回的是 EventLoopGroup 的建構函式。

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    selector = openSelector();
    selectStrategy = strategy;
}

//父類建構函式
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    //建立了一個指定大小的佇列
    tailTasks = newTaskQueue(maxPendingTasks);
}

//父類的父類建構函式
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

private Selector openSelector() {
    final Selector selector;
    try {
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }

    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                    "sun.nio.ch.SelectorImpl",
                    false,
                    PlatformDependent.getSystemClassLoader());
            } catch (ClassNotFoundException e) {
                return e;
            } catch (SecurityException e) {
                return e;
            }
        }
    });

    if (!(maybeSelectorImplClass instanceof Class) ||
        // ensure the current selector implementation is what we can instrument.
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
        if (maybeSelectorImplClass instanceof Exception) {
            Exception e = (Exception) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
        }
        return selector;
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                selectedKeysField.setAccessible(true);
                publicSelectedKeysField.setAccessible(true);

                selectedKeysField.set(selector, selectedKeySet);
                publicSelectedKeysField.set(selector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
    } else {
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", selector);
    }

    return selector;
}

上面是 NioEventLoop 的建構函式,裡面又有呼叫父類的構造器,一般這種我們看到都很頭痛,寫碼一直爽,看碼火葬場,你需要一直跳進跳出。我們上面啟動 Netty 服務的是時候分配了一個 boss 執行緒池,一個 worker 執行緒池,boss 執行緒池啟動只需要一個執行緒,主要負責客戶端的連線請求;而 worker 執行緒池就是用來處理 當前這個連線上所有事件用的,一個 worker 執行緒就是一個 EventLoop,一個 channel 只會被一個 EventLoop 處理,但 一個 EventLoop 可以處理多個 channel。

NioEventLoop 的本質是一個執行緒,那麼這個執行緒是在何時被初始化,又是如何處理連線事件的監聽的呢?至少目前是沒有看到眉目,我們先看一下類結構圖,從父類身上找找關鍵資訊:

SingleThreadEventLoop 的父類 SingleThreadEventExecutor 的建構函式很有意思了,這是一個只有一個執行緒的執行緒池, 先看看其中的幾個變數:

  1. state:執行緒池當前的狀態;
  2. taskQueue:存放任務的佇列;
  3. thread:執行緒池維護的唯一執行緒;
  4. scheduledTaskQueue:定義在其父類AbstractScheduledEventExecutor中,用以儲存延遲執行的任務。

我們先記住這些變數哈,下面會解釋。

因為 EventLoop 本質就是一個執行緒,這個執行緒的初始化在哪呢?往上翻看父類的資訊,不難看出:SingleThreadEventExecutor 類裡面有初始化執行緒的操作,它的初始化過程在 doStartThread()方法中,往上跟蹤初始化的呼叫鏈:


@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}


private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

呼叫的地方是 execute()方法。這個方法是 SingleThreadEventExecutor 對外暴露的唯一介面:

public interface Executor {
    void execute(Runnable command);
}

即所有通過 EventLoop 提交的任務都是通過這一個執行緒來執行。另外上面的父類構造方法中我們看到有佇列的初始化,不難看出,佇列的作用是當有多個事件同時在一個 EventLoop 中待執行的時候,EventLoop 的做法是將任務包裝成物件存放在佇列中然後按照先後順序執行。那麼是不是肯定有個執行的方法呢?比如一個迴圈的取出任務的方法,這個是有的,先看 doStartThread()方法的程式碼:

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
              ...
              ...
              ...
        }
    });
}

程式碼比較多,就趕重要的說,看到在 doStartThread() 方法中執行了一個非同步執行緒,而執行緒中做的事情是呼叫SingleThreadEventExecutor.this.run()方法。這個 run()是何方神聖呢? SingleThreadEventExecutor 中的 run()只是一個抽象方法:

protected abstract void run();

具體的實現在子類,我們看 NioEventLoop 裡面的實現:

@Override
protected void run() {
    for (;;) {
        try {
            // 判斷接下來是是執行select還是直接處理IO事件和執行佇列中的task
            // hasTask判斷當前執行緒的queue中是否還有待執行的任務
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                        // 說明當前queue中沒有task待執行
                    select(wakenUp.getAndSet(false));
                                        // 喚醒epoll_wait
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
                 /* ioRatio調節連線事件和內部任務執行事件百分比
       * ioRatio越大,連線事件處理佔用百分比越大 */
            final int ioRatio = this.ioRatio;
            // 如果比例是100,表示每次都處理完IO事件後,執行所有的task
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();
                                // 處理IO事件
                processSelectedKeys();
                                // 當前時間減去處理IO事件開始的時間就是處理IO事件花費的時間
                final long ioTime = System.nanoTime() - ioStartTime;
                // 執行task的時間taskTime就是ioTime * (100 - ioRatio) / ioRatio
                // 如果taskTime時間到了還有未執行的task,runAllTasks也會返回
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}

// io.netty.channel.DefaultSelectStrategy#calculateStrategy
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
  // 如果還有task待執行則先執行selectNow,selectNow是立即返回的,不是阻塞等待
  // 如果沒有待執行的task則執行select,有可能是阻塞等待IO事件
  return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}


// io.netty.channel.nio.NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
  @Override
  public int get() throws Exception {
    // epoll_wait的引數timeout可以指定超時時間,selectNow傳入的引數是0,也就是不超時等待立即返回
    return selectNow();
  }
};

上面的 run()方法裡面是一個死迴圈,在執行select()前有一個hasTasks()的操作,這個hasTasks()方法判斷當前 taskQueue 是否有元素。如果 taskQueue 中有元素,執行 selectNow()方法,最終執行selector.selectNow(),該方法會立即返回,保證了 EventLoop 在有任務執行時不會因為 I/O 事件遲遲不來造成延後處理,這裡優先處理 I/O 事件,然後再處理任務。

知識點

這裡插入一個知識點,selectNow() 其實暴露的就是 Java 封裝的 epoll 模型的一部分。具體參考:

java.nio.channels.Selector 類:

public abstract class Selector implements Closeable {

  
    protected Selector() { }

   
    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

   
    public abstract boolean isOpen();

  
    public abstract SelectorProvider provider();

    public abstract Set<SelectionKey> keys();

   
    public abstract Set<SelectionKey> selectedKeys();

   
    public abstract int selectNow() throws IOException;

   
    public abstract int select(long timeout)
        throws IOException;

    public abstract int select() throws IOException;

   
    public abstract Selector wakeup();

    public abstract void close() throws IOException;

}

上面三個select方法都呼叫了 lockAndDoSelect,只是 timeout 引數不同,其實最後就是呼叫 epoll_wait 引數不同,epoll_wait 有一個timeout引數,表示超時時間:

  • -1:阻塞
  • 0:立即返回,非阻塞
  • 大於0:指定微秒

詳細的分析限於篇幅就不在這裡說了哈。大家可以下去慢慢看。

如果當前 taskQueue 沒有任務時,就會執行select(wakenUp.getAndSet(false))方法,程式碼如下:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
          /* delayNanos(currentTimeNanos):計算延遲任務佇列中第一個任務的到期執行時間(即最晚還能延遲多長時間執行),預設返回1s。每個SingleThreadEventExecutor都持有一個延遲執行任務的優先佇列PriorityQueue,啟動執行緒時,往佇列中加入一個任務。*/
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
          /* 如果延遲任務佇列中第一個任務的最晚還能延遲執行的時間小於500000納秒,且selectCnt == 0(selectCnt 用來記錄selector.select方法的執行次數和標識是否執行過selector.selectNow()),則執行selector.selectNow()方法並立即返回。*/
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

        
            //如果在wakenUp值為true時提交了任務,則該任務沒有機會呼叫
            // Selector#wakeup。因此,我們需要在執行選擇操作之前再次檢查任務佇列。
            //如果不這樣做,則可能會掛起任務,直到選擇操作超時。
            //如果管道中存在IdleStateHandler,則可能要等待直到空閒超時。
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            // 超時阻塞select
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            // 有事件到來 | 被喚醒 | 有內部任務 | 有定時任務時,會返回
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
               
                break;
            }
            if (Thread.interrupted()) {
                
                //執行緒被中斷,因此重置選定的鍵並中斷,這樣我們就不會遇到繁忙的迴圈。
                //由於這很可能是使用者或其客戶端庫的處理程式中的錯誤,因此我們將其記錄下來。
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                                 "Thread.currentThread().interrupt() was called. Use " +
                                 "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            //此處的邏輯就是: 當前時間 - 迴圈開始時間 >= 定時select的時間timeoutMillis,說明已經執行過一次阻塞select()
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // 阻塞超時後沒有事件到來,重置selectCnt
                selectCnt = 1;
              // 如果空輪詢的次數大於空輪詢次數閾值 SELECTOR_AUTO_REBUILD_THRESHOLD(512)
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                       selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                    "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
                                // 1.首先建立一個新的Selecor
                // 2.將舊的Selector上面的鍵及其一系列的資訊放到新的selector上面。
                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                             selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                         selector, e);
        }
        // Harmless exception - log anyway
    }
}

這個方法解決了Nio中臭名昭著的 BUG:selector 的 select 方法導致空輪詢 cpu100%。對 Selector()方法中的阻塞定時 select(timeMIllinois) 操作的 次數進行統計,每完成一次 select 操作進行一次計數,若在迴圈週期內 發生 N 次空輪詢,如果 N 值大於 BUG 閾值(預設為512),就進行空輪詢 BUG 處理。重建 Selector,判斷是否是其他執行緒發起的重建請求,若不是則將原 SocketChannel 從舊的 Selector 上去除註冊,重新註冊到新的 Selector 上,並將原來的 Selector 關閉。

當java NIO BUG 觸發時,進行 Selector 重建,rebuildSelector 過程如下:

  1. 通過方法 openSelector 建立一個新的 selector;
  2. 將 old selector 的 selectionKey 執行 cancel;
  3. 將 old selector 的 channel 重新註冊到新的 selector 中。

Netty 的連線處理就是 I/O 事件的處理,I/O 事件包括 READ 事件、ACCEPT 事件、WRITE 事件和 OP_CONNECT 事件:

  • ACCEPT 事件:連線建立好之後將該連線的 channel 註冊到 workGroup 中某個 NIOEventLoop 的 selector中;
  • READ 事件:從 channel 中讀取資料,存放到 byteBuf 中,觸發後續的 ChannelHandler 來處理資料;
  • WRITE 事件:正常情況下一般是不會註冊寫事件的,如果 Socket 傳送緩衝區中沒有空閒記憶體時,在寫入會導致阻塞,此時可以註冊寫事件,當有空閒記憶體(或者可用位元組數大於等於其低水位標記)時,再響應寫事件,並觸發對應回撥;
  • CONNECT 事件:該事件是 Client 觸發的,由主動建立連線這一側觸發的。

再把目光從 select() 方法拉回到 run() 方法, 這個死迴圈的終止邏輯是遇到 confirmShutdown() 方法。然後在迴圈裡會詢問是否有事件,如果沒有,則繼續迴圈,如果有事件,那麼就開始處理。

往下看程式碼中有一個欄位:ioRatio,預設值是 50,這個比例是處理 I/O 事件所需的時間和花費在處理 task 時間的比例。即如果花了 5s 去處理 I/O 事件, 那麼也會花 5s 去處理 task 任務。處理 I/O 事件的操作主要是在 processSelectedKeys()方法中:

private void processSelectedKeys() {
  //
  if (selectedKeys != null) {
    processSelectedKeysOptimized(selectedKeys.flip());
  } else {
    processSelectedKeysPlain(selector.selectedKeys());
  }
}

當有了新 I/O 請求進來, JDK 原生的 Selector 將 SelectionKey 放入感興趣的 key 的集合中,而這個集合現在就是 Netty 通過反射的方式強制替換為以陣列為資料結構的selectedKeys

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
  for (int i = 0;; i ++) {
    final SelectionKey k = selectedKeys[i];
    if (k == null) {
      break;
    }
    //陣列輸出空項, 從而允許在channel 關閉時對其進行垃圾回收
    //陣列中當前迴圈對應的keys置空, 這種感興趣的事件只處理一次就行
    selectedKeys[i] = null;
        // 獲取出 attachment,預設情況下就是註冊進Selector時,傳入的第三個引數  this===>   NioServerSocketChannel
    // 一個Selector中可能被繫結上了成千上萬個Channel,  通過K+attachment 的手段, 精確的取出發生指定事件的channel, 進而獲取channel中的unsafe類進行下一步處理
    final Object a = k.attachment();

    if (a instanceof AbstractNioChannel) {
      //進入這個方法, 傳進入 感興趣的key + NioSocketChannel
      processSelectedKey(k, (AbstractNioChannel) a);
    } else {
      @SuppressWarnings("unchecked")
      NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
      processSelectedKey(k, task);
    }
        // 判斷是否需要再次輪詢
    if (needsToSelectAgain) {
   
      for (;;) {
        i++;
        if (selectedKeys[i] == null) {
          break;
        }
        selectedKeys[i] = null;
      }

      selectAgain();
  
      selectedKeys = this.selectedKeys.flip();
      i = -1;
    }
  }
}

selectedKeys 是一個 set,與 selector 繫結,selector 在呼叫 select() 族方法的時候,如果有 I/O 事件發生,就會往 selectedKeys 中塞相應的 selectionKey。而 selectedKeys 內部維護了兩個 SelectionKey[] 陣列,重寫了 set#add 方法,在#add 的時候實際上是往數組裡面塞 SelectionKey。而在遍歷時只用遍歷陣列而不是遍歷set。

處理輪詢到的IO事件也主要是三步:

  1. 取出輪詢到的SelectionKey
  2. 取出與客戶端互動的channel物件,處理channel
  3. 判斷是否需要再次輪詢

上面提到過,一個 EventLoop 是可以處理多個 channel 的,並且保證一個 channel 事件只會在同一個 EventLoop 中被處理,那麼這裡的如何保證同一個 channel 會被某個曾經處理過他的 EventLoop 識別呢?

關鍵就在 SelectionKey,上面看到 a 物件其實就是一個 NioSocketChannel,在 AbstractNioChannel 中有一個#doRegister 方法,這裡將 JDK 的 channel 註冊到 selector 上去,並且將自身設定到 attachment 上。這樣 JDK 輪詢出某條 SelectableChannel 有 I/O 事件發生時,就可以直接取出 AbstractNioChannel 了。

我們繼續看看 processSelectedKey(k, (AbstractNioChannel) a)是如何處理感興趣的事件的:

//當有新連線進來,就會到這裡
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  //netty底層對資料的讀寫都是unsafe完成的。這個unsafe也是和Channel進行唯一繫結的物件
  final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  if (!k.isValid()) {
    final EventLoop eventLoop;
    try {
      eventLoop = ch.eventLoop();
    } catch (Throwable ignored) {
      return;
    }
  
    if (eventLoop != this || eventLoop == null) {
      return;
    }
    // close the channel if the key is not valid anymore
    unsafe.close(unsafe.voidPromise());
    return;
  }
    //上面這一串都是在校驗 key 的合法性
  try {
    int readyOps = k.readyOps();
    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    // to a spin loop
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
      unsafe.read();
      if (!ch.isOpen()) {
        // Connection already closed - no need to handle write.
        return;
      }
    }
    //處理write事件的flush
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
      // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
      ch.unsafe().forceFlush();
    }
    //處理讀和新連線的accept事件
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
      // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
      // See https://github.com/netty/netty/issues/924
      int ops = k.interestOps();
      ops &= ~SelectionKey.OP_CONNECT;
      k.interestOps(ops);

      unsafe.finishConnect();
    }
  } catch (CancelledKeyException ignored) {
    unsafe.close(unsafe.voidPromise());
  }
}

首先在讀寫之前都要先呼叫 finishConnect,來確保與客戶端連線上。這個過程最終會傳遞給 channelHandle r的channelActive 方法,因此可以通過 channelActive 來驗證有多少客戶端線上。

接下來是處理 write 事件的 flush,注意,我們的 write 不是在這裡做的,真正的 write 一般是封裝成 task 去執行的。

最後是處理讀和新連線的 accept 事件。Netty 將新連線的 accept 也當做一次 read。對於 boss NioEventLoop 來說,新連線的 accept 事件在 read 的時候通過他的 pipeline 將連線扔給一個 worker NioEventLoop 處理;而worker NioEventLoop 處理讀事件,是通過他的 pipeline 將讀取到的位元組流傳遞給每個 channelHandler 來處理。
從這裡也可以看出來 Netty 所有關於 I/O 操作都是通過內部的 Unsafe 來實現的。

還記得我們是在哪裡扯到了 I/O操作的 processSelectedKeys方法嘛!感覺在扯遠的道路上越來越遠了。再把視線回到 NioEventLoop 的 #run()方法, I/O 操作都是processSelectedKeys方法來處理,下面還有個runAllTasks方法,它是用於處理封裝好的事件操作的。可以看到 runAllTasks 有個函式是帶了事件引數的,雖然設定了一個可以執行的時間引數,但是實際上 Netty 並不保證能精確的確保非 I/O 任務只執行設定的毫秒,下面來看下 runAllTasks 帶時間引數的程式碼:

/* timeoutNanos:任務執行花費最長耗時*/
protected boolean runAllTasks(long timeoutNanos) {
    // 把scheduledTaskQueue中已經超過延遲執行時間的任務移到taskQueue中等待被執行。
    fetchFromScheduledTaskQueue();

    // 非阻塞方式pollTask
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 執行task
        safeExecute(task);
        runTasks ++;
        // 依次從taskQueue任務task執行,每執行64個任務,進行耗時檢查。
        // 如果已執行時間超過預先設定的執行時間,則停止執行非IO任務,避免非IO任務太多,影響IO任務的執行。
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

NioEventLoop 執行 task 的過程,同樣可以分成幾步:

  1. 從 scheduledTaskQueue 轉移定時任務到 taskQueue;
  2. 計算本次任務迴圈的截止時間;
  3. 執行任務;
  4. 執行完任務後的工作。

從上面可以看到 NioEventLoop 中至少有兩種佇列,taskQueuescheduledTaskQueue

EventLoop 是一個 Executor,因此使用者可以向 EventLoop 提交 task。在 execute 方法中,當 EventLoop 處於迴圈中或啟動了迴圈後都會通過 addTask(task)向 EventLoop 提交任務。SingleThreadEventExecutor 內部使用一個 taskQueue 將task 儲存起來。

taskQueue最大的應用場景就是使用者在 channelHandler 中獲取到 channel,然後通過 channel.write() 資料,這裡會把 write 操作封裝成一個 WriteTask,然後通過 eventLoop.execute(task) 執行,實際上是給 EventLoop 提交了一個 task,加入到 taskQueue 佇列中。

同時,EventLoop也是一個ScheduledExecutorService,這意味著使用者可以通過ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)方法向EventLoop提交定時任務。因此,EventLoop內部也維護了一個優先順序佇列scheduledTaskQueue來儲存提交的定時任務。

知道了NioEventLoop內部的任務佇列後,再來看執行task的過程。

第一步,是將到期的定時任務轉移到taskQueue中,只有在當前定時任務的截止時間已經到了,才會取出來。

然後第二步計算本次任務迴圈的截止時間deadline。

第三步真正去執行任務,先執行task的run方法,然後將runTasks加一,每執行完64(0x3F)個任務,就判斷當前時間是否超過deadline,如果超過,就break,如果沒有超過,就繼續執行。

需要注意的是,這裡如果任務沒執行完break掉了,afterRunningAllTasks後,NioEventLoop就會重新開始一輪新的迴圈,沒完成的任務仍然在taskQueue中,等待runAllTasks的時候去執行。

最後一步是afterRunningAllTasks,執行完所有任務後需要進行收尾,相當於一個鉤子方法,可以作統計用。
最後總結一下處理任務佇列的task的過程就是:

eventLoop是一個Executor,可以呼叫execute給eventLoop提交任務,NioEventLoop會在runAllTasks執行。NioEventLoop內部分為普通任務和定時任務,在執行過程中,NioEventLoop會把過期的定時任務從scheduledTaskQueue轉移到taskQueue中,然後執行taskQueue中的任務,同時每隔64個任務檢查是否該退出任務迴圈。

4. EventLoop 如何繫結 channel

上面的長篇大論其實只是分析了 Netty 初始化一個 Reactor 執行緒是多麼的艱難。考慮了太多的事情。我們一開頭寫了一個Demo,到現在為止都沒有分析到客戶端和服務端啟動的時候是如何將 Reactor 執行緒和 channel 繫結起來的,即啟動的時候如何將一個 SocketChannel 繫結到 work thread 上。所以我們還是從啟動過程分析一下,走一遍總體流程。

客戶端啟動

再重溫一下客戶端啟動程式碼:

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
  .channel(NioSocketChannel.class)
  .handler(new ClientChannelInitializer());
try {
  ChannelFuture future = bootstrap.connect(address,port).sync();
  future.channel().writeAndFlush("Hello world, i'm online");
  future.channel().closeFuture().sync();
} catch (Exception e) {
  log.error("client start fail",e);
}finally {
  group.shutdownGracefully();
}

EventLoopGroup 就不用解釋了,初始化了一個啟動執行緒池。下面的 Bootstrap 是Netty 封裝的啟動類,通過一連串的鏈式呼叫繫結 Selector 執行緒,啟動指定型別的SocketChannel 和 初始化處理邏輯。

初始化好啟動資訊之後呼叫 connect()進行連線:

public ChannelFuture connect(SocketAddress remoteAddress) {
  if (remoteAddress == null) {
    throw new NullPointerException("remoteAddress");
  }

  validate();
  return doResolveAndConnect(remoteAddress, config.localAddress());
}

首先是校驗一下必傳引數是否存在,埠、IP 以及 handler 資訊是否初始化。下面的 doResolveAndConnect方法就是連線的主要邏輯:

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
  //完成channel 的初始化和註冊
  final ChannelFuture regFuture = initAndRegister();
  final Channel channel = regFuture.channel();
    //註冊成功直接返回
  if (regFuture.isDone()) {
    if (!regFuture.isSuccess()) {
      return regFuture;
    }
    return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
  } else {
    // 如果註冊還在進行中,需要向future物件新增一個監聽器,以便在註冊成功的時候做一些工作,監聽器實際上就是一個回撥物件
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        // Direclty obtain the cause and do a null check so we only need one volatile read in case of a
        // failure.
        Throwable cause = future.cause();
        if (cause != null) {
          // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
          // IllegalStateException once we try to access the EventLoop of the Channel.
          promise.setFailure(cause);
        } else {
          // Registration was successful, so set the correct executor to use.
          // See https://github.com/netty/netty/issues/2586
          promise.registered();
          // 註冊成功後仍然呼叫doResolveAndConnect0方法完成連線建立的過程
          doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
        }
      }
    });
    return promise;
  }
}

這個方法其實只做了兩個事情:

  • 初始化一個 Channel 物件並註冊到 EventLoop 中;
  • 呼叫 doResolveAndConnect0() 方法完成 tcp 連線的建立。

繼續看初始化 channel 的過程:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
      //使用工廠類ChannelFactory的newChannel通過反射建立Channel例項
        channel = channelFactory.newChannel();
      //呼叫init方法執行初始化操作
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // 若在建立例項和初始化期間丟擲異常,建立DefaultChannelPromise例項,寫入異常並返回
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
        //呼叫EventLoopGroup的register方法,完成註冊操作
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

         /** 如果程式到這裡,說明promise沒有失敗,可能發生以下情況之一 
         * 1) 如果嘗試將Channel註冊到EventLoop,且此時註冊已經完成;inEventLoop返回true,channel已經成功注
         *    冊,可以安全呼叫bind() or connect()
         * 2) 如果嘗試註冊到另一個執行緒,即inEventLoop返回false,則此時register請求已成功新增到事件迴圈的任務隊
         *    列中,現在同樣可以嘗試bind()或connect(),因為bind()或connect()會被排程在執行register 
         *    Task之後執行, 因為register(),bind()和connect()都被繫結到同一個I/O執行緒。
         */
    return regFuture;
}

channel 的註冊過程主要就在上面的一句程式碼中:

ChannelFuture regFuture = config().group().register(channel);

跟著register()往下走,可以跟到一段程式碼:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  ...
  ...
  ...
  AbstractChannel.this.eventLoop = eventLoop;

  if (eventLoop.inEventLoop()) {
    register0(promise);
  } else {
    try {
      eventLoop.execute(new Runnable() {
        @Override
        public void run() {
          register0(promise);
        }
      });
    } catch (Throwable t) {
      logger.warn(
        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
        AbstractChannel.this, t);
      closeForcibly();
      closeFuture.setClosed();
      safeSetFailure(promise, t);
    }
  }
}

這裡就是從 EventLoopGroup 中拿到特定的 EventLoop,如何分配的過程上面已經有分析,就是呼叫 NioEventLoop 的 next()方法。判斷 NioEventLoop 的執行緒是否已經啟動,如果已經啟動,呼叫 register0方法;否則呼叫 eventLoop.execute 方法啟動執行緒。

再跟一下 register0(promise)的程式碼:

private void register0(ChannelPromise promise) {
  try {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
      return;
    }
    boolean firstRegistration = neverRegistered;
    //呼叫JDK去註冊Selector
    doRegister();
    neverRegistered = false;
    registered = true;
    //設定註冊成功通知監聽器
    safeSetSuccess(promise);
    //觸發註冊成功事件
    pipeline.fireChannelRegistered();
    //如果是第一次則觸發啟用成功事件
    if (firstRegistration && isActive()) {
      pipeline.fireChannelActive();
    }
  } catch (Throwable t) {
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
  }
}

在這裡呼叫 JDK NIO 去註冊 Selector,設定註冊成功的監聽事件。

這裡是不是就把上面分析的 EventLoop 的 execute 方法和 啟動聯絡起來了,通過execute 來執行task。

initAndRegister方法的主要過程就分析完了,其實主要有三點:

  1. 建立了一個NioServerSocketChannel物件;
  2. 為NioServerSocketChannel對應的ChannelPipeLine增加了一個ServerBootstrapAcceptor處理器,用來處理新的連線;
  3. 從NioEventLoopGroup中分配了一個NioEventLoop,用於監聽NioServerSocketChannel通道上的 I/O 事件。

客戶端的啟動就分析完成,工作量還是不少哈。

Server 端啟動

還是先看一下 Server 啟動的程式碼:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
  .channel(NioServerSocketChannel.class)
  .childHandler(new ServerChannelInitializer());

try {
  ChannelFuture future = server.bind(port).sync();
  future.channel().closeFuture().sync();
} catch (InterruptedException e) {
  log.error("server start fail",e);
}finally {
  bossGroup.shutdownGracefully();
  workGroup.shutdownGracefully();
}

這裡看到有個區別是:客戶端啟動是通過 Bootstrap 啟動類來實現,呼叫 connect()方法,服務端啟動是ServerBootstrap啟動類來實現,呼叫 bind()方法。

public ChannelFuture bind(SocketAddress localAddress) {
  validate();
  if (localAddress == null) {
    throw new NullPointerException("localAddress");
  }
  return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
  final ChannelFuture regFuture = initAndRegister();
  final Channel channel = regFuture.channel();
  if (regFuture.cause() != null) {
    return regFuture;
  }

  if (regFuture.isDone()) {
    // At this point we know that the registration was complete and successful.
    ChannelPromise promise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
  } else {
    // Registration future is almost always fulfilled already, but just in case it's not.
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null) {
          // Registration o