1. 程式人生 > >【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(一)

【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(一)

目錄

RPC上下文TransportContext

RPC配置TransportConf

RPC客戶端工廠TransprotClientFactory

RPC服務端TransportServer


作為一個分散式計算引擎,既然是分散式,那麼網路通訊是肯定少不了的,在Spark中很多地方都涉及到了網路通訊,各個元件之間訊息傳輸、使用者檔案和資源的上傳、Shuffle過程、Block的資料複製與備份等等,都少不了網路通訊。

在Spark2.X之前,Spark元件通訊使用Akka,使用者檔案和資源的上傳是基於Jetty實現的HttpFileServer,Shuffle過程、Block的資料複製與備份是基於Netty實現的。Spark2.0版本之後,Spark放棄了Akka和Jetty,各個元件之間訊息傳輸、使用者檔案和資源的上傳、Shuffle過程、Block的資料複製與備份等等統一採用Spark的RPC框架NettyStreamManager,將通訊框架統一起來了。

RPC上下文TransportContext

org.apache.spark.network.TransportContext,傳輸上下文,其主要作用是建立TransportServer,TransportClientFactory和使用TransportChannelHandler設定Netty Channel pipelines(Netty的通訊管道)。

TransportClient 提供了兩個通訊協議,分別是RPC控制層和資料層。RPC的處理是在TransportContext之外的,它負責設定流,這個流以zero-copy IO的形式與資料塊進行通訊傳輸。

TransportServer和TransportClientFactory都為每一個channel建立一個TransportChannelHandler。每一個TransportChannelHandler都包含一個TransportClient,可以使得服務程序通過現有的channel傳送訊息給客戶端。

下面是TransportContext的原始碼分析:

TransportContext中包含TransportConf、RpcHandler、MessageEncoder、MessageDecoder和一個建立TransportChannelHandler時使用的closeIdleConnections(布林型)屬性;兩個構造方法,TransportConf、RpcHandler是必須傳入TransportContext中的,closeIdleConnections選填。如下圖所示:

TransportContext的成員屬性和構造方法

TransportContext的主要作用之一就是建立TransprotClientFactory,建立TransportClientFactory時需要傳入TransportClientBootstrap列表。

建立TransprotClientFactory

TransportContext另一個主要作用就是建立TransportServer,RPC框架的服務端,建立服務端時可以設定指定的埠,也可以指定特定的IP地址+埠,當然也可以既不指定IP地址也不指定埠(此時,埠預設為0)。建立TransportServer時還需要傳入一個TransportServerBootstrap列表。

建立TransportServer

TransportContext的最後一個作用就是建立TransportChannelHandler,並使用TransportChannelHandler設定Netty Channel pipelines(Netty的通訊管道)。初始化Netty通訊管道,設定編碼/解碼,TransportChannelHandler還進行傳送/接收訊息處理。TransportChannelHandler包含TransportClient,在此channel上進行通訊,與此channel直接關聯,確保所有使用者使用channel時得到的是同一個TrasportClient物件。

建立TransportChannelHandler

RPC配置TransportConf

TransportContext中包含org.apache.spark.network.util.TransportConf,TransportConf提供整個RPC框架的配置資訊。TransprotConf有兩個主要的成員屬性,分別是配置提供者conf和模組配置名稱module,還有一些關鍵配置資訊的KEY,根據這些KEY,TransportConf為RPC框架提供相應的API可獲取配置資訊。

模組配置名稱module與getConfKey(String suffix)方法結合,為關鍵配置資訊的KEY賦值,賦值的格式是:

"spark." + module + "." + suffix,suffix是具體的字尾

 具體實現是:

module的使用

另一個主要的成員屬性就是配置提供者ConfigProvider conf,根據配置資訊的KEY從配置提供者ConfigProvider conf中得到具體的配置資訊。

org.apache.spark.network.util.ConfigProvider是一個抽象類,有一個get抽象方法,其他的get、getInt、getLong、getDouble、getBoolean具體方法都是基於這個抽象方法進行的型別轉換。

ConfigProvider的實現

Spark中使用org.apache.spark.network.netty.SparkTransportConf來建立TransportConf。fromSparkConf方法構建了TransportConf,該方法需要傳遞三個引數SparkConf、模組名module和可用核心數numUsableCores(預設為0)。如果numUsableCores小於等於0,執行緒數量就是系統可用處理器數量,如果大於0便和MAX_DEFAULT_NETTY_THREADS=8進行比較,執行緒數量取小的,因為系統不可能將全部核心數都用來網路傳輸,因此需要設定上限。我們可以通過在Spark的配置中手動設定serverthread和clientthread的數量來覆蓋MAX_DEFAULT_NETTY_THREADS。具體程式碼如下:

SparkTransportConf的實現

在fromSparkConf方法中設定分別設定了服務端傳輸執行緒數(spark.$module.io.serverThreads)和客戶端傳輸執行緒數(spark.$module.io.clientThreads),建立TransportConf物件時,傳遞的是ConfigProvider的匿名內部類,該匿名內部類實現的get方法就是呼叫了SparkConf的get方法。

