1. 程式人生 > >Bootstrap初始化過程原始碼分析--netty客戶端的啟動

Bootstrap初始化過程原始碼分析--netty客戶端的啟動

Bootstrap初始化過程

netty的客戶端引導類是Bootstrap,我們看一下spark的rpc中客戶端部分對Bootstrap的初始化過程

TransportClientFactory.createClient(InetSocketAddress address)

只需要貼出Bootstrap初始化部分的程式碼

// 客戶端引導物件
Bootstrap bootstrap = new Bootstrap();
// 設定各種引數
bootstrap.group(workerGroup)
  .channel(socketChannelClass)
  // Disable Nagle's Algorithm since we don't want packets to wait
  // 關閉Nagle演算法
  .option(ChannelOption.TCP_NODELAY, true)
  .option(ChannelOption.SO_KEEPALIVE, true)
  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
  .option(ChannelOption.ALLOCATOR, pooledAllocator);

// socket接收緩衝區
if (conf.receiveBuf() > 0) {
  bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}

// socket傳送緩衝區
// 對於接收和傳送緩衝區的設定應該用如下的公式計算:
// 延遲 *頻寬
// 例如延遲是1ms,頻寬是10Gbps,那麼緩衝區大小應該設為1.25MB
if (conf.sendBuf() > 0) {
  bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
}

final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
final AtomicReference<Channel> channelRef = new AtomicReference<>();

// 設定handler(處理器物件)
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  @Override
  public void initChannel(SocketChannel ch) {
    TransportChannelHandler clientHandler = context.initializePipeline(ch);
    clientRef.set(clientHandler.getClient());
    channelRef.set(ch);
  }
});

// Connect to the remote server
long preConnect = System.nanoTime();
// 與服務端建立連線,啟動方法
ChannelFuture cf = bootstrap.connect(address);

分為幾個主要的步驟:

  • 首先建立一個Bootstrap物件,呼叫的是無參構造器
  • 設定各種引數,如通道型別,關閉Nagle演算法,接收和傳送緩衝區大小,設定處理器
  • 呼叫connect與服務端建立連線

接下來,我們主要通過兩條線索來分析Bootstrap的啟動過程,即構造器和connect兩個方法,而對於設定引數的過程僅僅是給內部的一些成員變數賦值,所以不需要詳細展開。

Bootstrap.Bootstrap()

Bootstrap繼承了AbstractBootstrap,看了一下他們的無參構造方法,都是個空方法。。。。。。所以這一步,我們就省了,瞬間感覺飛起來了有沒有^_^

Bootstrap.connect(SocketAddress remoteAddress)

public ChannelFuture connect(SocketAddress remoteAddress) {
    // 檢查非空
    ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
    // 同樣是對一些成員變數檢查非空,主要檢查EventLoopGroup,ChannelFactory,handler物件
    validate();
    return doResolveAndConnect(remoteAddress, config.localAddress());
}

主要是做了一些非空檢查,需要注意的是,ChannelFactory物件的設定,前面的spark中在對Bootstrap初始化設定的時候呼叫了.channel(socketChannelClass)方法,這個方法如下:

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

建立了一個ReflectiveChannelFactory物件,並賦值給內部的channelFactory成員。這個工廠類會根據傳進來的Class物件通過反射建立一個Channel例項。

doResolveAndConnect

從這個方法的邏輯中可以看出來,建立一個連線的過程分為兩個主要的步驟;

  • 初始化一個Channel物件並註冊到EventLoop中
  • 呼叫doResolveAndConnect0方法完成tcp連線的建立

值得注意的是,initAndRegister方法返回一個Future物件,這個型別通常用於非同步機制的實現。在這裡,如果註冊沒有立即成功的話,會給返回的futrue物件新增一個監聽器,在註冊成功以後建立tcp連線。

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 初始化一個Channel物件並註冊到EventLoop中
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        // 如果註冊失敗,世界返回失敗的future物件
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {// 如果註冊還在進行中,需要向future物件新增一個監聽器,以便在註冊成功的時候做一些工作,監聽器實際上就是一個回撥物件
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    // 註冊成功後仍然呼叫doResolveAndConnect0方法完成連線建立的過程
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }

initAndRegister

仍然分為兩個步驟:

  • 通過channel工廠類建立一個channel物件,通過反射獲取指定的channel型別的無參構造器,呼叫構造器來建立物件
  • 呼叫init方法對channel物件進行初始化,init方法是一個抽象方法,Bootstrap和ServerBootstrap的實現不同
  • 將channel註冊到EventLoopGroup中

