1. 程式人生 > >【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(二)

【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(二)

目錄

RPC管道處理TransportChannelHandler

RPC服務端處理RpcHandler

載入程式Bootstrap

RPC客戶端TransportClient

總結


接著【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(一) 接著分析

RPC管道處理TransportChannelHandler

TransportContext最後一個作用就是使用org.apache.spark.network.server.TransportChannelHandler設定Netty Channel pipelines(Netty的通訊管道)。

在TransportClientFactory的createClient方法和TransportServer的init方法中都執行了初始化管道方法,也就是TransportContext中的initializePipeline方法。

TransportClientFactory中呼叫initializePipeline方法
TransportServer中呼叫initializePipeline方法

 

initializePipeline方法的程式碼如下:

public TransportChannelHandler initializePipeline(
      SocketChannel channel,
      RpcHandler channelRpcHandler) {
    try {
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
      /**
       * 管道設定
       * Request時按照順序執行,TransportFrameDecoder-》MessageDecoder-》IdleStateHandler-》TransportChannelHandler
       * Response時按照逆序執行,IdleStateHandler-》MessageEncoder
       * */
      channel.pipeline()
        .addLast("encoder", encoder) //為pipeline設定encoder
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) //為pipeline設定frameDecoder
        .addLast("decoder", decoder) //為pipeline設定decoder
        .addLast("idleStateHandler", //為pipeline設定IdleStateHandler,Netty內建物件
                new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
        // would require more logic to guarantee if this were not part of the same event loop.
        .addLast("handler", channelHandler);
      return channelHandler;
    } catch (RuntimeException e) {
      logger.error("Error while initializing Netty pipeline", e);
      throw e;
    }
  }

首先是呼叫createChannelHandler方法建立TransportChannelHandler物件,createChannelHandler程式碼如下:

/**
   * TransportChannelHandler
   * 在服務端代理TransportRequestHandler處理請求訊息
   * 在客戶端代理TransportResponseHandler處理相應資訊
   *
   * */
  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    // 建立TransportChannelHandler的同時,建立了TransportResponseHandler、TransportRequestHandler和TransportClient
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    // 真正意義上的建立TransportClient
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler);
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

建立TransportChannelHandler之前,先建立了TransportResponseHandler、TransportClient和TransportRequestHandler,在這裡才是真正意義的建立TransportClient物件,與管道一一對應,保證所有使用者使用channel時得到的是同一個TrasportClient物件。TransportChannelHandler在服務端代理TransportRequestHandler處理請求訊息,在客戶端代理TransportResponseHandler處理相應資訊。建立了TransportChannelHandler之後,對管道pipeline進行設定,程式碼如下:

管道設定

TransportFrameDecoder、MessageDecoder、TransportChannelHandler本質上都是繼承了ChannelInboundHandler,MessageEncoder本質上都是繼承了ChannelOutboundHandler,IdleStateHandler本質上都是繼承了ChannelInboundHandler,ChannelOutboundHandler(繼承和介面實現),根據Netty中handler的執行順序,得出如下:

Request時按照順序執行,TransportFrameDecoder-》MessageDecoder-》IdleStateHandler-》TransportChannelHandler,Response時按照逆序執行,IdleStateHandler-》MessageEncoder。結構如下:

管道處理請求和響應的流程

RPC服務端處理RpcHandler

下面的程式碼是TransportRequestHandler中的程式碼,可以清楚的看到,TransportRequestHandler是將請求訊息交給rpcHandler做進一步處理。

將請求訊息交給rpcHandler

 

RpcHandler是一個抽象類,主要有以下幾個方法:

receive方法,接收單一PRC訊息,RpcResponseCallback用來處理結束後的回掉,無論成功與否,都會執行一次。有一個receive過載方法,預設執行OneWayRpcCallback回撥,這個回撥只負責列印成功和失敗時的資訊。

/**
   * 抽象方法用來接收單一RPC訊息
   * RpcResponseCallback用來處理結束後的回掉,無論成功與否,都會執行一次
   * */
  public abstract void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);