RPC客戶端工廠TransprotClientFactory

TransportContext的一個很重要的作用就是建立org.apache.spark.network.client.TransprotClientFactory。

TransprotClientFactory為其他主機維護著一個連線池,並且確保相同的遠端主機返回相同的TransportClient。它還為所有的TransportCliet維護著一個共享的工作執行緒池。

TransportClientFactory構造方法如下:

public TransportClientFactory(
      TransportContext context,
      List<TransportClientBootstrap> clientBootstraps) {
    this.context = Preconditions.checkNotNull(context);// TransportContext
    this.conf = context.getConf();// TransportConf
    this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));// TransportClientBootstrap列表
    this.connectionPool = new ConcurrentHashMap<>();// 建立連線池
    this.numConnectionsPerPeer = conf.numConnectionsPerPeer();// conf中key為"spark."+module+"io.numConnectionsPerPeer"的值
    this.rand = new Random();// 在ClientPool中隨機選擇TransportClient

    IOMode ioMode = IOMode.valueOf(conf.ioMode());// conf中key為"spark."+module+"io.mode"的值,IO mode有兩種模式:nio(預設)和epoll
    this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);// 根據ioMode匹配Channel建立模式,有兩種:nio(預設)和epoll
    // TODO: Make thread pool name configurable.
    this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");// 建立Netty的WorkerGroup
    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
  }

在建立TransportClientFactory時傳入了兩個引數,一個是傳輸上下文TransportContext,還有一個是TransportClient載入程式列表List<TransportClientBootstrap>。

TransportClientBootstrap是一個介面,有兩個實現類分別是:SaslClientBootstrap和EncryptionDisablerBootstrap

TransportClientBootstrap在TransportClient給使用者使用之前做了一些引導工作,是對連結初始化之前做的一些準備工作,比如SASL身份驗證令牌,因為建立的連結可以重複使用,因此載入程式只會執行一次。

TransportClientFactory的主要作用就是建立RPC客戶端org.apache.spark.network.client.TransportClient,程式碼如下:

public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
    // Get connection from the connection pool first.
    // If it is not found or not active, create a new one.
    // Use unresolved address here to avoid DNS resolution each time we creates a client.
    // 建立InetSocketAddress
    final InetSocketAddress unresolvedAddress =
      InetSocketAddress.createUnresolved(remoteHost, remotePort);

    // Create the ClientPool if we don't have it yet.
    // 根據InetSocketAddress,在連線池找到對應的ClientPool快取
    // 如果找不到建立新的ClientPool快取,快取大小為conf中key為"spark."+module+"io.numConnectionsPerPeer"的值
    ClientPool clientPool = connectionPool.get(unresolvedAddress);
    if (clientPool == null) {
      connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
      clientPool = connectionPool.get(unresolvedAddress);
    }

    // 隨機選擇TransportClient快取
    int clientIndex = rand.nextInt(numConnectionsPerPeer);
    TransportClient cachedClient = clientPool.clients[clientIndex];

    /**
     * client快取存在並且可使用時的操作:
     * 設定TransportChannelHandler最後一次的使用時間,確保不超時;
     * 然後再檢查client是否存活,存活的話建立TransportClient成功
     * */
    if (cachedClient != null && cachedClient.isActive()) {
      // Make sure that the channel will not timeout by updating the last use time of the
      // handler. Then check that the client is still alive, in case it timed out before
      // this code was able to update things.
      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
        .get(TransportChannelHandler.class);
      synchronized (handler) {
        handler.getResponseHandler().updateTimeOfLastRequest();
      }

      if (cachedClient.isActive()) {
        logger.trace("Returning cached connection to {}: {}",
          cachedClient.getSocketAddress(), cachedClient);
        return cachedClient;
      }
    }

    /**
     * client快取存在或者未啟用時候的操作:
     * 根據IP和埠號重新建立InetSocketAddress
     * */
    // If we reach here, we don't have an existing connection open. Let's create a new one.
    // Multiple threads might race here to create new connections. Keep only one of them active.
    final long preResolveHost = System.nanoTime();
    final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
    final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
    if (hostResolveTimeMs > 2000) {
      logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
    } else {
      logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
    }

    /**
     * 建立InetSocketAddress的過程會產生靜態條件,此時標記了clientPool中的locks陣列
     * 在clientPool中locks陣列中的元素與TransportClient陣列中的元素一對一對應關係
     * 先進入的執行緒會過載createClient方法,並放入到clientPool中與clientIndex對應位置上
     * 後面進入的執行緒就會直接得到第一個執行緒進來時建立好的client了
     * */
    synchronized (clientPool.locks[clientIndex]) {
      cachedClient = clientPool.clients[clientIndex];
      // 後續執行緒進入時直接使用
      if (cachedClient != null) {
        if (cachedClient.isActive()) {
          logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
          return cachedClient;
        } else {
          logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
        }
      }
      // 第一個執行緒進來時建立client,過載了createClient方法
      clientPool.clients[clientIndex] = createClient(resolvedAddress);
      return clientPool.clients[clientIndex];
    }
  }