注意看原始碼中的一段註釋,這段註釋對netty的執行緒模型的理解很有幫助,大致意思是說:

  • 如果當前的程式碼是在EventLoopEvent執行緒中執行的,那麼程式碼執行到這裡說明channel已經成功註冊到EventLoopEvent上了,此時再呼叫bind() 或 connect()方法肯定是沒有問題的
  • 如果當前程式碼不是在EventLoopEvent執行緒中執行的,也就是說當前執行緒是另外的執行緒,在這裡繼續呼叫bind() 或 connect()方法仍然是安全的,並不會由於併發引起方法執行順序的錯亂,原因是netty中一個channel只會繫結到一個執行緒上,所有關於這個channel的操作包括註冊,bind或connect都會以排隊任務的形式在一個執行緒中序列執行,這種做法也為netty規避了很多執行緒安全問題,從而減少了很多加鎖,同步的程式碼,減少了執行緒之間的競爭資源導致的執行緒切換,側面上提高了執行緒執行效率。

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 通過channel工廠類建立一個channel物件
channel = channelFactory.newChannel();
// 呼叫init方法對channel進行一些初始化的設定
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

    // 註冊到EventLoopGroup中
    ChannelFuture regFuture = config().group().register(channel);
    // 如果發生異常,需要關閉已經建立的連線
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    
    return regFuture;
}

NioSocketChannel初始化

DEFAULT_SELECTOR_PROVIDER是預設的SelectorProvider物件,這時jdk中定義的一個類,主要作用是生成選擇器selector物件和通道channel物件

public NioSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}

newSocket中通過呼叫provider.openSocketChannel()方法建立了一個SocketChannel物件,它的預設實現是SocketChannelImpl。
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}

然後經過幾次呼叫,最後呼叫了下面的構造器,首先呼叫了父類AbstractNioByteChannel的構造器,
然後建立了一個SocketChannelConfig物件,這個類有點類似於門面模式,對NioSocketChannel物件和Socket物件的一些引數設定和獲取的介面進行封裝。
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}

我們在接著看父類AbstractNioByteChannel的構造方法

AbstractNioByteChannel(Channel parent, SelectableChannel ch)

沒有做任何工作,直接呼叫了父類的構造方法,注意這裡多了一個引數SelectionKey.OP_READ,這個引數表示channel初始時的感興趣的事件,channel剛建立好之後對read事件感興趣
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}

AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp)

主要還是呼叫父類的構造方法

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // 父類構造方法
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // 設定非阻塞
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            // 如果發生異常,關閉該channel
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

AbstractChannel(Channel parent)

最關鍵的初始化邏輯在這個最頂層的基類中,其中很重的兩個物件Unsafe物件和ChannelPipeline物件,前者封裝了jdk底層api的呼叫,後者是實現netty對事件的鏈式處理的核心類。

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // 建立一個ChannelId物件,唯一標識該channel
    id = newId();
    // Unsafe物件,封裝了jdk底層的api呼叫
    unsafe = newUnsafe();
    // 建立一個DefaultChannelPipeline物件
    pipeline = newChannelPipeline();
}

小結

前面一小節,我們主要簡單分析了一下NioSocketChannel的初始化過程,可以看到最主要的邏輯在AbstractChannel的構造方法中,這裡我們看到了兩個重要的類的建立過程。

Bootstrap.init

回到AbstractBootstrap.initAndRegister方法中,在完成通過反射呼叫NioSocketChannel構造方法並建立一個例項後,緊接著就要對這個新建立的Channel例項進行初始化設定工作,我們看一下Bootstrap對新建立的Channel的初始化過程:

  • 向channel的Pipeline中新增一個處理器,ChannelPipeline我們可以理解為一個流水線,在這條流水線上有各種各樣的處理器,一個channel事件產生後會在這個流水線上進行傳播,依次經過所有的處理器
  • 設定引數,也就是以ChannelOption為key的一些引數,可以通過DefaultChannelConfig.setOption方法看到具體可以設定哪些引數。
  • 設定屬性

    void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    // 向ChannelPipeline中新增一個處理器,這個處理器就是我們之前設定的處理器
    p.addLast(config.handler());

      final Map<ChannelOption<?>, Object> options = options0();
      // 設定引數,最終是通過呼叫SocketChannelConfig的一些引數設定介面設定引數
      synchronized (options) {
          setChannelOptions(channel, options, logger);
      }
    
      final Map<AttributeKey<?>, Object> attrs = attrs0();
      // 設定屬性
      synchronized (attrs) {
          for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
              channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
          }
      }

    }

MultithreadEventLoopGroup.register

在完成channel的建立和初始化之後,我們就要將這個channel註冊到一個EventLoop中,NioNioEventLoop繼承自MultithreadEventLoopGroup, 通過呼叫SingleThreadEventLoop的register方法完成註冊

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

可以看到,通過next()方法選出了其中的一個EventLoop進行註冊。MultithreadEventLoopGroup是對多個真正的EventLoopGroup的封裝,每個實現了實際功能的真正的EventLoopGroup執行在一個執行緒內,
所以我們接下來應該看單個的EventLoopGroup的註冊方法。

SingleThreadEventLoop.register

這裡建立了一個DefaultChannelPromise物件,用於作為返回值。

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

最終呼叫了Unsafe的register方法將channel繫結到當前的EventLoopGroup物件上。
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

AbstractChannel.AbstractUnsafe.register

  • 首先是做一些前置檢查,包括變數非空檢查,重複註冊檢查,檢查channel型別和EventLoopGroup型別是否匹配
  • 將這個channel繫結到指定的eventLoop物件上,
  • 呼叫register0完成註冊

      public final void register(EventLoop eventLoop, final ChannelPromise promise) {
          // 做一些非空檢查
          if (eventLoop == null) {
              throw new NullPointerException("eventLoop");
          }
          // 如果重複註冊,通過future物件丟擲一個異常
          // 一個channel只能註冊到一個EventLoopGroup物件上
          if (isRegistered()) {
              promise.setFailure(new IllegalStateException("registered to an event loop already"));
              return;
          }
          // 檢查channel型別和EventLoopGroup型別是否匹配
          if (!isCompatible(eventLoop)) {
              promise.setFailure(
                      new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
              return;
          }
    
          // 將channel內部的eventLoop成員設定為相應的物件
          // 也就是將這個channel繫結到指定頂eventLoop上
          AbstractChannel.this.eventLoop = eventLoop;
    
          // 這裡做了一個判斷,如果當前處於eventLoop對應的執行緒內,那麼直接執行程式碼
          // 如果當前執行的執行緒與eventLoop不是同一個,那麼將這個註冊的任務新增到eventLoop的任務佇列中
          if (eventLoop.inEventLoop()) {
              register0(promise);
          } else {
              try {
                  eventLoop.execute(new Runnable() {
                      @Override
                      public void run() {
                          register0(promise);
                      }
                  });
              } catch (Throwable t) {
                  logger.warn(
                          "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                          AbstractChannel.this, t);
                  closeForcibly();
                  closeFuture.setClosed();
                  safeSetFailure(promise, t);
              }
          }
      }

AbstractChannel.AbstractUnsafe.register0

這個方法實現了實際的註冊邏輯,

  • 依然要做一些前置的設定和檢查工作,包括在註冊過程中不可取消,檢查channel是否存活,
  • 呼叫jdk的api完成註冊。例如,對於jdk Nio的通道的註冊就是呼叫SelectableChannel.register(Selector sel, int ops, Object att)
  • 呼叫所有的已新增的處理器節點的ChannelHandler.handlerAdded方法,實際上這也會呼叫handler.handlerRemoved方法,如果在此之前有handler被移除掉的話
  • 通知future物件已經註冊成功了
  • 觸發一個channel註冊成功的事件,這個事件會在pipeline中傳播,所有註冊的handler會依次接收到該事件並作出相應的處理
  • 如果是第一次註冊,還需要觸發一個channel存活的事件,讓所有的handler作出相應的處理

      private void register0(ChannelPromise promise) {
          try {
              // check if the channel is still open as it could be closed in the mean time when the register
              // call was outside of the eventLoop
              // 將ChannelPromise設定為不可取消,並檢查channel是否還存活,通過內部的jdk的channel檢查是否存活
              if (!promise.setUncancellable() || !ensureOpen(promise)) {
                  return;
              }
              // 是否第一次註冊,
              // TODO 說明情況下會註冊多次??
              boolean firstRegistration = neverRegistered;
              // 完成實際的註冊,即底層api的呼叫
              // 如果對於jdk Nio的通道的註冊就是呼叫SelectableChannel.register(Selector sel, int ops, Object att)
              doRegister();
              // 更新標誌變數
              neverRegistered = false;
              registered = true;
    
              // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
              // user may already fire events through the pipeline in the ChannelFutureListener.
              // 呼叫所有的已新增的處理器節點的ChannelHandler.handlerAdded方法
              pipeline.invokeHandlerAddedIfNeeded();
    
              // 通過future物件已經註冊成功了
              safeSetSuccess(promise);
              // 觸發一個channel註冊成功的事件,這個事件會在pipeline中傳播,
              // 所有註冊的handler會依次接收到該事件並作出相應的處理
              pipeline.fireChannelRegistered();
              // Only fire a channelActive if the channel has never been registered. This prevents firing
              // multiple channel actives if the channel is deregistered and re-registered.
              if (isActive()) {
                  if (firstRegistration) {
                      // 如果是第一次註冊,還需要觸發一個channel存活的事件,讓所有的handler作出相應的處理
                      pipeline.fireChannelActive();
                  } else if (config().isAutoRead()) {
                      // This channel was registered before and autoRead() is set. This means we need to begin read
                      // again so that we process inbound data.
                      //
                      // See https://github.com/netty/netty/issues/4805
                      // 開始接收讀事件
                      // 對於Nio型別的channel, 通過呼叫jdk的相關api註冊讀事件為感興趣的事件
                      beginRead();
                  }
              }
          } catch (Throwable t) {
              // Close the channel directly to avoid FD leak.
              closeForcibly();
              closeFuture.setClosed();
              safeSetFailure(promise, t);
          }
      }

小結

到此,我們就完成了對channel的建立,初始化,和註冊到EventLoop過程的分析,整個過程看下來,其實並不複雜,只不過程式碼的巢狀比較深,繼承結構複雜,有些簡單的功能可能要看好幾層才能找到真正實現的地方,所以還需要耐心和熟悉。這裡,我把主幹邏輯再提煉一下,去掉所有細枝末節的邏輯,一遍能有一個整體的認識:

  • 首先通過反射建立了一個NioSocketChannel(通過反射呼叫無參構造器)
  • 然後對channel物件進行初始化,主要是想這個channel的ChannelPipeline中新增使用者設定的handler
  • 最後將這個channel註冊到一個EventLoop上,註冊過程設計jdk底層的selector註冊api的呼叫,呼叫handler的回撥方法,在channelPipeline中觸發一個channel註冊的事件,這些事件最終回撥各個handler物件的channelRegistered方法。

接下來,我們回到Bootstrap.doResolveAndConnect方法中,繼續完成建立連線的過程的分析。

Bootstrap.doResolveAndConnect0

連線的建立在方法doResolveAndConnect0中實現:

這個方法的主要工作就是對遠端地址進行解析,比如通過dns伺服器對域名進行解析,
然後使用解析後的地址進行連線的建立,連線建立呼叫doConnect方法

private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                           final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        final EventLoop eventLoop = channel.eventLoop();
        // 獲取一個地址解析器
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        // 如果解析器不支援該地址或者改地址已經被解析過了,那麼直接開始建立連線
        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
            // Resolver has no idea about what to do with the specified remote address or it's resolved already.
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        // 對遠端地址進行解析
        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {
            final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {
                // Failed to resolve immediately
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                // 解析成功後進行連線
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        // Wait until the name resolution is finished.
        // 給future物件新增一個回撥,採用非同步方法進行連線,
        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}

Bootstrap.doConnect

呼叫channel的connect方法完成連線過程。
也許是之前看scala程式碼習慣了,回過頭來看java程式碼感覺很冗餘,一大堆程式碼就表達了那一點邏輯,感覺資訊密度太低,現在有很多人認為java會漸漸的沒落,而最優可能取代java的語言中,scala絕對是強有力的競爭者之一,沒有對比就沒有傷害,跟java比,scala語言真的是簡潔太多了,幾句話就能把所要表達的邏輯精準而又直接地表達出來。好像向宣告式程式設計更靠近了一點。

private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                // 呼叫 channel.connect方法完成連線
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

AbstractChannel.connect

public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

DefaultChannelPipeline.connect

這裡稍微說明一下,tail是整個鏈條的尾節點,如果對netty比較熟悉的話,應該知道netty對於io事件的處理採用責任鏈的模式,即使用者可以設定多個處理器,這些處理器組成一個鏈條,io事件在這個鏈條上傳播,被特定的一些處理器所處理,而其中有兩個特殊的處理器head和tail,他們分別是這個鏈條的頭和尾,他們的存在主要是為了實現一些特殊的邏輯。

public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

AbstractChannelHandlerContext.connect

中間經過幾個呼叫之後,最終呼叫該方法。這裡有一句關鍵程式碼findContextOutbound(MASK_CONNECT),這個方法的程式碼我就不貼了,大概說一下它的作用,更為具體的機制等後面分析Channelpipeline是在詳細說明。這個方法會在處理器鏈中從後向前遍歷,直到找到能夠處理connect事件的處理器,能否處理某種型別的事件是通過位元位判斷的,每個AbstractChannelHandlerContext物件內部有一個int型變數用於儲存標誌各種型別事件的位元位。一般,connect事件會有頭結點head來處理,也就是DefaultChannelPipeline.HeadContext類,所以我們直接看DefaultChannelPipeline.HeadContext.connect方法

public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    // 找到下一個能夠進行connect操作的,這裡用位元位來標記各種不同型別的操作,
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 呼叫AbstractChannelHandlerContext.invokeConnect
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

DefaultChannelPipeline.HeadContext.connect

public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

unsafe物件的賦值:

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, HeadContext.class);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

所以我們直接看unsafe.connect

AbstractNioChannel.connect

主要邏輯:

  • 狀態檢查,非空檢查
  • 呼叫doConnect方法進行連線
  • 如果立即就連線成功了,那麼將future物件設定為成功
  • 如果超時大於0,會提交一個延遲排程的任務,在超時時間到達後執行這個任務檢查是否連線成功,如果為連線成功連線說明連線超時,需要關閉通道
  • 向future物件新增一個回撥,在future被外部呼叫者取消時將通道關閉

可見建立連線的核心方法是doConnect,這是一個抽象方法,我們看NioSocketChannel,也就是tcp連線的建立過程,檢視AbstractNioChannel的實現類發現還有UDP,SCTP等協議

public final void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        // 檢查promise狀態,channel存活狀態
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }

        try {
            // 防止重複連線
            if (connectPromise != null) {
                // Already a connect in process.
                throw new ConnectionPendingException();
            }

            boolean wasActive = isActive();
            // 呼叫doConnect方法進行連線
            if (doConnect(remoteAddress, localAddress)) {
                // 如果立即就連線成功了,那麼將future物件設定為成功
                fulfillConnectPromise(promise, wasActive);
            } else {
                connectPromise = promise;
                requestedRemoteAddress = remoteAddress;

                // Schedule connect timeout.
                int connectTimeoutMillis = config().getConnectTimeoutMillis();
                // 如果超時大於0,那麼會在超時到達後檢查是否連線成功
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                            ConnectTimeoutException cause =
                                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
                            // 如果connectPromise能夠標記為失敗,說明此時還沒有連線成功,也就是連線超時了
                            // 此時需要關閉該通道
                            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                close(voidPromise());
                            }
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }

                // 向future物件新增一個回撥,在future被外部呼叫者取消時將通道關閉
                promise.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isCancelled()) {
                            if (connectTimeoutFuture != null) {
                                connectTimeoutFuture.cancel(false);
                            }
                            connectPromise = null;
                            close(voidPromise());
                        }
                    }
                });
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

