1. 程式人生 > >alluxio-rpc調用概述-client和worker之間的block模塊的通訊架構(netty版本)(2)

alluxio-rpc調用概述-client和worker之間的block模塊的通訊架構(netty版本)(2)

type ret proto worker fallback 關於 isp rom instance

(1.8版本)client和worker之間的block模塊的通訊架構

block作為alluxio文件讀取或者存儲的最小基本單位,都是通過BlockOutStream和BlockInputtream實現的

其中具體的數據包傳輸有Short circuit和netty兩種實現:

  • Short circuit:通過本地的ramdisk傳輸和netty傳輸
  • tcp傳輸:只通過netty傳輸

輸入流代碼中,關於兩種實現的選擇邏輯:

 1 public static BlockInStream create(FileSystemContext context, BlockInfo info,
 2     WorkerNetAddress dataSource, BlockInStreamSource dataSourceType, InStreamOptions options)
 3     throws IOException {
 4   URIStatus status = options.getStatus();
 5   OpenFileOptions readOptions = options.getOptions();
 6 
 7   boolean promote = readOptions.getReadType().isPromote();
 8 
 9   long blockId = info.getBlockId();
10   long blockSize = info.getLength();
11 
12   // Construct the partial read request
13   Protocol.ReadRequest.Builder builder =
14       Protocol.ReadRequest.newBuilder().setBlockId(blockId).setPromote(promote);
15   // Add UFS fallback options
16   builder.setOpenUfsBlockOptions(options.getOpenUfsBlockOptions(blockId));
17 
18   boolean shortCircuit = Configuration.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED);
19   boolean sourceSupportsDomainSocket = NettyUtils.isDomainSocketSupported(dataSource);
20   boolean sourceIsLocal = dataSourceType == BlockInStreamSource.LOCAL;
21 
22   // Short circuit
23   if (sourceIsLocal && shortCircuit && !sourceSupportsDomainSocket) {
24     LOG.debug("Creating short circuit input stream for block {} @ {}", blockId, dataSource);
25     try {
26       return createLocalBlockInStream(context, dataSource, blockId, blockSize, options);
27     } catch (NotFoundException e) {
28       // Failed to do short circuit read because the block is not available in Alluxio.
29       // We will try to read via netty. So this exception is ignored.
30       LOG.warn("Failed to create short circuit input stream for block {} @ {}. Falling back to "
31           + "network transfer", blockId, dataSource);
32     }
33   }
34 
35   // Netty
36   LOG.debug("Creating netty input stream for block {} @ {} from client {} reading through {}",
37       blockId, dataSource, NetworkAddressUtils.getClientHostName(), dataSource);
38   return createNettyBlockInStream(context, dataSource, dataSourceType, builder.buildPartial(),
39       blockSize, options);
40 }

輸出流代碼中,關於兩種實現的選擇邏輯:

 1 /**
 2      * @param context the file system context
 3      * @param blockId the block ID
 4      * @param blockSize the block size in bytes
 5      * @param address the Alluxio worker address
 6      * @param options the out stream options
 7      * @return the {@link PacketWriter} instance
 8      */
 9     public static PacketWriter create(FileSystemContext context, long blockId, long blockSize,
10         WorkerNetAddress address, OutStreamOptions options) throws IOException {
11       if (CommonUtils.isLocalHost(address) && Configuration
12           .getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED) && !NettyUtils
13           .isDomainSocketSupported(address)) {
14         LOG.debug("Creating short circuit output stream for block {} @ {}", blockId, address);
15         return LocalFilePacketWriter.create(context, address, blockId, options);
16       } else {
17         LOG.debug("Creating netty output stream for block {} @ {} from client {}", blockId, address,
18             NetworkAddressUtils.getClientHostName());
19         return NettyPacketWriter
20             .create(context, address, blockId, blockSize, Protocol.RequestType.ALLUXIO_BLOCK,
21                 options);
22       }
23     }

short-circuit策略

針對block的的創建/摧毀,通過netty進行網絡通訊

針對block的實際內容,直接通過ramdisk寫到本地,避免了使用netty進行網絡通訊,

short-circuit通訊-客戶端:

下圖是Netty版本的客戶端+LocalFileBlockWriter+LocalFileBlockReader調用過程

技術分享圖片

short-circuit通訊-服務端:

下圖是Netty版本的服務端調用

技術分享圖片

非short-circuit

非short-circuit通訊-客戶端:

下圖是Netty版本的客戶端調用過程

技術分享圖片

非short-circuit通訊-服務端:

下圖是Netty版本的服務端調用

技術分享圖片

下一篇將介紹BlockWorker相關的功能

alluxio-rpc調用概述-client和worker之間的block模塊的通訊架構(netty版本)(2)