從上面的建立過程可以知道TransportClinetFactory採用隨機負載均衡的方式,從clientPool(也就是快取)中獲取client。

第一個執行緒進行建立client操作時,過載了私有的createClient方法,這個方法才是真正建立TransportClient方法,程式碼如下:

private TransportClient createClient(InetSocketAddress address) throws IOException {
    logger.debug("Creating new connection to {}", address);

    /** 構建初始載入程式並進行配置 */
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
      .channel(socketChannelClass)
      // Disable Nagle's Algorithm since we don't want packets to wait
      .option(ChannelOption.TCP_NODELAY, true)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
      .option(ChannelOption.ALLOCATOR, pooledAllocator);

    final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
    final AtomicReference<Channel> channelRef = new AtomicReference<>();

    /** 初始引導設定初始化channel */
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) {
        TransportChannelHandler clientHandler = context.initializePipeline(ch);
        clientRef.set(clientHandler.getClient());
        channelRef.set(ch);
      }
    });

    // Connect to the remote server
    long preConnect = System.nanoTime();
    ChannelFuture cf = bootstrap.connect(address); // 連結遠端伺服器
    if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
      throw new IOException(
        String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
    } else if (cf.cause() != null) {
      throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
    }

    TransportClient client = clientRef.get();
    Channel channel = channelRef.get();
    assert client != null : "Channel future completed successfully with null client";

    // Execute any client bootstraps synchronously before marking the Client as successful.
    long preBootstrap = System.nanoTime();
    logger.debug("Connection to {} successful, running bootstraps...", address);
    try {
      /** 執行TransportClient載入程式列表 */
      for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
        clientBootstrap.doBootstrap(client, channel);
      }
    } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
      client.close();
      throw Throwables.propagate(e);
    }
    long postBootstrap = System.nanoTime();

    logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
      address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);

    return client;
  }

createClient方法主要做了以下幾件事:

  1. 初始化根載入程式並進行配置
  2. 根載入程式設定初始化管道
  3. 使用根載入程式連線遠端伺服器
  4. 執行TransportClient載入程式列表
  5. 返回TransportClient物件

TransportClinetFactory中連線池的結構如下

連結池結構

多個連結對應對個客戶端快取,快取中每個客戶端快取對應一個鎖,用於避免競爭,並採用隨機負載均衡的方式從快取中獲取客戶端。

RPC服務端TransportServer

TransportContext另一個很重要的作用就是建立org.apache.spark.network.server.TransportServer。

TransportContext有4個createServer過載方法(介紹TransportContext已經提到過了),但其實都是在用這個構造器建立TransportServer的。

TransportServer構造器

對一些成員變數context、conf、appRpcHandler、bootstraps進行賦值過後,開始執行init(String hostToBind, int portToBind)對TransportServer初始化。

init(String hostToBind, int portToBind)的程式碼如下:

private void init(String hostToBind, int portToBind) {
    /**
     * ioMode同TransportClientFactory初始化時是一樣的
     * conf中key為"spark."+module+"io.mode"的值,IO mode有兩種模式:nio(預設)和epoll
     * */
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    /**
     * 1、建立Netty服務端需要同時建立bossGroup和workerGroup
     * */
    EventLoopGroup bossGroup =
      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
    EventLoopGroup workerGroup = bossGroup;

    /**
     * 2、建立ByteBuf分配器,對本地執行緒快取禁用
     * (ByteBuf由事件迴圈執行緒分配,執行執行緒釋放,本地快取會延遲迴收,加大開銷,所有禁用)
     * */
    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

    /**
     * 3、建立跟載入程式並配置
     * */
    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, allocator)
      .childOption(ChannelOption.ALLOCATOR, allocator);

    if (conf.backLog() > 0) {
      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    }

    if (conf.receiveBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    }

    if (conf.sendBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
    }

    /**
     * 4、初始化管道,執行TransportServer載入程式列表
     * */
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        context.initializePipeline(ch, rpcHandler);
      }
    });

    /**
     * 5、繫結IP 埠號
     * */
    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    logger.debug("Shuffle server started on port: {}", port);
  }

初始化TransportServer一共做了5件事:

  1. 建立boosGroup和workerGroup(Netty服務端需要同時建立)。ioMode的建立與TransportClientFactory中建立TransportClient的方式是一樣的。
  2. 建立ByteBuf分配器,對本地執行緒快取禁用。ByteBuf由事件迴圈執行緒分配,執行執行緒釋放,本地快取會延遲迴收,加大開銷,所有禁用。
  3. 建立根載入程式並配置。
  4. 初始化管道,執行TransportServer載入程式列表。
  5. 繫結IP 埠號。

未完待續~~~