NioSocketChannel.doConnect

  • 首先繫結指定的本地地址
  • 呼叫SocketUtils.connect建立連線

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    // 繫結指定的本地地址
    if (localAddress != null) {
    doBind0(localAddress);
    }

      // 這個變數標記建立連線的動作是否發起成功
      // 成功發起建立連線的工作並不表示連線已經成功建立
      boolean success = false;
      try {
          // 實際建立連線的語句
          boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
          if (!connected) {
              selectionKey().interestOps(SelectionKey.OP_CONNECT);
          }
          success = true;
          // 返回連線是否已經成功建立
          return connected;
      } finally {
          if (!success) {
              doClose();
          }
      }

    }

SocketUtils.connect

可以看到,最終是通過呼叫jdk的api來實現連線的建立,也就是SocketChannel.connect方法

public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
        throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
                // 呼叫jdk api建立連線,SocketChannel.connect
                return socketChannel.connect(remoteAddress);
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

總結

一句話,這程式碼是真的很深! 非常不直接,初次看的話,如果沒有一個程式碼框架圖在旁邊參考,很容易迷失在層層的繼承結構中,很多程式碼層層呼叫,真正有用的邏輯隱藏的很深,所以看這中程式碼必須要有耐心,有毅力,要有打破砂鍋問到底的決心。不過這樣的複雜的程式碼結構好處也是顯而易見的,那就是良好的擴充套件性,你可以在任意層級進行擴充套件。

總結一下建立連線的過程,我認為可以歸結為三個主要的方面:

  • 第一, 實際建立邏輯的程式碼肯定還是jdk api
  • 第二,這麼多方法呼叫,主要的作用就是迎合框架的要求,本質上是為了程式碼的擴充套件性,比如ChannelPipeline的處理器鏈
  • 第三,另一個主要的工作就是對future物件的處理,這時實現非同步的重要手段,future物件也是外部呼叫者和物件內部狀態之間的連線紐帶,呼叫者通過future物件完成一些功能,如查狀態,發出取消動作,實現阻塞等待等。