【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(二)
目錄
RPC管道處理TransportChannelHandler
接著【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(一) 接著分析
RPC管道處理TransportChannelHandler
TransportContext最後一個作用就是使用org.apache.spark.network.server.TransportChannelHandler設定Netty Channel pipelines(Netty的通訊管道)。
在TransportClientFactory的createClient方法和TransportServer的init方法中都執行了初始化管道方法,也就是TransportContext中的initializePipeline方法。
initializePipeline方法的程式碼如下:
public TransportChannelHandler initializePipeline( SocketChannel channel, RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); /** * 管道設定 * Request時按照順序執行,TransportFrameDecoder-》MessageDecoder-》IdleStateHandler-》TransportChannelHandler * Response時按照逆序執行,IdleStateHandler-》MessageEncoder * */ channel.pipeline() .addLast("encoder", encoder) //為pipeline設定encoder .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) //為pipeline設定frameDecoder .addLast("decoder", decoder) //為pipeline設定decoder .addLast("idleStateHandler", //為pipeline設定IdleStateHandler,Netty內建物件 new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. .addLast("handler", channelHandler); return channelHandler; } catch (RuntimeException e) { logger.error("Error while initializing Netty pipeline", e); throw e; } }
首先是呼叫createChannelHandler方法建立TransportChannelHandler物件,createChannelHandler程式碼如下:
/** * TransportChannelHandler * 在服務端代理TransportRequestHandler處理請求訊息 * 在客戶端代理TransportResponseHandler處理相應資訊 * * */ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) { // 建立TransportChannelHandler的同時,建立了TransportResponseHandler、TransportRequestHandler和TransportClient TransportResponseHandler responseHandler = new TransportResponseHandler(channel); // 真正意義上的建立TransportClient TransportClient client = new TransportClient(channel, responseHandler); TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler); return new TransportChannelHandler(client, responseHandler, requestHandler, conf.connectionTimeoutMs(), closeIdleConnections); }
建立TransportChannelHandler之前,先建立了TransportResponseHandler、TransportClient和TransportRequestHandler,在這裡才是真正意義的建立TransportClient物件,與管道一一對應,保證所有使用者使用channel時得到的是同一個TrasportClient物件。TransportChannelHandler在服務端代理TransportRequestHandler處理請求訊息,在客戶端代理TransportResponseHandler處理相應資訊。建立了TransportChannelHandler之後,對管道pipeline進行設定,程式碼如下:
TransportFrameDecoder、MessageDecoder、TransportChannelHandler本質上都是繼承了ChannelInboundHandler,MessageEncoder本質上都是繼承了ChannelOutboundHandler,IdleStateHandler本質上都是繼承了ChannelInboundHandler,ChannelOutboundHandler(繼承和介面實現),根據Netty中handler的執行順序,得出如下:
Request時按照順序執行,TransportFrameDecoder-》MessageDecoder-》IdleStateHandler-》TransportChannelHandler,Response時按照逆序執行,IdleStateHandler-》MessageEncoder。結構如下:
RPC服務端處理RpcHandler
下面的程式碼是TransportRequestHandler中的程式碼,可以清楚的看到,TransportRequestHandler是將請求訊息交給rpcHandler做進一步處理。
RpcHandler是一個抽象類,主要有以下幾個方法:
receive方法,接收單一PRC訊息,RpcResponseCallback用來處理結束後的回掉,無論成功與否,都會執行一次。有一個receive過載方法,預設執行OneWayRpcCallback回撥,這個回撥只負責列印成功和失敗時的資訊。
/**
* 抽象方法用來接收單一RPC訊息
* RpcResponseCallback用來處理結束後的回掉,無論成功與否,都會執行一次
* */
public abstract void receive(
TransportClient client,
ByteBuffer message,
RpcResponseCallback callback);
/**
* 過載receive方法,預設執行ONE_WAY_CALLBACK回撥
* */
public void receive(TransportClient client, ByteBuffer message) {
receive(client, message, ONE_WAY_CALLBACK);
}
getStreamManager方法,抽象方法,用於獲取getStreamManager
/**
* 抽象方法獲取StreamManager
* */
public abstract StreamManager getStreamManager();
channelActive、channelInactive、exceptionCaught方法,分別與客戶端相關聯的channel處於活動/非活動/異常狀態時呼叫
/**
* 當與客戶端相關聯的channel處於活動狀態時呼叫
* */
public void channelActive(TransportClient client) { }
/**
* 當與客戶端相關聯的channel處於非活動狀態時呼叫
* */
public void channelInactive(TransportClient client) { }
/**
* 當與客戶端相關聯的channel產生異常時呼叫
* */
public void exceptionCaught(Throwable cause, TransportClient client) { }
瞭解了RpcHandler的結構後,再看一下TransportRequestHandler的handle(RequestMessage request)方法
public void handle(RequestMessage request) {
if (request instanceof ChunkFetchRequest) {
processFetchRequest((ChunkFetchRequest) request);
} else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
} else if (request instanceof StreamRequest) {
processStreamRequest((StreamRequest) request);
} else {
throw new IllegalArgumentException("Unknown request type: " + request);
}
}
TransportRequestHandler處理4中RequestMessage:
1、處理塊獲取請求
private void processFetchRequest(final ChunkFetchRequest req) {
if (logger.isTraceEnabled()) {
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
req.streamChunkId);
}
ManagedBuffer buf;
try {
// this.streamManager = rpcHandler.getStreamManager();
// 校驗客戶端是否有許可權從流中讀取訊息
streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
// 將一個流與一個客戶端的TCP連結關聯起來,單個流只會有一個客戶端讀取
streamManager.registerChannel(channel, req.streamChunkId.streamId);
// 獲取塊
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format("Error opening block %s for request from %s",
req.streamChunkId, getRemoteAddress(channel)), e);
respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
return;
}
// 將ManagedBuffer和流的塊ID封裝到ChunkFetchSuccess中,呼叫respond方法返回給客戶端
respond(new ChunkFetchSuccess(req.streamChunkId, buf));
}
processFetchRequest做了以下幾件事:
- 校驗客戶端是否有許可權從流中讀取訊息
- 將一個流與一個客戶端的TCP連結關聯起來,單個流只會有一個客戶端讀取
- 獲取塊
- 將ManagedBuffer和流的塊ID封裝到ChunkFetchSuccess中,呼叫respond方法返回給客戶端
2、處理RPC請求
程式碼如下:
private void processRpcRequest(final RpcRequest req) {
try {
/**
* 將傳送訊息的客戶端、RpcRequest訊息的內容和RpcResponseCallback回撥類作為引數傳遞給RpcHandler的receive方法
* 所以說真正處理訊息的是RpcHandler,而不是TrnsportRequestHandler
* */
rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
}
@Override
public void onFailure(Throwable e) {
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
});
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
} finally {
req.body().release();
}
}
將傳送訊息的客戶端、RpcRequest訊息的內容和RpcResponseCallback回撥類作為引數傳遞給RpcHandler的receive方法,所以說真正處理訊息的是RpcHandler,而不是TrnsportRequestHandler。
3、處理無需回覆的RPC請求
處理無需回覆的RPC請求,回撥類是OneWayRpcCallback,處理完RPC請求後不會給客戶端作出響應。
private void processOneWayMessage(OneWayMessage req) {
try {
/**
* 處理無需回覆的RPC請求,回撥類是OneWayRpcCallback,處理完RPC請求後不會給客戶端作出響應
* */
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
} finally {
req.body().release();
}
}
4、處理流請求
使用streamManager.openStream方法將流資料封裝為ManagedBuffer。
private void processStreamRequest(final StreamRequest req) {
ManagedBuffer buf;
try {
// 將獲取的流資料封裝為ManagedBuffer
buf = streamManager.openStream(req.streamId);
} catch (Exception e) {
logger.error(String.format(
"Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
return;
}
// 無論成功還是失敗,都要響應客戶端
if (buf != null) {
respond(new StreamResponse(req.streamId, buf.size(), buf));
} else {
respond(new StreamFailure(req.streamId, String.format(
"Stream '%s' was not found.", req.streamId)));
}
}
上面這四種處理請求的方法,除了processOneWayMessage不需要呼叫respond方法外,其他三個都需要呼叫respond方法,用來響應客戶端。respond方法中實際是呼叫channel.writeAndFlush來響應客戶端的。
private void respond(final Encodable result) {
final SocketAddress remoteAddress = channel.remoteAddress();
//響應客戶端
channel.writeAndFlush(result).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
channel.close();
}
}
}
);
}
載入程式Bootstrap
在TransportServer中有一個成員變數List<TransportServerBootstrap>是TransportServer載入程式列表,在初始化管道時,呼叫了每一個載入程式的doBootstrap方法。
TransportServerBootstrap定義了服務端載入程式的規範,當客戶端與服務端建立了連線,在服務端持有的客戶端管道上執行載入程式。
TransportServerBootstrap介面定義如下:
public interface TransportServerBootstrap {
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
TransportServerBootstrap有兩個實現類,一個是SaslServerBootstrap,另一個是EncryptionCheckerBootstrap,以SaslServerBootstrap為例說明載入程式的作用。
直接看SaslServerBootstrap的doBootstrap方法:
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
}
doBootstrap方法直接建立了一個RpcHandler的具體實現類SaslRpcHandler。SaslRpcHandler負責對管道進行SASL加密,它集成了RpcHandler,所以核心程式碼就在receive中:
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
if (isComplete) {
// Authentication complete, delegate to base handler.
// 將處理好的訊息傳遞給下游的RpcHandler
delegate.receive(client, message, callback);
return;
}
ByteBuf nettyBuf = Unpooled.wrappedBuffer(message);
SaslMessage saslMessage;
try {
// 進行SASL加密
saslMessage = SaslMessage.decode(nettyBuf);
} finally {
nettyBuf.release();
}
if (saslServer == null) {
// First message in the handshake, setup the necessary state.
client.setClientId(saslMessage.appId);
// 如果saslServer為空,建立SparkSaslServer
saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
conf.saslServerAlwaysEncrypt());
}
byte[] response;
try {
// saslServer處理已經解密的資訊
response = saslServer.response(JavaUtils.bufferToArray(
saslMessage.body().nioByteBuffer()));
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
callback.onSuccess(ByteBuffer.wrap(response));
// Setup encryption after the SASL response is sent, otherwise the client can't parse the
// response. It's ok to change the channel pipeline here since we are processing an incoming
// message, so the pipeline is busy and no new incoming messages will be fed to it before this
// method returns. This assumes that the code ensures, through other means, that no outbound
// messages are being written to the channel while negotiation is still going on.
if (saslServer.isComplete()) {
logger.debug("SASL authentication successful for channel {}", client);
isComplete = true; // 處理完成
if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
logger.debug("Enabling encryption for channel {}", client);
// 進行管道加密
SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());
saslServer = null;
} else {
saslServer.dispose();
saslServer = null;
}
}
}
receive做了以下幾件事:
- 如果認證已經完成(isComplete=true),將訊息傳遞給下游RpcHandler
- 如果認證未經完成(isComplete=false)對客戶端傳送的訊息進行加密
- 如果saslServer=null,建立SparkSaslServer,SaslRpcHandler接收客戶端第一條訊息時執行此操作
- 使用saslServer處理已解密的訊息,並執行回撥返回給客戶端
- 如果認證已經完成,改變isComplete=true
- 對管道進行Sasl加密
可以看到載入程式主要起到了引導、包裝、傳遞、代理的作用,類似的還有TransportClientBootstrap。
RPC客戶端TransportClient
看完RPC服務端利用RpcHandler處理訊息後,這裡也看看RPC客戶端如何處理訊息的,在TransportContext的createChannelHandler中建立TransportClient。TransportClient一共有5個方法用於傳送請求:
- fetchChunk:從遠端協商好的流中請求單個塊
- stream:使用流的ID,從遠端獲取流資料
- sendRpc:向服務端傳送RPC請求,通過at least once delivery原則保證請求不丟失
- sendRpcSync:向服務端傳送非同步RPC請求
- send:想服務端傳送RPC請求,但並期望獲取響應,不能保證可靠性
這裡重點分析一下sendRpc方法,程式碼如下:
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
final long startTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}
// UUID生成requestId
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
/**
* 這裡的handler是TransportResponseHandler
* 更新最後一次請求時間
* addRpcRequest中利用Map設定requestId和回撥類的關係,requestId為key,callback為value
* */
handler.addRpcRequest(requestId, callback);
/**
* channel.writeAndFlush傳送請求,無論成功還是失敗都會回撥ChannelFutureListener的operationComplete方法
* 成功的話列印日誌資訊
* 失敗的話不僅要列印日誌,還要執行handler.removeRpcRequest(requestId),移除此次請求
* */
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", requestId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeRpcRequest(requestId);
channel.close();
try {
callback.onFailure(new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
}
});
return requestId;
}
從上面的程式碼可以看出,sendRpc做了如下事情:
- 利用UUID生成requestId
- 更新最後一次請求時間,並利用Map設定了requestId和回撥類的對應關係
- channel.writeAndFlush傳送請求,無論成功還是失敗都會回撥ChannelFutureListener的operationComplete方法,成功的話列印日誌資訊,失敗的話不僅要列印日誌,還要執行handler.removeRpcRequest(requestId),移除此次請求
- 返回requestId
請求傳送成功後,客戶端將會等待接收服務端響應,返回的訊息會傳會給TransportChannelHandler的channelRead方法。
接著進入responseHandler.handle((ResponseMessage) request);方法,其中有6中型別的判斷,RPC對應的是RpcResponse和RpcFailure。
RpcResponse對應的處理如下:
RpcFailure對應的處理如下:
總結
根據上面對Spark RPC元件的分析可以得到RPC客戶端服務端的請求響應流程,如下圖所示:
客戶端傳送請求是在TransportClient的對應方法執行了channel.writeAndFlush方法,並設定了成功和失敗監聽;服務端響應請求是TransportRequestHandler的respond方法中執行了channel.writeAndFlush方法。無論是服務端得到請求還是客戶端接收響應都是通過TransportChannelHandler的channelRead方法判斷Message型別,服務端得到請求交由TransportRequestHandler處理,客戶端接收響應交由TransportResponseHandler處理。載入程式則貫穿於各個步驟當中。
最後總結出Spark RPC框架結構如下圖所示: