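Spark Source Code Analysis: UnsafeShuffleWriter
Overview
SortShuffleManager selects UnsafeShuffleWriter when all of the following conditions hold; otherwise it falls back to SortShuffleWriter:
- The serializer supports relocation. This means that the serialized bytes of individual records can be reordered, and the result is equivalent to reordering the records first and then serializing them. KryoSerializer supports relocation; JavaSerializer, which Spark uses by default, does not. The serializer is configured with the spark.serializer parameter;
- No map-side aggregation is needed, i.e. no aggregator is defined;
- The number of partitions does not exceed the threshold (2^24);
UnsafeShuffleWriter serializes each record and inserts it into a sorter, sorts the already-serialized records by partition ID, writes them in sorted order to disk as spill files, and finally merges the spill files into a single output file. During the merge it chooses the most suitable strategy based on the number of spill files and the IO compression codec.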
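To make the relocation property more concrete, the following is a minimal sketch in plain Java, not Spark code. It assumes a hypothetical stateless record encoding (DataOutputStream.writeUTF) and a made-up sort order; RelocationDemo and its method names are illustrative only. It shows that reordering already-serialized byte ranges yields the same bytes as reordering the records first and serializing afterwards.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class RelocationDemo {
    // Path A: serialize all records into one stream in their original order,
    // remember where each record ends, then reorder the byte ranges.
    static byte[] serializeThenReorder(String[] records, int[] order) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int[] ends = new int[records.length];
        try (DataOutputStream out = new DataOutputStream(buf)) {
            for (int i = 0; i < records.length; i++) {
                out.writeUTF(records[i]);
                ends[i] = out.size(); // bytes written so far
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] all = buf.toByteArray();
        ByteArrayOutputStream reordered = new ByteArrayOutputStream();
        for (int index : order) {
            int start = index == 0 ? 0 : ends[index - 1];
            reordered.write(all, start, ends[index] - start);
        }
        return reordered.toByteArray();
    }

    // Path B: reorder the records first, then serialize them into one stream.
    static byte[] reorderThenSerialize(String[] records, int[] order) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            for (int index : order) {
                out.writeUTF(records[index]);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) {
        String[] records = {"pear", "apple", "fig"};
        int[] order = {1, 2, 0}; // hypothetical sort result
        boolean same = Arrays.equals(
            serializeThenReorder(records, order),
            reorderThenSerialize(records, order));
        System.out.println("relocation-safe: " + same);
    }
}
```

A serializer that keeps cross-record state in the stream (for example back-references to earlier objects) would generally not pass this kind of check, which is why such a serializer cannot take the serialized shuffle path.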
Source Code Analysis
ShuffleMapTask obtains the ShuffleManager
When ShuffleMapTask calls runTask() to execute a task, it obtains the ShuffleManager from SparkEnv.
The runTask() method of ShuffleMapTask is as follows:
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    // Get the ShuffleManager from SparkEnv
    val manager = SparkEnv.get.shuffleManager
    // Ask the ShuffleManager for a writer
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // Write this partition's records through the writer
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    return writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
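SortShuffleManager obtains the writer
SortShuffleManager selects a ShuffleHandle depending on which conditions are met. Each ShuffleHandle corresponds to a shuffle writer as follows:
| ShuffleHandle | ShuffleWriter |
| --- | --- |
| BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter |
| SerializedShuffleHandle | UnsafeShuffleWriter |
| BaseShuffleHandle | SortShuffleWriter |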
The registerShuffle method
SortShuffleManager.scala
/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
*/
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
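The canUseSerializedShuffle method
This method checks whether the conditions for using UnsafeShuffleWriter are met:
- The serializer supports relocation;
- No map-side aggregation is needed, i.e. no aggregator is defined;
- The number of partitions does not exceed the threshold (2^24);
SortShuffleManager.scala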
/**
* Helper method for determining whether a shuffle should use an optimized serialized shuffle
* path or whether it should fall back to the original path that operates on deserialized objects.
*/
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
// Check whether the serializer supports relocation
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
s"${dependency.serializer.getClass.getName}, does not support object relocation")
false
// Check whether map-side aggregation is defined
} else if (dependency.aggregator.isDefined) {
log.debug(
s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
false
// Check whether the partition count exceeds the threshold
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
} else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
}
The getWriter method
If the conditions are met, the handle is a SerializedShuffleHandle, and an UnsafeShuffleWriter is created to write the data.
SortShuffleManager.scala
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
// Create the UnsafeShuffleWriter
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
UnsafeShuffleWriter
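The write method
1. Partition and serialize each record, then insert it into the sorter.
2. Sort the records, write them in sorted order to disk as spill files, and then merge the spill files into a single output file.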
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
boolean success = false;
try {
// Partition and serialize each record, then insert it into the sorter
while (records.hasNext()) {
insertRecordIntoSorter(records.next());
}
// Sort the records, write them to disk as spill files, then merge the spill files into a single output file
closeAndWriteOutput();
success = true;
} finally {
if (sorter != null) {
try {
sorter.cleanupResources();
} catch (Exception e) {
// Only throw this error if we won't be masking another
// error.
if (success) {
throw e;
} else {
logger.error("In addition to a failure during writing, we failed during " +
"cleanup.", e);
}
}
}
}
}
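The insertRecordIntoSorter method
This method partitions and serializes a record and then inserts it into the sorter. It works as follows:
- Call partitioner.getPartition on the record's key to determine which partition the record belongs to, and obtain that partition's partitionId;
- Serialize the record's key and value in turn into the buf field underlying a ByteArrayOutputStream;
- Insert the serialized record into the ShuffleExternalSorter;
When partitioning a record, assuming HashPartitioner is used, getPartition takes the hashCode of the record's key modulo numPartitions (adjusted to be non-negative) to determine the partition the record is assigned to.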
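As a rough illustration of the modulo step, here is a sketch in Java. It assumes the usual non-negative modulo, so that keys with negative hash codes still map to a valid partition, and that a null key goes to partition 0. HashPartitionSketch is a hypothetical name, not a Spark class.

```java
final class HashPartitionSketch {
    // Java's % operator can return a negative result for a negative hash code,
    // so shift negative remainders back into the range [0, mod).
    static int nonNegativeMod(int x, int mod) {
        int rawMod = x % mod;
        return rawMod + (rawMod < 0 ? mod : 0);
    }

    // Map a key to a partition ID in [0, numPartitions).
    static int getPartition(Object key, int numPartitions) {
        return key == null ? 0 : nonNegativeMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(getPartition(10, 4)); // Integer hash code is the value itself
        System.out.println(getPartition(-7, 4)); // negative hash code still yields a valid ID
        System.out.println(getPartition(null, 4));
    }
}
```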
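When serializing a record, assuming JavaSerializer is used, the streams are layered as follows (from the bottom up):
MyByteArrayOutputStream -> ObjectOutputStream -> JavaSerializationStream
Here, MyByteArrayOutputStream is a subclass of ByteArrayOutputStream that directly exposes the buf[] field;
ObjectOutputStream is the serialization stream from java.io, whose writeObject method writes an object to the stream;
JavaSerializationStream is a wrapper that delegates to ObjectOutputStream.
In the end, the serialized record is stored in the buf field of the ByteArrayOutputStream.
In the code below, serBuffer is an instance of MyByteArrayOutputStream; calling getBuf returns the buf field of the underlying ByteArrayOutputStream.
serOutputStream is an instance of SerializationStream. Depending on SparkConf, it is initialized as either a JavaSerializationStream or a KryoSerializationStream. The Kryo serialization process is not covered here.
PS: JavaSerializer does not support relocation, so in practice JavaSerializationStream is never used on this path. It serves only as an example here.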
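The buffer-reuse idea can be sketched in plain Java as follows. This is an illustration under assumptions rather than Spark's implementation: ExposedByteArrayOutputStream plays the role of MyByteArrayOutputStream, and DataOutputStream stands in for the real SerializationStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class ExposedBufferDemo {
    // Subclass that hands out the protected `buf` array without copying it.
    static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
        ExposedByteArrayOutputStream(int size) {
            super(size);
        }

        byte[] getBuf() {
            return buf;
        }
    }

    // Serialize one key/value pair into the reusable buffer and return the
    // number of valid bytes. Only buf[0 .. size) belongs to the current record.
    static int writeRecord(ExposedByteArrayOutputStream buffer, String key, int value) {
        buffer.reset(); // discard the previous record, keep the allocated array
        try {
            // Simplification: a new wrapper per record; a real writer would reuse one stream.
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeUTF(key);
            out.writeInt(value);
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        ExposedByteArrayOutputStream buffer = new ExposedByteArrayOutputStream(64);
        int first = writeRecord(buffer, "abc", 1);
        int second = writeRecord(buffer, "a", 2);
        System.out.println(first + " bytes, then " + second + " bytes in the same array");
    }
}
```

Exposing the array avoids the copy that toByteArray() would make for every record; the caller has to pair getBuf() with size() because the array is usually longer than the record.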
@VisibleForTesting
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
assert(sorter != null);
final K key = record._1();
// getPartition maps the key to a partition and returns that partition's partitionId
final int partitionId = partitioner.getPartition(key);
serBuffer.reset();
// Serialize the record's key into the buf field underlying serBuffer
serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
// Serialize the record's value into the buf field underlying serBuffer
serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
serOutputStream.flush();
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
// Insert the serialized record into the sorter
sorter.insertRecord(
serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
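The closeAndWriteOutput method
Sorts the records, writes them to disk as spill files, and then merges the spill files into a single output file.
The method works as follows:
1. Sort the in-memory records, write them to disk as a spill file, and return the metadata of all spill files as a SpillInfo[];
2. Construct the final output File, whose name (with reduceId fixed at 0) is: "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data";
3. Append a UUID to the output file name to obtain a temporary file that marks the write as in progress; it is renamed once writing finishes;
4. Merge the spill files into a single output file, choosing the most suitable merge strategy based on the number of spill files and the IO compression codec;
5. Write each partition's offset to the index file so that the reduce side can fetch its data;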
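For step 5, a plausible way to picture the index file is as a list of cumulative offsets derived from the partition lengths. The sketch below assumes that layout (a leading 0 followed by running totals); IndexOffsetsSketch is a hypothetical helper, not part of Spark.

```java
import java.util.Arrays;

final class IndexOffsetsSketch {
    // Turn per-partition byte lengths into numPartitions + 1 cumulative offsets.
    static long[] toOffsets(long[] partitionLengths) {
        long[] offsets = new long[partitionLengths.length + 1];
        for (int i = 0; i < partitionLengths.length; i++) {
            offsets[i + 1] = offsets[i] + partitionLengths[i];
        }
        return offsets;
    }

    public static void main(String[] args) {
        long[] lengths = {10, 0, 25}; // made-up partition lengths
        System.out.println(Arrays.toString(toOffsets(lengths)));
    }
}
```

With these offsets, the data for reduce partition r would span bytes offsets[r] (inclusive) to offsets[r + 1] (exclusive) of the data file, so an empty partition simply has two equal neighbouring offsets.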
@VisibleForTesting
void closeAndWriteOutput() throws IOException {
assert(sorter != null);
updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
// Sort the in-memory records, write them to disk as a spill file, and return the metadata of all spill files as a SpillInfo[]
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
// Construct the final output File, whose name (with reduceId fixed at 0) is:
// "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
// Append a UUID to the output file name to mark the file as being written; it is renamed when writing finishes
final File tmp = Utils.tempFileWith(output);
try {
try {
// Merge the spill files into a single output file, choosing the merge strategy based on the number of spill files and the IO compression codec
partitionLengths = mergeSpills(spills, tmp);
} finally {
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
}
}
}
// Write each partition's offset to the index file so that the reduce side can fetch its data
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
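The mergeSpills method
Merges the spill files into a single output file, choosing the most suitable merge strategy based on the number of spill files and the IO compression codec.
When there are multiple spill files, the strategy is chosen as follows:
1. Read from SparkConf whether fast merge is enabled, and determine whether it is supported (compression is disabled, or the codec supports concatenation of compressed streams). If both hold, take the fast merge path; otherwise take the slow merge path.
2. On the fast merge path, check whether transferTo is enabled and encryption is off. If so, use the transferTo-based fast merge; otherwise use the file-stream-based fast merge.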
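The decision described above can be condensed into a small Java sketch. The enum and method names are hypothetical; the zero-spill and single-spill branches are taken from the mergeSpills source listed in this section.

```java
final class MergeStrategySketch {
    enum Strategy {
        EMPTY_FILE,             // no spills: create an empty output file
        MOVE_SINGLE_SPILL,      // one spill: move it to the output location
        TRANSFER_TO_FAST_MERGE, // concatenate bytes with NIO transferTo
        FILE_STREAM_FAST_MERGE, // concatenate bytes with file streams
        SLOW_MERGE              // decompress and recompress through file streams
    }

    static Strategy choose(
            int numSpills,
            boolean fastMergeEnabled,
            boolean compressionEnabled,
            boolean codecSupportsConcatenation,
            boolean transferToEnabled,
            boolean encryptionEnabled) {
        if (numSpills == 0) {
            return Strategy.EMPTY_FILE;
        }
        if (numSpills == 1) {
            return Strategy.MOVE_SINGLE_SPILL;
        }
        // Fast merge only concatenates bytes, so compressed data must be concatenable.
        boolean fastMergeIsSupported = !compressionEnabled || codecSupportsConcatenation;
        if (fastMergeEnabled && fastMergeIsSupported) {
            return (transferToEnabled && !encryptionEnabled)
                ? Strategy.TRANSFER_TO_FAST_MERGE
                : Strategy.FILE_STREAM_FAST_MERGE;
        }
        return Strategy.SLOW_MERGE;
    }

    public static void main(String[] args) {
        System.out.println(choose(3, true, true, true, true, false));
        System.out.println(choose(3, true, true, true, true, true));
        System.out.println(choose(3, true, true, false, true, false));
    }
}
```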
/**
* Merge zero or more spill files together, choosing the fastest merging strategy based on the
* number of spills and the IO compression codec.
*
* @return the partition lengths in the merged file.
*/
private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
// Read from SparkConf the flag that controls whether shuffle compression is enabled
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
// Create the CompressionCodec configured in SparkConf
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
// Read from SparkConf the flag that controls whether fast merge is enabled
final boolean fastMergeEnabled =
sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
final boolean fastMergeIsSupported = !compressionEnabled ||
CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
try {
if (spills.length == 0) {
new FileOutputStream(outputFile).close(); // Create an empty file
return new long[partitioner.numPartitions()];
} else if (spills.length == 1) {
// Here, we don't need to perform any metrics updates because the bytes written to this
// output file would have already been counted as shuffle bytes written.
Files.move(spills[0].file, outputFile);
return spills[0].partitionLengths;
} else {
final long[] partitionLengths;
// There are multiple spills to merge, so none of these spill files' lengths were counted
// towards our shuffle write count or shuffle write time. If we use the slow merge path,
// then the final output file's size won't necessarily be equal to the sum of the spill
// files' sizes. To guard against this case, we look at the output file's actual size when
// computing shuffle bytes written.
//
// We allow the individual merge methods to report their own IO times since different merge
// strategies use different IO techniques. We count IO during merge towards the shuffle
// shuffle write time, which appears to be consistent with the "not bypassing merge-sort"
// branch in ExternalSorter.
if (fastMergeEnabled && fastMergeIsSupported) {
// Compression is disabled or we are using an IO compression codec that supports
// decompression of concatenated compressed streams, so we can perform a fast spill merge
// that doesn't need to interpret the spilled bytes.
if (transferToEnabled && !encryptionEnabled) {
logger.debug("Using transferTo-based fast merge");
partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
} else {
logger.debug("Using fileStream-based fast merge");
partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
}
} else {
logger.debug("Using slow merge");
partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
}
// When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
// in-memory records, we write out the in-memory records to a file but do not count that
// final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
// to be counted as shuffle write, but this will lead to double-counting of the final
// SpillInfo's bytes.
writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
writeMetrics.incBytesWritten(outputFile.length());
return partitionLengths;
}
} catch (IOException e) {
if (outputFile.exists() && !outputFile.delete()) {
logger.error("Unable to delete output file {}", outputFile.getPath());
}
throw e;
}
}
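The mergeSpillsWithFileStream method
Merges spill files using Java file streams.
This path is typically slower than the NIO-based (transferTo) merge, UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File). It is therefore mostly used in the following cases:
1. The IO compression codec does not support concatenation of compressed data;
2. Encryption is enabled;
3. The user has explicitly disabled transferTo. Linux kernel 2.6.32 has a bug that affects the NIO path, in which case spark.file.transferTo must be set to false.
4. When the individual partitions in a spill file are all small, mergeSpillsWithFileStream can be faster, because mergeSpillsWithTransferTo then performs many small disk IOs, which is inefficient. In that situation, using large buffers for the input and output files helps reduce the number of disk IOs and makes the merge faster.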
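Case 1 depends on whether independently compressed streams can simply be concatenated and still decompress correctly. The following sketch illustrates that property with the JDK's gzip streams, used here purely as a stand-in: gzip is not one of Spark's shuffle codecs, and the class name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class ConcatenatedStreamsDemo {
    // Compress one piece of text into its own complete gzip stream.
    static byte[] gzip(String text) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buf)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Plain byte concatenation: this is all a "fast merge" does to the data.
    static byte[] concat(byte[] a, byte[] b) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(a, 0, a.length);
        out.write(b, 0, b.length);
        return out.toByteArray();
    }

    // Decompress everything readable from the (possibly concatenated) input.
    static String gunzipAll(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] fromSpill0 = gzip("partition-0 bytes from spill 0;");
        byte[] fromSpill1 = gzip("partition-0 bytes from spill 1;");
        System.out.println(gunzipAll(concat(fromSpill0, fromSpill1)));
    }
}
```

When a codec lacks this property, the merged bytes would not decode as one stream, so the data has to be decompressed and recompressed, which is what the slow merge path does.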
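The method works as follows:
1. Create an input stream for each spill file. The streams are wrapped as follows (innermost first; the encryption and compression layers are present only when enabled):
NioBufferedFileInputStream -> LimitedInputStream -> CryptoInputStream -> compressedInputStream
PS: compressedInputStream is not a real class; the name is used here for convenience and stands for one of ZstdInputStream, SnappyInputStream, LZFInputStream, or LZ4BlockInputStream. The same applies to compressedOutputStream.
2. Create an output stream for the final output file, outputFile. The streams are wrapped as follows:
FileOutputStream -> BufferedOutputStream -> CountingOutputStream -> TimeTrackingOutputStream -> CloseAndFlushShieldOutputStream -> CryptoOutputStream -> compressedOutputStream
3. Copy all bytes from the input streams to the output stream;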
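The loop structure of these steps can be sketched with in-memory streams. This is a simplified illustration, not the Spark method: byte arrays stand in for spill files, no encryption or compression wrappers are applied, and StreamMergeSketch is a hypothetical name. As in the original, partition lengths are derived from how many bytes were written to the merged output.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

final class StreamMergeSketch {
    // spills[i] holds the bytes of spill i, with its partitions laid out back to back;
    // partitionLengths[i][p] is the byte length of partition p inside spill i.
    static long[] merge(
            byte[][] spills,
            int[][] partitionLengths,
            int numPartitions,
            ByteArrayOutputStream merged) {
        ByteArrayInputStream[] inputs = new ByteArrayInputStream[spills.length];
        for (int i = 0; i < spills.length; i++) {
            inputs[i] = new ByteArrayInputStream(spills[i]);
        }
        long[] mergedLengths = new long[numPartitions];
        for (int p = 0; p < numPartitions; p++) {        // outer loop: partitions
            int before = merged.size();
            for (int i = 0; i < spills.length; i++) {    // inner loop: spill files
                int len = partitionLengths[i][p];
                if (len > 0) {
                    byte[] chunk = new byte[len];
                    int read = inputs[i].read(chunk, 0, len);
                    if (read != len) {
                        throw new IllegalStateException("spill " + i + " is shorter than declared");
                    }
                    merged.write(chunk, 0, len);
                }
            }
            // Length of partition p in the merged output, measured on the output side.
            mergedLengths[p] = merged.size() - before;
        }
        return mergedLengths;
    }

    public static void main(String[] args) {
        byte[][] spills = {"0012".getBytes(), "022".getBytes()};
        int[][] lengths = {{2, 1, 1}, {1, 0, 2}};
        ByteArrayOutputStream merged = new ByteArrayOutputStream();
        long[] mergedLengths = merge(spills, lengths, 3, merged);
        System.out.println(merged + " " + Arrays.toString(mergedLengths));
    }
}
```

Measuring lengths on the output side matters once recompression is involved, because the recompressed size of a partition need not equal the sum of its sizes in the individual spill files.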
/**
* Merges spill files using Java FileStreams. This code path is typically slower than
* the NIO-based merge, {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[],
* File)}, and it's mostly used in cases where the IO compression codec does not support
* concatenation of compressed data, when encryption is enabled, or when users have
* explicitly disabled use of {@code transferTo} in order to work around kernel bugs.
* This code path might also be faster in cases where individual partition size in a spill
* is small and UnsafeShuffleWriter#mergeSpillsWithTransferTo method performs many small
* disk ios which is inefficient. In those case, Using large buffers for input and output
* files helps reducing the number of disk ios, making the file merging faster.
*
* @param spills the spills to merge.
* @param outputFile the file to write the merged data to.
* @param compressionCodec the IO compression codec, or null if shuffle compression is disabled.
* @return the partition lengths in the merged file.
*/
private long[] mergeSpillsWithFileStream(
SpillInfo[] spills,
File outputFile,
@Nullable CompressionCodec compressionCodec) throws IOException {
assert (spills.length >= 2);
final int numPartitions = partitioner.numPartitions();
final long[] partitionLengths = new long[numPartitions];
final InputStream[] spillInputStreams = new InputStream[spills.length];
// Create the output stream for the final output file, outputFile
final OutputStream bos = new BufferedOutputStream(
new FileOutputStream(outputFile),
outputBufferSizeInBytes);
// Use a counting output stream to avoid having to close the underlying file and ask
// the file system for its size after each partition is written.
final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos);
boolean threwException = true;
try {
// Create an input stream for each spill file
for (int i = 0; i < spills.length; i++) {
spillInputStreams[i] = new NioBufferedFileInputStream(
spills[i].file,
inputBufferSizeInBytes);
}
// Outer loop: iterate over partitions
for (int partition = 0; partition < numPartitions; partition++) {
final long initialFileLength = mergedFileOutputStream.getByteCount();
// Shield the underlying output stream from close() and flush() calls, so that we can close
// the higher level streams to make sure all data is really flushed and internal state is
// cleaned.
OutputStream partitionOutput = new CloseAndFlushShieldOutputStream(
new TimeTrackingOutputStream(writeMetrics, mergedFileOutputStream));
partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
if (compressionCodec != null) {
partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
}
// Inner loop: iterate over spill files
for (int i = 0; i < spills.length; i++) {
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
if (partitionLengthInSpill > 0) {
InputStream partitionInputStream = new LimitedInputStream(spillInputStreams[i],
partitionLengthInSpill, false);
try {
partitionInputStream = blockManager.serializerManager().wrapForEncryption(
partitionInputStream);
if (compressionCodec != null) {
partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
}
// Copy all bytes from the input stream to the output stream
ByteStreams.copy(partitionInputStream, partitionOutput);
} finally {
partitionInputStream.close();
}
}
}
partitionOutput.flush();
partitionOutput.close();
partitionLengths[partition] = (mergedFileOutputStream.getByteCount() - initialFileLength);
}
threwException = false;
} finally {
// To avoid masking exceptions that caused us to prematurely enter the finally block, only
// throw exceptions during cleanup if threwException == false.
for (InputStream stream : spillInputStreams) {
Closeables.close(stream, threwException);
}
Closeables.close(mergedFileOutputStream, threwException);
}
return partitionLengths;
}
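The mergeSpillsWithTransferTo method
Merges multiple spill files by using NIO's transferTo to concatenate the bytes of the spill partitions.
This is safe only when the IO compression codec and the serializer support concatenation of serialized streams.
The method works as follows:
1. Create an input stream for each spill file and obtain its FileChannel;
2. Create an output stream for the final output file, outputFile, and obtain its FileChannel;
3. Call transferTo on each input FileChannel to transfer the bytes to the output FileChannel;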
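A minimal sketch of these steps using the JDK's FileChannel API is shown here. It is an approximation under assumptions: temporary files stand in for spill files, the transferTo loop is written inline instead of going through Spark's Utils.copyFileStreamNIO, and TransferToMergeSketch and demo are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToMergeSketch {
    static long[] merge(Path[] spills, long[][] partitionLengths, int numPartitions, Path output)
            throws IOException {
        FileChannel[] inputs = new FileChannel[spills.length];
        long[] positions = new long[spills.length]; // read position per spill file
        long[] mergedLengths = new long[numPartitions];
        // The output is opened in append mode, as in the Spark method.
        try (FileChannel out = FileChannel.open(output, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < spills.length; i++) {
                inputs[i] = FileChannel.open(spills[i], StandardOpenOption.READ);
            }
            for (int p = 0; p < numPartitions; p++) {        // outer loop: partitions
                for (int i = 0; i < spills.length; i++) {    // inner loop: spill files
                    long remaining = partitionLengths[i][p];
                    while (remaining > 0) { // transferTo may move fewer bytes than requested
                        long n = inputs[i].transferTo(positions[i], remaining, out);
                        if (n <= 0) {
                            throw new IOException("spill " + i + " ended early");
                        }
                        positions[i] += n;
                        remaining -= n;
                    }
                    mergedLengths[p] += partitionLengths[i][p];
                }
            }
        } finally {
            for (FileChannel in : inputs) {
                if (in != null) {
                    in.close();
                }
            }
        }
        return mergedLengths;
    }

    // Merges two small temporary "spill files" and returns the merged content.
    static String demo() {
        try {
            Path s0 = Files.createTempFile("spill0", ".tmp");
            Path s1 = Files.createTempFile("spill1", ".tmp");
            Path out = Files.createTempFile("merged", ".tmp");
            try {
                Files.write(s0, "0012".getBytes(StandardCharsets.US_ASCII));
                Files.write(s1, "022".getBytes(StandardCharsets.US_ASCII));
                merge(new Path[] {s0, s1}, new long[][] {{2, 1, 1}, {1, 0, 2}}, 3, out);
                return new String(Files.readAllBytes(out), StandardCharsets.US_ASCII);
            } finally {
                Files.deleteIfExists(s0);
                Files.deleteIfExists(s1);
                Files.deleteIfExists(out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the bytes are moved without being interpreted, the merged length of each partition is simply the sum of its lengths across the spill files.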
/**
* Merges spill files by using NIO's transferTo to concatenate spill partitions' bytes.
* This is only safe when the IO compression codec and serializer support concatenation of
* serialized streams.
*
* @return the partition lengths in the merged file.
*/
private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
assert (spills.length >= 2);
final int numPartitions = partitioner.numPartitions();
final long[] partitionLengths = new long[numPartitions];
final FileChannel[] spillInputChannels = new FileChannel[spills.length];
final long[] spillInputChannelPositions = new long[spills.length];
FileChannel mergedFileOutputChannel = null;
boolean threwException = true;
try {
// Create an input stream for each spill file and obtain its channel
for (int i = 0; i < spills.length; i++) {
spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
}
// This file needs to opened in append mode in order to work around a Linux kernel bug that
// affects transferTo; see SPARK-3948 for more details.
// Create an output stream for the final output file, outputFile (opened in append mode), and obtain its channel
mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
long bytesWrittenToMergedFile = 0;
// Outer loop: iterate over partitions
for (int partition = 0; partition < numPartitions; partition++) {
// Inner loop: iterate over spill files
for (int i = 0; i < spills.length; i++) {
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
final FileChannel spillInputChannel = spillInputChannels[i];
final long writeStartTime = System.nanoTime();
// Call transferTo on the input FileChannel to transfer the bytes to the output FileChannel
Utils.copyFileStreamNIO(
spillInputChannel,
mergedFileOutputChannel,
spillInputChannelPositions[i],
partitionLengthInSpill);
spillInputChannelPositions[i] += partitionLengthInSpill;
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
}
}
// Check the position after transferTo loop to see if it is in the right position and raise an
// exception if it is incorrect. The position will not be increased to the expected length
// after calling transferTo in kernel version 2.6.32. This issue is described at
// https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948.
if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) {
throw new IOException(
"Current position " + mergedFileOutputChannel.position() + " does not equal expected " +
"position " + bytesWrittenToMergedFile + " after transferTo. Please check your kernel" +
" version to see if it is 2.6.32, as there is a kernel bug which will lead to " +
"unexpected behavior when using transferTo. You can set spark.file.transferTo=false " +
"to disable this NIO feature."
);
}
threwException = false;
} finally {
// To avoid masking exceptions that caused us to prematurely enter the finally block, only
// throw exceptions during cleanup if threwException == false.
for (int i = 0; i < spills.length; i++) {
assert(spillInputChannelPositions[i] == spills[i].file.length());
Closeables.close(spillInputChannels[i], threwException);
}
Closeables.close(mergedFileOutputChannel, threwException);
}
return partitionLengths;
}