【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(一)
目錄
RPC客戶端工廠TransprotClientFactory
作為一個分散式計算引擎,既然是分散式,那麼網路通訊是肯定少不了的,在Spark中很多地方都涉及到了網路通訊,各個元件之間訊息傳輸、使用者檔案和資源的上傳、Shuffle過程、Block的資料複製與備份等等,都少不了網路通訊。
在Spark2.X之前,Spark元件通訊使用Akka,使用者檔案和資源的上傳是基於Jetty實現的HttpFileServer,Shuffle過程、Block的資料複製與備份是基於Netty實現的。Spark2.0版本之後,Spark放棄了Akka和Jetty,各個元件之間訊息傳輸、使用者檔案和資源的上傳、Shuffle過程、Block的資料複製與備份等等統一採用Spark的RPC框架NettyStreamManager,將通訊框架統一起來了。
RPC上下文TransportContext
org.apache.spark.network.TransportContext,傳輸上下文,其主要作用是建立TransportServer,TransportClientFactory和使用TransportChannelHandler設定Netty Channel pipelines(Netty的通訊管道)。
TransportClient 提供了兩個通訊協議,分別是RPC控制層和資料層。RPC的處理是在TransportContext之外的,它負責設定流,這個流以zero-copy IO的形式與資料塊進行通訊傳輸。
TransportServer和TransportClientFactory都為每一個channel建立一個TransportChannelHandler。每一個TransportChannelHandler都包含一個TransportClient,可以使得服務程序通過現有的channel傳送訊息給客戶端。
下面是TransportContext的原始碼分析:
TransportContext中包含TransportConf、RpcHandler、MessageEncoder、MessageDecoder和一個建立TransportChannelHandler時使用的closeIdleConnections(布林型)屬性;兩個構造方法,TransportConf、RpcHandler是必須傳入TransportContext中的,closeIdleConnections選填。如下圖所示:
TransportContext的主要作用之一就是建立TransprotClientFactory,建立TransportClientFactory時需要傳入TransportClientBootstrap列表。
TransportContext另一個主要作用就是建立TransportServer,RPC框架的服務端,建立服務端時可以設定指定的埠,也可以指定特定的IP地址+埠,當然也可以既不指定IP地址也不指定埠(此時,埠預設為0)。建立TransportServer時還需要傳入一個TransportServerBootstrap列表。
TransportContext的最後一個作用就是建立TransportChannelHandler,並使用TransportChannelHandler設定Netty Channel pipelines(Netty的通訊管道)。初始化Netty通訊管道,設定編碼/解碼,TransportChannelHandler還進行傳送/接收訊息處理。TransportChannelHandler包含TransportClient,在此channel上進行通訊,與此channel直接關聯,確保所有使用者使用channel時得到的是同一個TrasportClient物件。
RPC配置TransportConf
TransportContext中包含org.apache.spark.network.util.TransportConf,TransportConf提供整個RPC框架的配置資訊。TransprotConf有兩個主要的成員屬性,分別是配置提供者conf和模組配置名稱module,還有一些關鍵配置資訊的KEY,根據這些KEY,TransportConf為RPC框架提供相應的API可獲取配置資訊。
模組配置名稱module與getConfKey(String suffix)方法結合,為關鍵配置資訊的KEY賦值,賦值的格式是:
"spark." + module + "." + suffix,suffix是具體的字尾
具體實現是:
另一個主要的成員屬性就是配置提供者ConfigProvider conf,根據配置資訊的KEY從配置提供者ConfigProvider conf中得到具體的配置資訊。
org.apache.spark.network.util.ConfigProvider是一個抽象類,有一個get抽象方法,其他的get、getInt、getLong、getDouble、getBoolean具體方法都是基於這個抽象方法進行的型別轉換。
Spark中使用org.apache.spark.network.netty.SparkTransportConf來建立TransportConf。fromSparkConf方法構建了TransportConf,該方法需要傳遞三個引數SparkConf、模組名module和可用核心數numUsableCores(預設為0)。如果numUsableCores小於等於0,執行緒數量就是系統可用處理器數量,如果大於0便和MAX_DEFAULT_NETTY_THREADS=8進行比較,執行緒數量取小的,因為系統不可能將全部核心數都用來網路傳輸,因此需要設定上限。我們可以通過在Spark的配置中手動設定serverthread和clientthread的數量來覆蓋MAX_DEFAULT_NETTY_THREADS。具體程式碼如下:
在fromSparkConf方法中設定分別設定了服務端傳輸執行緒數(spark.$module.io.serverThreads)和客戶端傳輸執行緒數(spark.$module.io.clientThreads),建立TransportConf物件時,傳遞的是ConfigProvider的匿名內部類,該匿名內部類實現的get方法就是呼叫了SparkConf的get方法。
RPC客戶端工廠TransprotClientFactory
TransportContext的一個很重要的作用就是建立org.apache.spark.network.client.TransprotClientFactory。
TransprotClientFactory為其他主機維護著一個連線池,並且確保相同的遠端主機返回相同的TransportClient。它還為所有的TransportCliet維護著一個共享的工作執行緒池。
TransportClientFactory構造方法如下:
public TransportClientFactory(
TransportContext context,
List<TransportClientBootstrap> clientBootstraps) {
this.context = Preconditions.checkNotNull(context);// TransportContext
this.conf = context.getConf();// TransportConf
this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));// TransportClientBootstrap列表
this.connectionPool = new ConcurrentHashMap<>();// 建立連線池
this.numConnectionsPerPeer = conf.numConnectionsPerPeer();// conf中key為"spark."+module+"io.numConnectionsPerPeer"的值
this.rand = new Random();// 在ClientPool中隨機選擇TransportClient
IOMode ioMode = IOMode.valueOf(conf.ioMode());// conf中key為"spark."+module+"io.mode"的值,IO mode有兩種模式:nio(預設)和epoll
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);// 根據ioMode匹配Channel建立模式,有兩種:nio(預設)和epoll
// TODO: Make thread pool name configurable.
this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");// 建立Netty的WorkerGroup
this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
}
在建立TransportClientFactory時傳入了兩個引數,一個是傳輸上下文TransportContext,還有一個是TransportClient載入程式列表List<TransportClientBootstrap>。
TransportClientBootstrap是一個介面,有兩個實現類分別是:SaslClientBootstrap和EncryptionDisablerBootstrap
TransportClientBootstrap在TransportClient給使用者使用之前做了一些引導工作,是對連結初始化之前做的一些準備工作,比如SASL身份驗證令牌,因為建立的連結可以重複使用,因此載入程式只會執行一次。
TransportClientFactory的主要作用就是建立RPC客戶端org.apache.spark.network.client.TransportClient,程式碼如下:
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
// Use unresolved address here to avoid DNS resolution each time we creates a client.
// 建立InetSocketAddress
final InetSocketAddress unresolvedAddress =
InetSocketAddress.createUnresolved(remoteHost, remotePort);
// Create the ClientPool if we don't have it yet.
// 根據InetSocketAddress,在連線池找到對應的ClientPool快取
// 如果找不到建立新的ClientPool快取,快取大小為conf中key為"spark."+module+"io.numConnectionsPerPeer"的值
ClientPool clientPool = connectionPool.get(unresolvedAddress);
if (clientPool == null) {
connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
clientPool = connectionPool.get(unresolvedAddress);
}
// 隨機選擇TransportClient快取
int clientIndex = rand.nextInt(numConnectionsPerPeer);
TransportClient cachedClient = clientPool.clients[clientIndex];
/**
* client快取存在並且可使用時的操作:
* 設定TransportChannelHandler最後一次的使用時間,確保不超時;
* 然後再檢查client是否存活,存活的話建立TransportClient成功
* */
if (cachedClient != null && cachedClient.isActive()) {
// Make sure that the channel will not timeout by updating the last use time of the
// handler. Then check that the client is still alive, in case it timed out before
// this code was able to update things.
TransportChannelHandler handler = cachedClient.getChannel().pipeline()
.get(TransportChannelHandler.class);
synchronized (handler) {
handler.getResponseHandler().updateTimeOfLastRequest();
}
if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}",
cachedClient.getSocketAddress(), cachedClient);
return cachedClient;
}
}
/**
* client快取存在或者未啟用時候的操作:
* 根據IP和埠號重新建立InetSocketAddress
* */
// If we reach here, we don't have an existing connection open. Let's create a new one.
// Multiple threads might race here to create new connections. Keep only one of them active.
final long preResolveHost = System.nanoTime();
final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
if (hostResolveTimeMs > 2000) {
logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
} else {
logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
}
/**
* 建立InetSocketAddress的過程會產生靜態條件,此時標記了clientPool中的locks陣列
* 在clientPool中locks陣列中的元素與TransportClient陣列中的元素一對一對應關係
* 先進入的執行緒會過載createClient方法,並放入到clientPool中與clientIndex對應位置上
* 後面進入的執行緒就會直接得到第一個執行緒進來時建立好的client了
* */
synchronized (clientPool.locks[clientIndex]) {
cachedClient = clientPool.clients[clientIndex];
// 後續執行緒進入時直接使用
if (cachedClient != null) {
if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
return cachedClient;
} else {
logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
}
}
// 第一個執行緒進來時建立client,過載了createClient方法
clientPool.clients[clientIndex] = createClient(resolvedAddress);
return clientPool.clients[clientIndex];
}
}
從上面的建立過程可以知道TransportClinetFactory採用隨機負載均衡的方式,從clientPool(也就是快取)中獲取client。
第一個執行緒進行建立client操作時,過載了私有的createClient方法,這個方法才是真正建立TransportClient方法,程式碼如下:
private TransportClient createClient(InetSocketAddress address) throws IOException {
logger.debug("Creating new connection to {}", address);
/** 構建初始載入程式並進行配置 */
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(socketChannelClass)
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);
final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
final AtomicReference<Channel> channelRef = new AtomicReference<>();
/** 初始引導設定初始化channel */
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
TransportChannelHandler clientHandler = context.initializePipeline(ch);
clientRef.set(clientHandler.getClient());
channelRef.set(ch);
}
});
// Connect to the remote server
long preConnect = System.nanoTime();
ChannelFuture cf = bootstrap.connect(address); // 連結遠端伺服器
if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
throw new IOException(
String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
} else if (cf.cause() != null) {
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
}
TransportClient client = clientRef.get();
Channel channel = channelRef.get();
assert client != null : "Channel future completed successfully with null client";
// Execute any client bootstraps synchronously before marking the Client as successful.
long preBootstrap = System.nanoTime();
logger.debug("Connection to {} successful, running bootstraps...", address);
try {
/** 執行TransportClient載入程式列表 */
for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
clientBootstrap.doBootstrap(client, channel);
}
} catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
client.close();
throw Throwables.propagate(e);
}
long postBootstrap = System.nanoTime();
logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
return client;
}
createClient方法主要做了以下幾件事:
- 初始化根載入程式並進行配置
- 根載入程式設定初始化管道
- 使用根載入程式連線遠端伺服器
- 執行TransportClient載入程式列表
- 返回TransportClient物件
TransportClinetFactory中連線池的結構如下
多個連結對應對個客戶端快取,快取中每個客戶端快取對應一個鎖,用於避免競爭,並採用隨機負載均衡的方式從快取中獲取客戶端。
RPC服務端TransportServer
TransportContext另一個很重要的作用就是建立org.apache.spark.network.server.TransportServer。
TransportContext有4個createServer過載方法(介紹TransportContext已經提到過了),但其實都是在用這個構造器建立TransportServer的。
對一些成員變數context、conf、appRpcHandler、bootstraps進行賦值過後,開始執行init(String hostToBind, int portToBind)對TransportServer初始化。
init(String hostToBind, int portToBind)的程式碼如下:
private void init(String hostToBind, int portToBind) {
/**
* ioMode同TransportClientFactory初始化時是一樣的
* conf中key為"spark."+module+"io.mode"的值,IO mode有兩種模式:nio(預設)和epoll
* */
IOMode ioMode = IOMode.valueOf(conf.ioMode());
/**
* 1、建立Netty服務端需要同時建立bossGroup和workerGroup
* */
EventLoopGroup bossGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;
/**
* 2、建立ByteBuf分配器,對本地執行緒快取禁用
* (ByteBuf由事件迴圈執行緒分配,執行執行緒釋放,本地快取會延遲迴收,加大開銷,所有禁用)
* */
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
/**
* 3、建立跟載入程式並配置
* */
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.childOption(ChannelOption.ALLOCATOR, allocator);
if (conf.backLog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
}
if (conf.receiveBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}
if (conf.sendBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
}
/**
* 4、初始化管道,執行TransportServer載入程式列表
* */
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
RpcHandler rpcHandler = appRpcHandler;
for (TransportServerBootstrap bootstrap : bootstraps) {
rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
}
context.initializePipeline(ch, rpcHandler);
}
});
/**
* 5、繫結IP 埠號
* */
InetSocketAddress address = hostToBind == null ?
new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
channelFuture = bootstrap.bind(address);
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port: {}", port);
}
初始化TransportServer一共做了5件事:
- 建立boosGroup和workerGroup(Netty服務端需要同時建立)。ioMode的建立與TransportClientFactory中建立TransportClient的方式是一樣的。
- 建立ByteBuf分配器,對本地執行緒快取禁用。ByteBuf由事件迴圈執行緒分配,執行執行緒釋放,本地快取會延遲迴收,加大開銷,所有禁用。
- 建立根載入程式並配置。
- 初始化管道,執行TransportServer載入程式列表。
- 繫結IP 埠號。
未完待續~~~