/**
   * 過載receive方法,預設執行ONE_WAY_CALLBACK回撥
   * */
  public void receive(TransportClient client, ByteBuffer message) {
    receive(client, message, ONE_WAY_CALLBACK);
  }

getStreamManager方法,抽象方法,用於獲取getStreamManager

/**
   * 抽象方法獲取StreamManager
   * */
  public abstract StreamManager getStreamManager();

channelActive、channelInactive、exceptionCaught方法,分別與客戶端相關聯的channel處於活動/非活動/異常狀態時呼叫

  /**
   * 當與客戶端相關聯的channel處於活動狀態時呼叫
   * */
  public void channelActive(TransportClient client) { }

  /**
   * 當與客戶端相關聯的channel處於非活動狀態時呼叫
   * */
  public void channelInactive(TransportClient client) { }

  /**
   * 當與客戶端相關聯的channel產生異常時呼叫
   * */
  public void exceptionCaught(Throwable cause, TransportClient client) { }

瞭解了RpcHandler的結構後,再看一下TransportRequestHandler的handle(RequestMessage request)方法

public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
      processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }

TransportRequestHandler處理4中RequestMessage:

1、處理塊獲取請求

private void processFetchRequest(final ChunkFetchRequest req) {
    if (logger.isTraceEnabled()) {
      logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
        req.streamChunkId);
    }

    ManagedBuffer buf;
    try {
      // this.streamManager = rpcHandler.getStreamManager();
      // 校驗客戶端是否有許可權從流中讀取訊息
      streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
      // 將一個流與一個客戶端的TCP連結關聯起來,單個流只會有一個客戶端讀取
      streamManager.registerChannel(channel, req.streamChunkId.streamId);
      // 獲取塊
      buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
    } catch (Exception e) {
      logger.error(String.format("Error opening block %s for request from %s",
        req.streamChunkId, getRemoteAddress(channel)), e);
      respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
      return;
    }
    // 將ManagedBuffer和流的塊ID封裝到ChunkFetchSuccess中,呼叫respond方法返回給客戶端
    respond(new ChunkFetchSuccess(req.streamChunkId, buf));
  }

processFetchRequest做了以下幾件事:

  1. 校驗客戶端是否有許可權從流中讀取訊息
  2. 將一個流與一個客戶端的TCP連結關聯起來,單個流只會有一個客戶端讀取
  3. 獲取塊
  4. 將ManagedBuffer和流的塊ID封裝到ChunkFetchSuccess中,呼叫respond方法返回給客戶端

2、處理RPC請求

程式碼如下:

private void processRpcRequest(final RpcRequest req) {
    try {
      /**
       * 將傳送訊息的客戶端、RpcRequest訊息的內容和RpcResponseCallback回撥類作為引數傳遞給RpcHandler的receive方法
       * 所以說真正處理訊息的是RpcHandler,而不是TrnsportRequestHandler
       * */
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }

        @Override
        public void onFailure(Throwable e) {
          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
        }
      });
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    } finally {
      req.body().release();
    }
  }

將傳送訊息的客戶端、RpcRequest訊息的內容和RpcResponseCallback回撥類作為引數傳遞給RpcHandler的receive方法,所以說真正處理訊息的是RpcHandler,而不是TrnsportRequestHandler。

3、處理無需回覆的RPC請求

處理無需回覆的RPC請求,回撥類是OneWayRpcCallback,處理完RPC請求後不會給客戶端作出響應。

private void processOneWayMessage(OneWayMessage req) {
    try {
      /**
       * 處理無需回覆的RPC請求,回撥類是OneWayRpcCallback,處理完RPC請求後不會給客戶端作出響應
       * */
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
    } finally {
      req.body().release();
    }
  }

4、處理流請求

使用streamManager.openStream方法將流資料封裝為ManagedBuffer。

private void processStreamRequest(final StreamRequest req) {
    ManagedBuffer buf;
    try {
      // 將獲取的流資料封裝為ManagedBuffer
      buf = streamManager.openStream(req.streamId);
    } catch (Exception e) {
      logger.error(String.format(
        "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
      respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
      return;
    }
    // 無論成功還是失敗,都要響應客戶端
    if (buf != null) {
      respond(new StreamResponse(req.streamId, buf.size(), buf));
    } else {
      respond(new StreamFailure(req.streamId, String.format(
        "Stream '%s' was not found.", req.streamId)));
    }
  }

上面這四種處理請求的方法,除了processOneWayMessage不需要呼叫respond方法外,其他三個都需要呼叫respond方法,用來響應客戶端。respond方法中實際是呼叫channel.writeAndFlush來響應客戶端的。

private void respond(final Encodable result) {
    final SocketAddress remoteAddress = channel.remoteAddress();
    //響應客戶端
    channel.writeAndFlush(result).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            logger.trace("Sent result {} to client {}", result, remoteAddress);
          } else {
            logger.error(String.format("Error sending result %s to %s; closing connection",
              result, remoteAddress), future.cause());
            channel.close();
          }
        }
      }
    );
  }

載入程式Bootstrap

在TransportServer中有一個成員變數List<TransportServerBootstrap>是TransportServer載入程式列表,在初始化管道時,呼叫了每一個載入程式的doBootstrap方法。

執行載入程式

 

TransportServerBootstrap定義了服務端載入程式的規範,當客戶端與服務端建立了連線,在服務端持有的客戶端管道上執行載入程式。

TransportServerBootstrap介面定義如下:

public interface TransportServerBootstrap {
  
  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

TransportServerBootstrap有兩個實現類,一個是SaslServerBootstrap,另一個是EncryptionCheckerBootstrap,以SaslServerBootstrap為例說明載入程式的作用。

直接看SaslServerBootstrap的doBootstrap方法:

public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
    return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
  }

doBootstrap方法直接建立了一個RpcHandler的具體實現類SaslRpcHandler。SaslRpcHandler負責對管道進行SASL加密,它集成了RpcHandler,所以核心程式碼就在receive中:

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    if (isComplete) {
      // Authentication complete, delegate to base handler.
      // 將處理好的訊息傳遞給下游的RpcHandler
      delegate.receive(client, message, callback);
      return;
    }

    ByteBuf nettyBuf = Unpooled.wrappedBuffer(message);
    SaslMessage saslMessage;
    try {
      // 進行SASL加密
      saslMessage = SaslMessage.decode(nettyBuf);
    } finally {
      nettyBuf.release();
    }

    if (saslServer == null) {
      // First message in the handshake, setup the necessary state.
      client.setClientId(saslMessage.appId);
      // 如果saslServer為空,建立SparkSaslServer
      saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
        conf.saslServerAlwaysEncrypt());
    }

    byte[] response;
    try {
      // saslServer處理已經解密的資訊
      response = saslServer.response(JavaUtils.bufferToArray(
        saslMessage.body().nioByteBuffer()));
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
    callback.onSuccess(ByteBuffer.wrap(response));

    // Setup encryption after the SASL response is sent, otherwise the client can't parse the
    // response. It's ok to change the channel pipeline here since we are processing an incoming
    // message, so the pipeline is busy and no new incoming messages will be fed to it before this
    // method returns. This assumes that the code ensures, through other means, that no outbound
    // messages are being written to the channel while negotiation is still going on.
    if (saslServer.isComplete()) {
      logger.debug("SASL authentication successful for channel {}", client);
      isComplete = true; // 處理完成
      if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
        logger.debug("Enabling encryption for channel {}", client);
        // 進行管道加密
        SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());
        saslServer = null;
      } else {
        saslServer.dispose();
        saslServer = null;
      }
    }
  }

