
Spark Source Code Analysis: UnsafeShuffleWriter

Overview

SortShuffleManager uses UnsafeShuffleWriter only when all of the following conditions are met; otherwise it falls back to SortShuffleWriter:

  1. The Serializer supports relocation. This means the Serializer can reorder objects that have already been serialized, with the same effect as sorting the records first and then serializing them. The Serializer that supports relocation is KryoSerializer; Spark uses JavaSerializer by default, and the serializer is configured via the spark.serializer parameter (see the sketch after this list);
  2. No map-side aggregation is required, i.e. no aggregator is defined;
  3. The number of partitions does not exceed the threshold (2^24);
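
For illustration, here is a minimal sketch (not taken from the Spark source) of switching the serializer to KryoSerializer via spark.serializer, which satisfies condition 1; the application name is a hypothetical placeholder:

import org.apache.spark.SparkConf

// KryoSerializer supports relocation of serialized objects, one of the
// prerequisites for the serialized shuffle path (UnsafeShuffleWriter).
val conf = new SparkConf()
  .setAppName("unsafe-shuffle-demo")  // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")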

UnsafeShuffleWriter serializes each record and inserts it into the sorter, sorts the already-serialized records, writes them to disk as spill files when sorting completes, and finally merges the spill files into a single output file. During the merge, the most suitable merge strategy is chosen based on the number of spill files and the IO compression codec.

Source Code Analysis

ShuffleMapTask obtains the ShuffleManager

When ShuffleMapTask executes a task via its runTask() method, it obtains the ShuffleManager from SparkEnv.

The runTask() method of ShuffleMapTask is shown below:

override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      // Get the ShuffleManager from SparkEnv
      val manager = SparkEnv.get.shuffleManager
      // Ask the ShuffleManager for a writer
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      // Call the writer's write method
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

SortShuffleManager obtains the writer

SortShuffleManager selects the appropriate ShuffleHandle depending on whether the conditions are met. Each ShuffleHandle corresponds to a shuffle writer as follows:

BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter
SerializedShuffleHandle -> UnsafeShuffleWriter
BaseShuffleHandle -> SortShuffleWriter

The registerShuffle method

SortShuffleManager.scala

/**
   * Obtains a [[ShuffleHandle]] to pass to tasks.
   */
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }

The canUseSerializedShuffle method

This method checks whether the conditions for using UnsafeShuffleWriter are satisfied:

  1. The Serializer supports relocation;
  2. No map-side aggregation is required, i.e. no aggregator is defined;
  3. The number of partitions does not exceed the threshold (2^24);

SortShuffleManager.scala

/**
   * Helper method for determining whether a shuffle should use an optimized serialized shuffle
   * path or whether it should fall back to the original path that operates on deserialized objects.
   */
  def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
    // Check whether the serializer supports relocation of serialized objects
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
        s"${dependency.serializer.getClass.getName}, does not support object relocation")
      false
    // Check whether a map-side aggregator is defined
    } else if (dependency.aggregator.isDefined) {
      log.debug(
        s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
      false
    // Check whether the number of partitions exceeds the threshold
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
        s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
      false
    } else {
      log.debug(s"Can use serialized shuffle for shuffle $shufId")
      true
    }
  }

The getWriter method

If the conditions are met, the handle is a SerializedShuffleHandle, and an UnsafeShuffleWriter is created to write the data.

SortShuffleManager.scala

/** Get a writer for a given partition. Called on executors by map tasks. */
  override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        // Create the UnsafeShuffleWriter
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }

UnsafeShuffleWriter

The write method

1. Partition and serialize each record, then insert it into the sorter.

2. Sort the records, write them to disk as spill files when sorting completes, and then merge the spill files into a single output file.

@Override
  public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
    // Keep track of success so we know if we encountered an exception
    // We do this rather than a standard try/catch/re-throw to handle
    // generic throwables.
    boolean success = false;
    try {
      // Partition and serialize each record, then insert it into the sorter
      while (records.hasNext()) {
        insertRecordIntoSorter(records.next());
      }
      // Sort the records, spill them to disk, and merge the spill files into a single output file
      closeAndWriteOutput();
      success = true;
    } finally {
      if (sorter != null) {
        try {
          sorter.cleanupResources();
        } catch (Exception e) {
          // Only throw this error if we won't be masking another
          // error.
          if (success) {
            throw e;
          } else {
            logger.error("In addition to a failure during writing, we failed during " +
                         "cleanup.", e);
          }
        }
      }
    }
  }

The insertRecordIntoSorter method

This method partitions and serializes each record, then inserts it into the sorter. The implementation is as follows:

  1. Call partitioner.getPartition on the record's key to determine which partition the record is assigned to and obtain that partition's partitionId;
  2. Serialize the record's key and value into the buf field underlying the ByteArrayOutputStream;
  3. Insert the serialized record into the ShuffleExternalSorter;

When partitioning a record, assuming a HashPartitioner is used, getPartition takes the key's hashCode modulo numPartitions to determine which partition the record is assigned to, as sketched below.
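
A minimal sketch of that hash-and-mod logic (Spark's actual HashPartitioner additionally handles null keys and uses a non-negative modulo, roughly as shown):

// Simplified illustration of HashPartitioner.getPartition: hash the key, then
// take a non-negative modulo so the result is a valid partition id.
def getPartition(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  rawMod + (if (rawMod < 0) numPartitions else 0)  // keep the result in [0, numPartitions)
}

getPartition("someKey", 8)  // returns a partition id between 0 and 7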

When a record is serialized, assuming JavaSerializer is used, the streams are layered (from bottom to top) as follows:

MyByteArrayOutputStream -> ObjectOutputStream -> JavaSerializationStream

Here, MyByteArrayOutputStream is a subclass of ByteArrayOutputStream that directly exposes the buf[] field;

ObjectOutputStream is the serialization stream from java.io; its writeObject method writes objects into the stream;

JavaSerializationStream is a thin wrapper around ObjectOutputStream.

In the end, the serialized record is stored in the buf field of the ByteArrayOutputStream.

In the code below, serBuffer is an instance of MyByteArrayOutputStream; calling getBuf returns the buf field of the underlying ByteArrayOutputStream.

serOutputStream is an instance of SerializationStream; depending on SparkConf it is initialized as either a JavaSerializationStream or a KryoSerializationStream. The Kryo serialization process is not covered here.

Note: JavaSerializer does not support relocation, so in practice JavaSerializationStream can never actually be used here; it only serves as an example. A minimal sketch of this stream stack follows.
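
The sketch below rebuilds that stream stack with plain java.io classes; the subclass is a stand-in for MyByteArrayOutputStream, not Spark's actual class:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for MyByteArrayOutputStream: expose the internal buf[] so the
// serialized bytes can be handed to the sorter without an extra copy.
class ExposedByteArrayOutputStream(size: Int) extends ByteArrayOutputStream(size) {
  def getBuf: Array[Byte] = buf
}

val serBuffer = new ExposedByteArrayOutputStream(1024 * 1024)
val objOut = new ObjectOutputStream(serBuffer)  // JavaSerializationStream wraps a stream like this
objOut.writeObject("key")
objOut.writeObject("value")
objOut.flush()
// serBuffer.getBuf now holds the serialized bytes; serBuffer.size() is their length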

@VisibleForTesting
  void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
    assert(sorter != null);
    final K key = record._1();
    // getPartition determines which partition the record is assigned to and returns that partition's partitionId
    final int partitionId = partitioner.getPartition(key);
    serBuffer.reset();
    // Serialize the record's key into the buf field underlying serBuffer
    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
    // Serialize the record's value into the buf field underlying serBuffer
    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
    serOutputStream.flush();

    final int serializedRecordSize = serBuffer.size();
    assert (serializedRecordSize > 0);
    // Insert the serialized record into the sorter
    sorter.insertRecord(
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
  }

The closeAndWriteOutput method

Sorts the records, writes them to disk as spill files when sorting completes, and then merges the spill files into a single output file.

The implementation is as follows:

1. Sort the in-memory records, write them to disk as spill files once sorting completes, and return the metadata of these spill files as a SpillInfo[];

2. Construct the final output file, whose name (with reduceId fixed to 0) is: "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;

3. Append a UUID to the output file name to mark the file as being written; it is renamed when writing finishes;

4. Merge the spill files into one output file, choosing the most suitable merge strategy based on the number of spill files and the IO compression codec;

5. Write each partition's offset into the index file so the reduce side can fetch its data; the offset arithmetic is sketched after this list;
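
A minimal sketch of the offset arithmetic behind step 5 (the actual writing is done by IndexShuffleBlockResolver.writeIndexFileAndCommit; the lengths below are hypothetical):

// The index file stores cumulative offsets: partition i's data occupies the
// byte range [offsets(i), offsets(i + 1)) of the shuffle data file.
val partitionLengths = Array[Long](100L, 0L, 250L, 75L)
val offsets = partitionLengths.scanLeft(0L)(_ + _)  // Array(0, 100, 100, 350, 425)
// A reducer fetching partition 2 reads bytes [offsets(2), offsets(3)) = [100, 350)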

@VisibleForTesting
  void closeAndWriteOutput() throws IOException {
    assert(sorter != null);
    updatePeakMemoryUsed();
    serBuffer = null;
    serOutputStream = null;
    // Sort the in-memory records, spill them to disk, and return the spill files' metadata as SpillInfo[]
    final SpillInfo[] spills = sorter.closeAndGetSpills();
    sorter = null;
    final long[] partitionLengths;
    /* Construct the final output file; its name (with reduceId fixed to 0) is:
       "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId */
    final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    // Append a UUID to the output file name to mark it as in progress; it is renamed when writing completes
    final File tmp = Utils.tempFileWith(output);
    try {
      try {
        // Merge the spill files into one output file, choosing the best merge strategy based on the number of spill files and the IO compression codec
        partitionLengths = mergeSpills(spills, tmp);
      } finally {
        for (SpillInfo spill : spills) {
          if (spill.file.exists() && ! spill.file.delete()) {
            logger.error("Error while deleting spill file {}", spill.file.getPath());
          }
        }
      }
      // Write each partition's offset into the index file so reducers can fetch their data
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
  }

The mergeSpills method

Merges the spill files into a single output file, choosing the fastest merge strategy based on the number of spill files and the IO compression codec.

When there are multiple spill files, the strategy is selected as follows (summarized in the sketch after this list):

1. Read from SparkConf whether fast merge is enabled and whether it is supported. If both hold, take the fast merge path; otherwise take the slow merge path.

2. On the fast merge path, if transferTo is enabled and encryption is not, use the transferTo-based fast merge; otherwise use the fileStream-based fast merge.
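
The selection logic condenses into the following sketch (a summary of the mergeSpills code shown below, not additional Spark API):

// Condensed view of the merge-strategy selection in mergeSpills.
def chooseMergeStrategy(
    fastMergeEnabled: Boolean,      // spark.shuffle.unsafe.fastMergeEnabled
    fastMergeIsSupported: Boolean,  // compression disabled, or codec supports concatenation
    transferToEnabled: Boolean,     // spark.file.transferTo
    encryptionEnabled: Boolean): String = {
  if (fastMergeEnabled && fastMergeIsSupported) {
    if (transferToEnabled && !encryptionEnabled) "transferTo-based fast merge"
    else "fileStream-based fast merge"
  } else {
    "slow merge (fileStream with the compression codec)"
  }
}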

/**
   * Merge zero or more spill files together, choosing the fastest merging strategy based on the
   * number of spills and the IO compression codec.
   *
   * @return the partition lengths in the merged file.
   */
  private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
    // Read from SparkConf whether shuffle compression is enabled
    final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
    // Create the CompressionCodec configured in SparkConf
    final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
    // Read from SparkConf whether fast merge is enabled
    final boolean fastMergeEnabled =
      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
    final boolean fastMergeIsSupported = !compressionEnabled ||
      CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
    final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
    try {
      if (spills.length == 0) {
        new FileOutputStream(outputFile).close(); // Create an empty file
        return new long[partitioner.numPartitions()];
      } else if (spills.length == 1) {
        // Here, we don't need to perform any metrics updates because the bytes written to this
        // output file would have already been counted as shuffle bytes written.
        Files.move(spills[0].file, outputFile);
        return spills[0].partitionLengths;
      } else {
        final long[] partitionLengths;
        // There are multiple spills to merge, so none of these spill files' lengths were counted
        // towards our shuffle write count or shuffle write time. If we use the slow merge path,
        // then the final output file's size won't necessarily be equal to the sum of the spill
        // files' sizes. To guard against this case, we look at the output file's actual size when
        // computing shuffle bytes written.
        //
        // We allow the individual merge methods to report their own IO times since different merge
        // strategies use different IO techniques.  We count IO during merge towards the shuffle
        // shuffle write time, which appears to be consistent with the "not bypassing merge-sort"
        // branch in ExternalSorter.
        if (fastMergeEnabled && fastMergeIsSupported) {
          // Compression is disabled or we are using an IO compression codec that supports
          // decompression of concatenated compressed streams, so we can perform a fast spill merge
          // that doesn't need to interpret the spilled bytes.
          if (transferToEnabled && !encryptionEnabled) {
            logger.debug("Using transferTo-based fast merge");
            partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
          } else {
            logger.debug("Using fileStream-based fast merge");
            partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
          }
        } else {
          logger.debug("Using slow merge");
          partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
        }
        // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
        // in-memory records, we write out the in-memory records to a file but do not count that
        // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
        // to be counted as shuffle write, but this will lead to double-counting of the final
        // SpillInfo's bytes.
        writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
        writeMetrics.incBytesWritten(outputFile.length());
        return partitionLengths;
      }
    } catch (IOException e) {
      if (outputFile.exists() && !outputFile.delete()) {
        logger.error("Unable to delete output file {}", outputFile.getPath());
      }
      throw e;
    }
  }

The mergeSpillsWithFileStream method

Merges spill files using Java FileStreams.

This merge path is noticeably slower than the NIO (transferTo)-based merge, UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File). It is therefore mainly used in the following cases:

1. The IO compression codec does not support concatenation of compressed data;

2. Encryption is enabled;

3. The user has explicitly disabled transferTo. Linux kernel 2.6.32 has a bug affecting the NIO path, which requires setting the spark.file.transferTo parameter to false.

4. When the individual partition sizes within a spill file are small, mergeSpillsWithFileStream is actually faster, because mergeSpillsWithTransferTo would perform many small disk IOs, which is inefficient. In that case, using large buffers for the input and output files helps reduce the number of disk IOs and makes the merge faster.

The implementation is as follows:

1. Create an input stream for each spill file. The input streams and their wrapping relationship are:

NioBufferedFileInputStream -> LimitedInputStream -> CryptoInputStream -> compressedInputStream

Note: compressedInputStream is not a real class name; it is used here as shorthand for whichever of ZstdInputStream, SnappyInputStream, LZFInputStream, or LZ4BlockInputStream is in use. The same applies to compressedOutputStream.

2. Create an output stream for the final output file outputFile. The output streams and their wrapping relationship are:

FileOutputStream -> BufferedOutputStream -> CountingOutputStream -> TimeTrackingOutputStream -> CloseAndFlushShieldOutputStream -> CryptoOutputStream -> compressedOutputStream

3. Copy each partition's bytes from the input streams to the output stream;

/**
   * Merges spill files using Java FileStreams. This code path is typically slower than
   * the NIO-based merge, {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[],
   * File)}, and it's mostly used in cases where the IO compression codec does not support
   * concatenation of compressed data, when encryption is enabled, or when users have
   * explicitly disabled use of {@code transferTo} in order to work around kernel bugs.
   * This code path might also be faster in cases where individual partition size in a spill
   * is small and UnsafeShuffleWriter#mergeSpillsWithTransferTo method performs many small
   * disk ios which is inefficient. In those case, Using large buffers for input and output
   * files helps reducing the number of disk ios, making the file merging faster.
   *
   * @param spills the spills to merge.
   * @param outputFile the file to write the merged data to.
   * @param compressionCodec the IO compression codec, or null if shuffle compression is disabled.
   * @return the partition lengths in the merged file.
   */
  private long[] mergeSpillsWithFileStream(
      SpillInfo[] spills,
      File outputFile,
      @Nullable CompressionCodec compressionCodec) throws IOException {
    assert (spills.length >= 2);
    final int numPartitions = partitioner.numPartitions();
    final long[] partitionLengths = new long[numPartitions];
    final InputStream[] spillInputStreams = new InputStream[spills.length];
    
    // Create the output stream for the final output file outputFile
    final OutputStream bos = new BufferedOutputStream(
            new FileOutputStream(outputFile),
            outputBufferSizeInBytes);
    // Use a counting output stream to avoid having to close the underlying file and ask
    // the file system for its size after each partition is written.
    final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos);

    boolean threwException = true;
    try {
      // Create an input stream for each spill file
      for (int i = 0; i < spills.length; i++) {
        spillInputStreams[i] = new NioBufferedFileInputStream(
            spills[i].file,
            inputBufferSizeInBytes);
      }
      // Outer loop: iterate over partitions
      for (int partition = 0; partition < numPartitions; partition++) {
        final long initialFileLength = mergedFileOutputStream.getByteCount();
        // Shield the underlying output stream from close() and flush() calls, so that we can close
        // the higher level streams to make sure all data is really flushed and internal state is
        // cleaned.
        OutputStream partitionOutput = new CloseAndFlushShieldOutputStream(
          new TimeTrackingOutputStream(writeMetrics, mergedFileOutputStream));
        partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
        if (compressionCodec != null) {
          partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
        }
        // Inner loop: iterate over spill files
        for (int i = 0; i < spills.length; i++) {
          final long partitionLengthInSpill = spills[i].partitionLengths[partition];
          if (partitionLengthInSpill > 0) {
            InputStream partitionInputStream = new LimitedInputStream(spillInputStreams[i],
              partitionLengthInSpill, false);
            try {
              partitionInputStream = blockManager.serializerManager().wrapForEncryption(
                partitionInputStream);
              if (compressionCodec != null) {
                partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
              }
              // Copy this partition's bytes from the spill input stream to the output stream
              ByteStreams.copy(partitionInputStream, partitionOutput);
            } finally {
              partitionInputStream.close();
            }
          }
        }
        partitionOutput.flush();
        partitionOutput.close();
        partitionLengths[partition] = (mergedFileOutputStream.getByteCount() - initialFileLength);
      }
      threwException = false;
    } finally {
      // To avoid masking exceptions that caused us to prematurely enter the finally block, only
      // throw exceptions during cleanup if threwException == false.
      for (InputStream stream : spillInputStreams) {
        Closeables.close(stream, threwException);
      }
      Closeables.close(mergedFileOutputStream, threwException);
    }
    return partitionLengths;
  }

The mergeSpillsWithTransferTo method

Merges multiple spill files by using NIO's transferTo to concatenate the spill partitions' bytes.

This is only safe when the IO compression codec and the serializer support concatenation of serialized streams.

The implementation is as follows:

1. Create an input stream for each spill file and obtain the FileChannel of each input stream;

2. Create an output stream for the final output file outputFile and obtain the FileChannel of the output stream;

3. Call transferTo on each input FileChannel to transfer its bytes to the output FileChannel; a minimal sketch of the underlying NIO call follows this list;
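
Utils.copyFileStreamNIO is essentially a wrapper around FileChannel#transferTo; here is a minimal sketch of the underlying NIO call (the file names are hypothetical, and the partial-transfer handling of the real helper is omitted):

import java.io.{FileInputStream, FileOutputStream}

// Transfer `count` bytes starting at `position` from the input channel directly
// to the output channel; the copy can stay in kernel space on supporting OSes.
val inChannel = new FileInputStream("spill-0.data").getChannel
val outChannel = new FileOutputStream("merged-output.data", true).getChannel  // append mode
val position = 0L
val count = inChannel.size()
inChannel.transferTo(position, count, outChannel)
inChannel.close()
outChannel.close()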

/**
   * Merges spill files by using NIO's transferTo to concatenate spill partitions' bytes.
   * This is only safe when the IO compression codec and serializer support concatenation of
   * serialized streams.
   *
   * @return the partition lengths in the merged file.
   */
  private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
    assert (spills.length >= 2);
    final int numPartitions = partitioner.numPartitions();
    final long[] partitionLengths = new long[numPartitions];
    final FileChannel[] spillInputChannels = new FileChannel[spills.length];
    final long[] spillInputChannelPositions = new long[spills.length];
    FileChannel mergedFileOutputChannel = null;

    boolean threwException = true;
    try {
      // Create an input stream for each spill file and get its FileChannel
      for (int i = 0; i < spills.length; i++) {
        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
      }
      // This file needs to opened in append mode in order to work around a Linux kernel bug that
      // affects transferTo; see SPARK-3948 for more details.
      // Create the output stream for the final output file outputFile and get its FileChannel
      // The output file must be opened in append mode
      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();

      long bytesWrittenToMergedFile = 0;
      // Outer loop: iterate over partitions
      for (int partition = 0; partition < numPartitions; partition++) {
        // Inner loop: iterate over spill files
        for (int i = 0; i < spills.length; i++) {
          final long partitionLengthInSpill = spills[i].partitionLengths[partition];
          final FileChannel spillInputChannel = spillInputChannels[i];
          final long writeStartTime = System.nanoTime();
          // Call transferTo on the input FileChannel to move this partition's bytes to the output FileChannel
          Utils.copyFileStreamNIO(
            spillInputChannel,
            mergedFileOutputChannel,
            spillInputChannelPositions[i],
            partitionLengthInSpill);
          spillInputChannelPositions[i] += partitionLengthInSpill;
          writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
          bytesWrittenToMergedFile += partitionLengthInSpill;
          partitionLengths[partition] += partitionLengthInSpill;
        }
      }
      // Check the position after transferTo loop to see if it is in the right position and raise an
      // exception if it is incorrect. The position will not be increased to the expected length
      // after calling transferTo in kernel version 2.6.32. This issue is described at
      // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948.
      if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) {
        throw new IOException(
          "Current position " + mergedFileOutputChannel.position() + " does not equal expected " +
            "position " + bytesWrittenToMergedFile + " after transferTo. Please check your kernel" +
            " version to see if it is 2.6.32, as there is a kernel bug which will lead to " +
            "unexpected behavior when using transferTo. You can set spark.file.transferTo=false " +
            "to disable this NIO feature."
        );
      }
      threwException = false;
    } finally {
      // To avoid masking exceptions that caused us to prematurely enter the finally block, only
      // throw exceptions during cleanup if threwException == false.
      for (int i = 0; i < spills.length; i++) {
        assert(spillInputChannelPositions[i] == spills[i].file.length());
        Closeables.close(spillInputChannels[i], threwException);
      }
      Closeables.close(mergedFileOutputChannel, threwException);
    }
    return partitionLengths;
  }