receive做了以下幾件事:

  1. 如果認證已經完成(isComplete=true),將訊息傳遞給下游RpcHandler
  2. 如果認證未經完成(isComplete=false)對客戶端傳送的訊息進行加密
  3. 如果saslServer=null,建立SparkSaslServer,SaslRpcHandler接收客戶端第一條訊息時執行此操作
  4. 使用saslServer處理已解密的訊息,並執行回撥返回給客戶端
  5. 如果認證已經完成,改變isComplete=true
  6. 對管道進行Sasl加密

可以看到載入程式主要起到了引導、包裝、傳遞、代理的作用,類似的還有TransportClientBootstrap。

RPC客戶端TransportClient

看完RPC服務端利用RpcHandler處理訊息後,這裡也看看RPC客戶端如何處理訊息的,在TransportContext的createChannelHandler中建立TransportClient。TransportClient一共有5個方法用於傳送請求:

  1. fetchChunk:從遠端協商好的流中請求單個塊
  2. stream:使用流的ID,從遠端獲取流資料
  3. sendRpc:向服務端傳送RPC請求,通過at least once delivery原則保證請求不丟失
  4. sendRpcSync:向服務端傳送非同步RPC請求
  5. send:想服務端傳送RPC請求,但並期望獲取響應,不能保證可靠性

這裡重點分析一下sendRpc方法,程式碼如下:

public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
    final long startTime = System.currentTimeMillis();
    if (logger.isTraceEnabled()) {
      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    }

    // UUID生成requestId
    final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    /**
     * 這裡的handler是TransportResponseHandler
     * 更新最後一次請求時間
     * addRpcRequest中利用Map設定requestId和回撥類的關係,requestId為key,callback為value
     * */
    handler.addRpcRequest(requestId, callback);

    /**
     * channel.writeAndFlush傳送請求,無論成功還是失敗都會回撥ChannelFutureListener的operationComplete方法
     * 成功的話列印日誌資訊
     * 失敗的話不僅要列印日誌,還要執行handler.removeRpcRequest(requestId),移除此次請求
     * */
    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            if (logger.isTraceEnabled()) {
              logger.trace("Sending request {} to {} took {} ms", requestId,
                getRemoteAddress(channel), timeTaken);
            }
          } else {
            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
              getRemoteAddress(channel), future.cause());
            logger.error(errorMsg, future.cause());
            handler.removeRpcRequest(requestId);
            channel.close();
            try {
              callback.onFailure(new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });

    return requestId;
  }

從上面的程式碼可以看出,sendRpc做了如下事情:

  1. 利用UUID生成requestId
  2. 更新最後一次請求時間,並利用Map設定了requestId和回撥類的對應關係
  3. channel.writeAndFlush傳送請求,無論成功還是失敗都會回撥ChannelFutureListener的operationComplete方法,成功的話列印日誌資訊,失敗的話不僅要列印日誌,還要執行handler.removeRpcRequest(requestId),移除此次請求
  4. 返回requestId

請求傳送成功後,客戶端將會等待接收服務端響應,返回的訊息會傳會給TransportChannelHandler的channelRead方法。

返回訊息傳遞給channelRead

接著進入responseHandler.handle((ResponseMessage) request);方法,其中有6中型別的判斷,RPC對應的是RpcResponse和RpcFailure。

RpcResponse對應的處理如下:

RpcFailure對應的處理如下:

總結

根據上面對Spark RPC元件的分析可以得到RPC客戶端服務端的請求響應流程,如下圖所示:

請求、響應流程圖

客戶端傳送請求是在TransportClient的對應方法執行了channel.writeAndFlush方法,並設定了成功和失敗監聽;服務端響應請求是TransportRequestHandler的respond方法中執行了channel.writeAndFlush方法。無論是服務端得到請求還是客戶端接收響應都是通過TransportChannelHandler的channelRead方法判斷Message型別,服務端得到請求交由TransportRequestHandler處理,客戶端接收響應交由TransportResponseHandler處理。載入程式則貫穿於各個步驟當中。

最後總結出Spark RPC框架結構如下圖所示:

Spark RPC框架結構