Spark 2.3 source code analysis: SortShuffleWriter
SortShuffleWriter
Overview
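SortShuffleWriter writes a map task's output for sort-based shuffle, and its first decision is whether records must be combined locally on the map side. If aggregation is required, records are kept in memory in a PartitionedAppendOnlyMap; otherwise they are appended to a PartitionedPairBuffer. In both cases the writer checks whether there is enough memory. If memory runs short and no more can be acquired, the in-memory data is spilled to temporary files on local disk. At the end, the in-memory data and the spilled temporary files are merged, using a merge sort when needed (if nothing was spilled, no merge sort is necessary because all the data is in memory). The result is a merged data file plus its index file.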
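The write method
The method works as follows:
1. Create an ExternalSorter. Whether map-side combine is required only determines whether an aggregator and a keyOrdering are passed to it.
2. Call insertAll on the ExternalSorter to insert the records.
If the in-memory collection that the ExternalSorter uses to hold records reaches its size threshold, the records are spilled to a disk file in sorted order.
3. Build the final output file, whose name is (with reduceId 0): "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data".
4. Append a UUID to the output file name to mark the file as still being written; it is renamed once writing finishes.
5. Call writePartitionedFile on the ExternalSorter to sort the records inserted into the sorter and write them to the output file.
The records inserted into the sorter may reside in the in-memory collection or in spill files.
6. Write the offset of each partition to the index file so that the reduce side can fetch its data.
7. Record the shuffle server ID and the partition lengths in a MapStatus.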
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  // Build the final output file, named "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
  // + ".data" (reduceId is 0)
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  // Append a UUID to the file name to mark it as in progress; it is renamed when writing ends
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    // Write the sorted records to the output file
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    // Write each partition's offset to the index file so the reduce side can fetch its data
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
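To make step 6 concrete: the index file lets a reducer find its own byte range inside the single data file. The minimal sketch below assumes the index holds numPartitions + 1 cumulative offsets starting at 0, built from the partition lengths returned by writePartitionedFile. IndexFileSketch, offsets and byteRange are hypothetical names, and the real resolver additionally performs file I/O, consistency checks and the atomic rename.

```scala
object IndexFileSketch {
  // Hypothetical helper: turn the per-partition lengths returned by
  // writePartitionedFile into cumulative offsets (numPartitions + 1 entries,
  // the first one always 0).
  def offsets(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _)

  // Hypothetical helper: the byte range [start, end) of one reduce partition
  // inside the data file, as a reducer would look it up in the index.
  def byteRange(index: Array[Long], reduceId: Int): (Long, Long) =
    (index(reduceId), index(reduceId + 1))
}
```

For example, partition lengths Array(3L, 0L, 5L) give the offsets 0, 3, 3, 8, so reducer 1 reads the empty range [3, 3) and reducer 2 reads bytes [3, 8).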
ExternalSorter
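Overview
ExternalSorter sorts, and possibly merges, a large number of (K, V) pairs to produce (K, C) key-combiner pairs. It uses a partitioner to first group the keys into partitions, and then optionally sorts the keys within each partition with a custom comparator. It can write a single output file in which each partition occupies its own byte range, which is what shuffle fetches need.
If combining is disabled, the type C must equal V; the objects are cast at the end.
Note: although ExternalSorter is a fairly generic sorter, some of its configuration is tied to its use in sort-based shuffle. For example, its block compression is controlled by "spark.shuffle.compress". If ExternalSorter is used in other, non-shuffle contexts where different settings are wanted, this class may need to be revisited.
The most important constructor parameters are:
@param aggregator optional Aggregator with the combine functions used to merge data
@param partitioner optional; if given, sort by partition ID and then by key
@param ordering optional Ordering used to sort the keys within each partition; should be a total ordering
@param serializer serializer used when spilling to disk
Note that if an ordering is given, it is always used for sorting, so only provide it if you really want the output keys to be sorted. In a map task without map-side aggregation, for example, you probably want to pass None as the ordering to avoid unnecessary sorting. On the other hand, if you do want to combine, having an ordering is more efficient than not having one.
Users are expected to interact with this class as follows:
- create an ExternalSorter;
- call insertAll on it to insert a batch of records;
- call iterator() to iterate over the sorted or aggregated records, or call writePartitionedFile() to write the sorted or aggregated records to an output file for sort shuffle.
Internally, the class works as follows:
We repeatedly fill in-memory buffers, using a PartitionedAppendOnlyMap when we need to combine by key and a PartitionedPairBuffer when we don't. Inside these buffers, elements are sorted by partition ID and possibly also by key. To avoid calling the partitioner several times per key, the partition ID is stored alongside each record.
When a buffer reaches the memory limit, it is spilled to a file. The file is sorted first by partition ID and then, if we want to aggregate, by key or by the key's hash code. For each file we track how many objects each partition had in memory, so the partition ID does not need to be written out for every element.
When the user requests an iterator or file output, the spilled files are merged together with the remaining in-memory data, using the sort order defined above (unless both sorting and aggregation are disabled). If we need to aggregate by key, we either use the total ordering from the ordering parameter, or read the keys that share a hash code and compare them with each other for equality to merge their values.
Users are expected to call stop() at the end to delete all intermediate files.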
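The ordering described above (partition ID first, then optionally the key) can be modelled with ordinary Scala collections. This is a simplified, non-destructive sketch rather than Spark's implementation: the object and function names are hypothetical, and the real PartitionedAppendOnlyMap and PartitionedPairBuffer sort their backing arrays in place.

```scala
import java.util.Comparator

object PartitionKeyOrderSketch {
  // Records are stored as ((partitionId, key), value), mirroring how the
  // in-memory collections keep the partition ID next to each record.
  def sortedByPartitionThenKey[K, V](
      records: Seq[((Int, K), V)],
      keyComparator: Option[Comparator[K]]): Seq[((Int, K), V)] = {
    val cmp: Comparator[((Int, K), V)] = new Comparator[((Int, K), V)] {
      override def compare(a: ((Int, K), V), b: ((Int, K), V)): Int = {
        val byPartition = Integer.compare(a._1._1, b._1._1)
        // Only fall back to the key when partitions tie and a comparator exists
        if (byPartition != 0 || keyComparator.isEmpty) byPartition
        else keyComparator.get.compare(a._1._2, b._1._2)
      }
    }
    // sortWith is stable, so records of the same partition keep their
    // insertion order when no key comparator is given.
    records.sortWith((a, b) => cmp.compare(a, b) < 0)
  }
}
```

Passing None corresponds to a map task with neither combine nor ordering: records are only grouped by partition, which is all the shuffle itself needs.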
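The parent class of ExternalSorter
Spillable is the parent class of ExternalSorter; Spillable in turn is a subclass of MemoryConsumer.
Spillable spills the contents of an in-memory collection to disk when its memory usage exceeds a threshold.
Here the in-memory collection is a PartitionedAppendOnlyMap or a PartitionedPairBuffer.
Member variables
- serializerBatchSize: the number of objects per batch when reading objects from, or writing them to, the serializer. Objects are written in batches, each batch using its own serialization stream; this reduces the size of the reference-tracking maps built when deserializing a stream. Note that setting this value too low can cause excessive copying during serialization, because some serializers grow their internal data structures by growing and copying every time the number of objects doubles.
- PartitionedAppendOnlyMap and PartitionedPairBuffer: the in-memory collections that hold records before they are spilled. Which one is used depends on whether aggregation is needed: with map-side aggregation records go into the PartitionedAppendOnlyMap, otherwise into the PartitionedPairBuffer.
- keyComparator: a comparator for keys, used to order the keys within a partition so that they can be aggregated or sorted. If the ordering parameter does not supply one, a default comparator provides a partial ordering by hash code. A partial ordering means that equal keys compare as 0 (comparator.compare(k, k) = 0), but some non-equal keys also compare as 0, so a later pass is needed to find the truly equal keys. The reason is that keys equal under equals() must have the same hashCode(), while keys with the same hashCode() are not necessarily equal under equals(); comparing hash codes therefore yields only a partial ordering.
- spills: when the in-memory collection reaches the threshold, its records are spilled in sorted order to a disk file. This ArrayBuffer[SpilledFile] holds the information about every spill file.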
// Size of object batches when reading/writing from serializers.
//
// Objects are written in batches, with each batch using its own serialization stream. This
// cuts down on the size of reference-tracking maps constructed when deserializing a stream.
//
// NOTE: Setting this too low can cause excessive copying when serializing, since some serializers
// grow internal data structures by growing + copying every time the number of objects doubles.
private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)
// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
@volatile private var map = new PartitionedAppendOnlyMap[K, C]
@volatile private var buffer = new PartitionedPairBuffer[K, C]
// A comparator for keys K that orders them within a partition to allow aggregation or sorting.
// Can be a partial ordering by hash code if a total ordering is not provided by the
// user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
// non-equal keys also have this, so we need to do a later pass to find truly equal keys).
// Note that we ignore this if no aggregator and no ordering are given.
private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {
override def compare(a: K, b: K): Int = {
val h1 = if (a == null) 0 else a.hashCode()
val h2 = if (b == null) 0 else b.hashCode()
if (h1 < h2) -1 else if (h1 == h2) 0 else 1
}
})
// Information about a spilled file. Includes sizes in bytes of "batches" written by the
// serializer as we periodically reset its stream, as well as number of elements in each
// partition, used to efficiently keep track of partitions when merging.
private[this] case class SpilledFile(
file: File,
blockId: BlockId,
serializerBatchSizes: Array[Long],
elementsPerPartition: Array[Long])
private val spills = new ArrayBuffer[SpilledFile]
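Note that keyComparator is ignored if neither an aggregator nor an ordering is given.
The comparator method returns Some(keyComparator) when either the ordering or the aggregator parameter is defined (keyComparator is the user-supplied ordering, or the hash-code comparator when no ordering was given); if neither is defined, it returns None.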
private def comparator: Option[Comparator[K]] = {
if (ordering.isDefined || aggregator.isDefined) {
Some(keyComparator)
} else {
None
}
}
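Because the default comparator only looks at hash codes, two different keys can compare as equal. The sketch below reproduces the hash-code comparator from the member-variable listing together with a simplified version of the comparator selection logic; the object name and the Boolean hasAggregator parameter are assumptions made for illustration. The Java strings "Aa" and "BB" share the hash code 2112, which makes them a convenient collision example.

```scala
import java.util.Comparator

object HashComparatorSketch {
  // Same logic as the default keyComparator shown above: compare keys by
  // hash code only, treating null as hash code 0.
  def hashComparator[K]: Comparator[K] = new Comparator[K] {
    override def compare(a: K, b: K): Int = {
      val h1 = if (a == null) 0 else a.hashCode()
      val h2 = if (b == null) 0 else b.hashCode()
      if (h1 < h2) -1 else if (h1 == h2) 0 else 1
    }
  }

  // Simplified mirror of the comparator method: a key comparator is only used
  // when an ordering or an aggregator is present (hasAggregator is a stand-in
  // for aggregator.isDefined).
  def comparator[K](ordering: Option[Comparator[K]], hasAggregator: Boolean): Option[Comparator[K]] =
    if (ordering.isDefined || hasAggregator) Some(ordering.getOrElse(hashComparator[K]))
    else None
}
```

This collision is exactly why mergeWithAggregation, later in the article, still has to compare keys with == after the comparator reports 0.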
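Inserting records into ExternalSorter
The insertAll method
The method works as follows:
1. If map-side aggregation is needed:
Get the aggregator's mergeValue and createCombiner functions and build an update function from them: if the key already has a value, update merges the new value into it with mergeValue; otherwise it creates the initial combiner with createCombiner.
Iterate over the records, compute each record's partition, and call PartitionedAppendOnlyMap#changeValue, which applies the update function.
After each record, call maybeSpillCollection to decide whether the data needs to be spilled to disk.
2. If map-side aggregation is not needed:
Iterate over the records, compute each record's partition, and call PartitionedPairBuffer#insert to add the record to the buffer.
After each record, call maybeSpillCollection to decide whether the data needs to be spilled to disk.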
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
// Get the aggregator's mergeValue function, which merges a new value into the aggregated record
val mergeValue = aggregator.get.mergeValue
// Get the aggregator's createCombiner function, which creates the initial aggregated value
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
// Build the update function: mergeValue if the key already has a value, otherwise createCombiner
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
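// Count every record read (the count is used by maybeSpill)
addElementsRead()
// Fetch the next (key, value) pair
kv = records.next()
// Compute the key's partition, then merge the value into the map
map.changeValue((getPartition(kv._1), kv._1), update)
// Spill the in-memory data to disk if needed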
maybeSpillCollection(usingMap = true)
}
} else { // No local combine needed
// Stick values into our buffer
while (records.hasNext) {
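// Count every record read (the count is used by maybeSpill)
addElementsRead()
// Fetch the next (key, value) pair
val kv = records.next()
// Append the record to the PartitionedPairBuffer
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
// Spill the in-memory data to disk if needed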
maybeSpillCollection(usingMap = false)
}
}
}
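The map-side-combine branch of insertAll can be sketched with a mutable HashMap keyed by (partitionId, key). This is a simplified model, not Spark's code: Agg is a hypothetical stand-in for Aggregator, the partition function is a hash-mod assumption similar to HashPartitioner (without its null handling), and spilling is omitted.

```scala
import scala.collection.mutable

object MapSideCombineSketch {
  // Hypothetical stand-in for Aggregator: only createCombiner and mergeValue.
  final case class Agg[V, C](createCombiner: V => C, mergeValue: (C, V) => C)

  // Simplified insertAll for the map-side-combine branch: the map is keyed by
  // (partitionId, key), like PartitionedAppendOnlyMap, and the update step
  // chooses between mergeValue and createCombiner.
  def combine[K, V, C](
      records: Iterator[(K, V)],
      numPartitions: Int,
      agg: Agg[V, C]): mutable.HashMap[(Int, K), C] = {
    val map = mutable.HashMap.empty[(Int, K), C]
    def getPartition(key: K): Int = {
      // Non-negative modulo of the hash code (null keys are not handled here)
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod
    }
    for ((k, v) <- records) {
      val p = getPartition(k)
      // Plays the role of changeValue((p, k), update)
      val updated = map.get((p, k)) match {
        case Some(old) => agg.mergeValue(old, v) // hadValue = true
        case None      => agg.createCombiner(v)  // hadValue = false
      }
      map((p, k)) = updated
      // The real code calls maybeSpillCollection(usingMap = true) at this point.
    }
    map
  }
}
```

With createCombiner = identity and mergeValue = addition, this is the map side of a word count: three records ("a", 1), ("b", 1), ("a", 1) collapse into two entries.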
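The maybeSpillCollection method
The method works as follows:
1. If map-side aggregation is needed:
Estimate the size of the map and use the estimate to decide whether to spill. If a spill happens, create a new PartitionedAppendOnlyMap afterwards.
2. If map-side aggregation is not needed:
Estimate the size of the buffer and use the estimate to decide whether to spill. If a spill happens, create a new PartitionedPairBuffer afterwards.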
/**
* Spill the current in-memory collection to disk if needed.
*
* @param usingMap whether we're using a map or buffer as our current in-memory collection
*/
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) { // Using a PartitionedAppendOnlyMap
// Estimate the size of the map
estimatedSize = map.estimateSize()
// Decide from the estimated map size whether to spill
if (maybeSpill(map, estimatedSize)) {
// After spilling, start a new PartitionedAppendOnlyMap
map = new PartitionedAppendOnlyMap[K, C]
}
} else { // Using a PartitionedPairBuffer
// Estimate the size of the buffer
estimatedSize = buffer.estimateSize()
// Call maybeSpill of the parent class Spillable to decide from the estimated buffer size whether to spill
if (maybeSpill(buffer, estimatedSize)) {
// After spilling, start a new PartitionedPairBuffer
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
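The maybeSpill method
maybeSpillCollection calls the maybeSpill method of the parent class Spillable.
Based on the estimated size of the collection, this method decides whether to spill, and calls spill if it does.
The method works as follows:
If the number of elements read is a multiple of 32 and the current memory usage is at least the memory threshold (5 MB by default),
it first tries to acquire (2 * currentMemory - myMemoryThreshold) bytes from the TaskMemoryManager and raises the threshold by the amount granted.
If the current usage is now below the raised threshold, no spill happens and records keep being added to the collection;
otherwise spill() is called to write the collection to a disk file. A spill is also forced once the number of elements read exceeds numElementsForceSpillThreshold. After a spill, the element count is reset and the memory is released.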
/**
* Spills the current in-memory collection to disk if needed. Attempts to acquire more
* memory before spilling.
*
* @param collection collection to spill to disk
* @param currentMemory estimated size of the collection in bytes
* @return true if `collection` was spilled to disk; false otherwise
*/
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
// Under the hood this calls TaskMemoryManager.acquireExecutionMemory to allocate memory
val granted = acquireMemory(amountToRequest)
// Raise the current memory threshold by the amount granted
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
// Check again whether current usage still reaches the threshold; if so, spill
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
// Start spilling
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}
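To see how the threshold evolves, here is a pure-function model of maybeSpill with the memory manager replaced by a callback. The acquire callback, the Decision type and the parameter names are assumptions for illustration. The sketch also assumes that releaseMemory(), which the code above calls but does not show, resets the threshold to its initial value after a spill.

```scala
object MaybeSpillSketch {
  // Outcome of one maybeSpill check: whether to spill and the new threshold.
  final case class Decision(shouldSpill: Boolean, newThreshold: Long)

  // Simplified model of Spillable.maybeSpill. `acquire` is a hypothetical
  // stand-in for acquireMemory: it receives the number of bytes requested
  // and returns how many bytes were actually granted.
  def maybeSpill(
      elementsRead: Long,
      currentMemory: Long,
      threshold: Long,
      initialThreshold: Long,
      forceSpillElements: Long,
      acquire: Long => Long): Decision = {
    var myThreshold = threshold
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myThreshold) {
      // Ask for enough memory to double the current usage
      val granted = acquire(2 * currentMemory - myThreshold)
      myThreshold += granted
      shouldSpill = currentMemory >= myThreshold
    }
    shouldSpill = shouldSpill || elementsRead > forceSpillElements
    // Assumed: after a spill, releaseMemory() resets the threshold
    if (shouldSpill) Decision(shouldSpill = true, newThreshold = initialThreshold)
    else Decision(shouldSpill = false, newThreshold = myThreshold)
  }
}
```

For example, with a 5 MB threshold and 6 MB in use, the sorter asks for 7 MB. If all of it is granted, the threshold grows to 12 MB and nothing is spilled; if nothing is granted, 6 MB still reaches the 5 MB threshold and the collection is spilled.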
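The spill method
This method spills the contents of the in-memory collection to a disk file, in the order defined by the comparator. It is called when the in-memory collection reaches its size threshold.
The method works as follows:
1. Get the comparator. The comparator method returns Some(keyComparator) if an ordering or an aggregator is defined, and None otherwise.
2. Get an iterator over the in-memory collection, sorted by partition ID and, if a comparator was returned, by key.
3. Write the data of the in-memory collection to a temporary file on disk.
4. Add the resulting spill file to spills.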
/**
* Spill our in-memory collection to a sorted file that we can merge later.
* We add this file into `spills` to find it later.
*
* @param collection whichever collection we're using (map or buffer)
*/
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
// Return an iterator sorted by the given comparator.
// comparator returns Some(keyComparator) when an ordering or an aggregator is defined,
// and None otherwise.
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
// Write the data of the in-memory collection to a temporary file on disk
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
// Record the spilled temporary file
spills += spillFile
}
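The spillMemoryIteratorToDisk method
1. Create a temporary file.
2. Create a DiskBlockObjectWriter to write the temporary file.
3. Iterate over the in-memory collection's inMemoryIterator and write each record with the DiskBlockObjectWriter. Whenever the number of records written reaches serializerBatchSize, flush the disk writer's buffered contents to disk as one batch.
4. After the loop, flush any remaining records and return a SpilledFile holding the batch sizes and the number of elements in each partition.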
/**
* Spill contents of in-memory iterator to a temporary file on disk.
*/
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
: SpilledFile = {
// Because these files may be read during shuffle, their compression must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
// Create a temporary file
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
// These variables are reset after each flush
var objectsWritten: Long = 0
val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
// Create a DiskBlockObjectWriter to write the temporary file
val writer: DiskBlockObjectWriter =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
// List of batch sizes (bytes) in the order they are written to disk
val batchSizes = new ArrayBuffer[Long]
// How many elements we have in each partition
val elementsPerPartition = new Array[Long](numPartitions)
// Flush the disk writer's contents to disk, and update relevant variables.
// The writer is committed at the end of this process.
// Flush the disk writer's buffered contents to disk and update the related variables
def flush(): Unit = {
val segment = writer.commitAndGet()
batchSizes += segment.length
_diskBytesSpilled += segment.length
objectsWritten = 0
}
var success = false
try {
// Iterate over the sorted, partition-aware iterator (WritablePartitionedIterator) of the in-memory collection
while (inMemoryIterator.hasNext) {
// Get the partitionId
val partitionId = inMemoryIterator.nextPartition()
require(partitionId >= 0 && partitionId < numPartitions,
s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
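// Write the current record with the DiskBlockObjectWriter
inMemoryIterator.writeNext(writer)
// One more record for the current partitionId
elementsPerPartition(partitionId) += 1
// One more record written in the current batch
objectsWritten += 1
// When the number of records written reaches the batch size, flush the disk writer's buffer to disk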
if (objectsWritten == serializerBatchSize) {
flush()
}
}
// After the loop, any records still in the disk writer's buffer must also be flushed to disk
if (objectsWritten > 0) {
flush()
} else {
writer.revertPartialWritesAndClose()
}
success = true
} finally {
if (success) {
writer.close()
} else {
// This code path only happens if an exception was thrown above before we set success;
// close our stuff and let the exception be thrown further
writer.revertPartialWritesAndClose()
if (file.exists()) {
if (!file.delete()) {
logWarning(s"Error deleting ${file}")
}
}
}
}
// Build and return the SpilledFile
SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
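The bookkeeping that makes a spill file readable later (how many objects went into each serializer batch and how many records belong to each partition) can be separated from the I/O. The sketch below is a hypothetical simplification: it counts elements per batch instead of bytes, writes nothing to disk, and uses batchSize in the role of serializerBatchSize.

```scala
import scala.collection.mutable.ArrayBuffer

object SpillBookkeepingSketch {
  // Given the partition IDs produced by a sorted in-memory iterator, compute
  // the number of elements in each serializer batch and in each partition.
  // The real method records batch sizes in bytes; counts are used here.
  def bookkeeping(
      sortedPartitionIds: Iterator[Int],
      numPartitions: Int,
      batchSize: Int): (Seq[Int], Array[Long]) = {
    val batchCounts = ArrayBuffer.empty[Int]
    val elementsPerPartition = new Array[Long](numPartitions)
    var objectsWritten = 0
    // Stand-in for flush(): close the current batch and start a new one
    def flush(): Unit = { batchCounts += objectsWritten; objectsWritten = 0 }
    for (p <- sortedPartitionIds) {
      require(p >= 0 && p < numPartitions, s"partition Id: $p should be in [0, $numPartitions)")
      elementsPerPartition(p) += 1
      objectsWritten += 1
      if (objectsWritten == batchSize) flush()
    }
    if (objectsWritten > 0) flush() // the last, possibly partial batch
    (batchCounts.toList, elementsPerPartition)
  }
}
```

With partition IDs 0, 0, 1, 2, 2, 2 and a batch size of 4, the spill contains one full batch of 4 and a final batch of 2, and the partitions hold 2, 1 and 3 elements. These per-partition counts are what SpillReader later relies on instead of stored partition IDs.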
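How ExternalSorter sorts the records inserted into it and writes them to a disk file
The writePartitionedFile method
This method writes all the records inserted into the ExternalSorter to a single disk file. Those records may reside in the in-memory collection or in spill files.
- If there are no spill files, memory was sufficient and nothing was spilled: get a sorted iterator over the in-memory data, write it to the temporary data file one partition at a time, commit each partition to disk to obtain a FileSegment, and store each segment's length in an array of partition lengths.
- If there are spill files, they must be merged with the in-memory data using a merge sort; the merged records are then written to the temporary data file one partition at a time, each partition is committed to obtain a FileSegment, and its length is stored in the array of partition lengths.
- Return the array of partition lengths.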
/**
* Write all the data added into this ExternalSorter into a file in the disk store. This is
* called by the SortShuffleWriter.
*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
*/
def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = {
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
context.taskMetrics().shuffleWriteMetrics)
// If the array of spill file information is empty
if (spills.isEmpty) {
// Case where we only have in-memory data
// Pick the in-memory collection according to whether map-side aggregation is defined
val collection = if (aggregator.isDefined) map else buffer
// Get a sorted, partition-aware iterator over the collection
// Its elements are of type ((Int, K), C)
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
// Iterate over the elements
while (it.hasNext) {
// Get the partitionId of the current element
val partitionId = it.nextPartition()
// Inner loop: write every record of the current partitionId
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
// Commit all records of this partitionId as one segment
val segment = writer.commitAndGet()
lengths(partitionId) = segment.length
}
} else {
// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
val segment = writer.commitAndGet()
lengths(id) = segment.length
}
}
}
writer.close()
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
lengths
}
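The essential property of the in-memory branch is that each partition becomes one contiguous segment, and the returned array holds each segment's length, with 0 for partitions that received no records. The sketch below models this with a ByteArrayOutputStream instead of a DiskBlockObjectWriter; the record format (a writeUTF key followed by a writeInt value), the tuple input type and the names are assumptions for illustration only.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object PartitionedFileSketch {
  // Simplified model of the in-memory branch of writePartitionedFile: records
  // arrive sorted by partition ID, each partition's records are written back
  // to back, and "committing" measures the bytes written since the previous
  // commit, like the FileSegment length returned by commitAndGet().
  def write(sorted: Seq[(Int, String, Int)], numPartitions: Int): (Array[Long], Array[Byte]) = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    val lengths = new Array[Long](numPartitions) // partitions never seen stay 0
    var lastCommit = 0
    val it = sorted.iterator.buffered
    while (it.hasNext) {
      val partitionId = it.head._1
      // Inner loop: every record of the current partition
      while (it.hasNext && it.head._1 == partitionId) {
        val (_, key, value) = it.next()
        out.writeUTF(key)
        out.writeInt(value)
      }
      out.flush()
      lengths(partitionId) = bytes.size() - lastCommit
      lastCommit = bytes.size()
    }
    (lengths, bytes.toByteArray)
  }
}
```

With this record format a one-character key takes 3 bytes and the value 4, so two records in partition 0 and one in partition 2 yield lengths 14, 0 and 7, and the lengths add up to the size of the data file.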
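The partitionedIterator method
This method returns an iterator over all the records inserted into the ExternalSorter, grouped by partitionId and, if an aggregator is defined, aggregated with the aggregator's functions.
The iterator's type is Iterator[(Int, Iterator[Product2[K, C]])]. The Int is the partitionId, and each partition comes with its own iterator over that partition's records. The partition iterators must be consumed in order: you cannot skip ahead to the next partition without first reading the current one.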
/**
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* contents, and these are expected to be accessed in order (you can't "skip ahead" to one
* partition without reading the previous one). Guaranteed to return a key-value pair for each
* partition, in order of partition ID.
*
* For now, we just merge all the spilled files in one pass, but this can be modified to
* support hierarchical merging.
* Exposed for testing.
*/
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
// Whether a local combine is needed
val usingMap = aggregator.isDefined
// Pick the in-memory collection according to whether a local combine is needed
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
// If nothing was spilled to disk
if (spills.isEmpty) {
// Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
// we don't even need to sort by anything other than partition ID
// ...and no key ordering was requested
if (!ordering.isDefined) {
// The user hasn't requested sorted keys, so only sort by partition ID, not key
// Sort the data by partitionId only, not by key
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else { // If key ordering was requested
// We do need to sort by both partition ID and key
// Sort by partitionId first, then by key within each partition
groupByPartition(destructiveIterator(
collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// Merge spilled and in-memory data
// Spills happened, so the spill files on disk must be merged with the in-memory collection
merge(spills, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
}
}
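When nothing was spilled, partitionedIterator wraps the sorted in-memory iterator with groupByPartition, whose body is not shown in this article. Below is a sketch of that idea written for illustration, built around a per-partition iterator like the IteratorForPartition used in merge; treat the names and details as assumptions rather than a copy of Spark's source.

```scala
object GroupByPartitionSketch {
  // Yields the records of one partition from a shared, partition-sorted,
  // buffered iterator, and stops as soon as the next record belongs to
  // another partition.
  final class IteratorForPartition[K, C](partitionId: Int, data: BufferedIterator[((Int, K), C)])
      extends Iterator[Product2[K, C]] {
    override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId
    override def next(): Product2[K, C] = {
      if (!hasNext) throw new NoSuchElementException
      val ((_, k), c) = data.next()
      (k, c)
    }
  }

  // One (partitionId, iterator) pair for every partition, in ID order. All the
  // partition iterators share one underlying iterator, which is why they must
  // be consumed in order.
  def groupByPartition[K, C](data: Iterator[((Int, K), C)], numPartitions: Int)
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    val buffered = data.buffered
    (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition[K, C](p, buffered)))
  }
}
```

Note that an empty partition still gets an (id, iterator) pair whose iterator is immediately exhausted, matching the guarantee in the doc comment above that a pair is returned for every partition.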
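The merge method
Merges the data in the spill files on disk with the data in the in-memory collection.
This method is called when spill files exist. It returns an iterator of type Iterator[(Int, Iterator[Product2[K, C]])], used to iterate over every partition and then over every record in each partition.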
/**
* Merge a sequence of sorted files, giving an iterator over partitions and then over elements
* inside each partition. This can be used to either write out a new file or return data to
* the user.
*
* Returns an iterator over all the data written to this object, grouped by partition. For each
* partition we then have an iterator over its contents, and these are expected to be accessed
* in order (you can't "skip ahead" to one partition without reading the previous one).
* Guaranteed to return a key-value pair for each partition, in order of partition ID.
*/
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] = {
val readers = spills.map(new SpillReader(_))
// buffered returns a BufferedIterator
val inMemBuffered = inMemory.buffered
// Map every partitionId to a (partitionId, iterator) pair and return an iterator over those pairs
(0 until numPartitions).iterator.map { p =>
// Iterator over all in-memory records of the given partitionId
val inMemIterator = new IteratorForPartition(p, inMemBuffered)
val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
if (aggregator.isDefined) {
// Perform partial aggregation across partitions
(p, mergeWithAggregation(
iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
} else if (ordering.isDefined) {
// No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
// sort the elements without trying to merge them
(p, mergeSort(iterators, ordering.get))
} else {
(p, iterators.iterator.flatten)
}
}
}
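merge relies on mergeSort (directly, and through mergeWithAggregation), but its body is not shown here. A common way to merge k sorted streams is a priority queue keyed on each stream's current head; the sketch below implements that approach under that assumption, with hypothetical names, so treat it as an illustration rather than Spark's code.

```scala
import java.util.Comparator
import scala.collection.mutable

object MergeSortSketch {
  // k-way merge of iterators that are each already sorted by key, using a
  // priority queue of buffered iterators ordered by their current head.
  def mergeSort[K, C](iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
      : Iterator[Product2[K, C]] = {
    type Iter = BufferedIterator[Product2[K, C]]
    // Scala's PriorityQueue dequeues the largest element first, so the
    // comparison is reversed to make the smallest head come out first.
    val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
      override def compare(x: Iter, y: Iter): Int = comparator.compare(y.head._1, x.head._1)
    })
    // Empty iterators have no head, so they are left out of the queue
    heap.enqueue(iterators.filter(_.hasNext).map(_.buffered): _*)
    new Iterator[Product2[K, C]] {
      override def hasNext: Boolean = heap.nonEmpty
      override def next(): Product2[K, C] = {
        if (!hasNext) throw new NoSuchElementException
        val firstBuf = heap.dequeue()
        val firstPair = firstBuf.next()
        if (firstBuf.hasNext) heap.enqueue(firstBuf) // re-insert with its new head
        firstPair
      }
    }
  }
}
```

Each call to next() costs O(log k) for k input streams, and only one element per stream is buffered at a time, which is what lets many spill files be merged without loading them into memory.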
The mergeWithAggregation method
/**
* Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each
* iterator is sorted by key with a given comparator. If the comparator is not a total ordering
* (e.g. when we sort objects by hash code and different keys may compare as equal although
* they're not), we still merge them by doing equality tests for all keys that compare as equal.
*/
private def mergeWithAggregation(
iterators: Seq[Iterator[Product2[K, C]]],
mergeCombiners: (C, C) => C,
comparator: Comparator[K],
totalOrder: Boolean)
: Iterator[Product2[K, C]] =
{
if (!totalOrder) {
// We only have a partial ordering, e.g. comparing the keys by hash code, which means that
// multiple distinct keys might be treated as equal by the ordering. To deal with this, we
// need to read all keys considered equal by the ordering at once and compare them.
new Iterator[Iterator[Product2[K, C]]] {
val sorted = mergeSort(iterators, comparator).buffered
// Buffers reused across elements to decrease memory allocation
val keys = new ArrayBuffer[K]
val combiners = new ArrayBuffer[C]
override def hasNext: Boolean = sorted.hasNext
override def next(): Iterator[Product2[K, C]] = {
if (!hasNext) {
throw new NoSuchElementException
}
keys.clear()
combiners.clear()
val firstPair = sorted.next()
keys += firstPair._1
combiners += firstPair._2
val key = firstPair._1
while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) {
val pair = sorted.next()
var i = 0
var foundKey = false
while (i < keys.size && !foundKey) {
if (keys(i) == pair._1) {
combiners(i) = mergeCombiners(combiners(i), pair._2)
foundKey = true
}
i += 1
}
if (!foundKey) {
keys += pair._1
combiners += pair._2
}
}
// Note that we return an iterator of elements since we could've had many keys marked
// equal by the partial order; we flatten this below to get a flat iterator of (K, C).
keys.iterator.zip(combiners.iterator)
}
}.flatMap(i => i)
} else {
// We have a total ordering, so the objects with the same key are sequential.
new Iterator[Product2[K, C]] {
val sorted = mergeSort(iterators, comparator).buffered
override def hasNext: Boolean = sorted.hasNext
override def next(): Product2[K, C] = {
if (!hasNext) {
throw new NoSuchElementException
}
val elem = sorted.next()
val k = elem._1
var c = elem._2
while (sorted.hasNext && sorted.head._1 == k) {
val pair = sorted.next()
c = mergeCombiners(c, pair._2)
}
(k, c)
}
}
}
}
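The partial-ordering branch is the subtle one. The sketch below isolates its logic on an iterator already sorted by hash code, using the colliding strings "Aa" and "BB" (both have hash code 2112). The object, the function names and the simplified tuple types are assumptions, and this hash comparator does not handle null keys.

```scala
import java.util.Comparator
import scala.collection.mutable.ArrayBuffer

object PartialOrderAggregationSketch {
  // Hash-code comparator: a partial ordering, so distinct keys may compare as 0.
  def hashComparator[K]: Comparator[K] = new Comparator[K] {
    override def compare(a: K, b: K): Int = Integer.compare(a.hashCode(), b.hashCode())
  }

  // Same idea as the !totalOrder branch of mergeWithAggregation: take the run
  // of keys that compare as equal, then use == to find the truly equal ones.
  def aggregate[K, C](sorted: Iterator[(K, C)], comparator: Comparator[K],
      mergeCombiners: (C, C) => C): Iterator[(K, C)] = {
    val buffered = sorted.buffered
    new Iterator[Iterator[(K, C)]] {
      override def hasNext: Boolean = buffered.hasNext
      override def next(): Iterator[(K, C)] = {
        val keys = ArrayBuffer.empty[K]
        val combiners = ArrayBuffer.empty[C]
        val (firstKey, firstCombiner) = buffered.next()
        keys += firstKey
        combiners += firstCombiner
        while (buffered.hasNext && comparator.compare(buffered.head._1, firstKey) == 0) {
          val (k, c) = buffered.next()
          val i = keys.indexOf(k) // equality test, not the comparator
          if (i >= 0) combiners(i) = mergeCombiners(combiners(i), c)
          else { keys += k; combiners += c }
        }
        keys.iterator.zip(combiners.iterator)
      }
    }.flatMap(group => group)
  }
}
```

For the input ("b", 4), ("Aa", 1), ("BB", 2), ("Aa", 3), which is sorted by hash code, the run "Aa", "BB", "Aa" is resolved by equality into ("Aa", 4) and ("BB", 2), so the colliding keys are never merged with each other.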
SpillReader
/**
* An internal class for reading a spilled file partition by partition. Expects all the
* partitions to be requested in order.
*/
private[this] class SpillReader(spill: SpilledFile) {
// Serializer batch offsets; size will be batchSize.length + 1
val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _)
// Track which partition and which batch stream we're in. These will be the indices of
// the next element we will read. We'll also store the last partition read so that
// readNextPartition() can figure out what partition that was from.
var partitionId = 0
var indexInPartition = 0L
var batchId = 0
var indexInBatch = 0
var lastPartitionId = 0
skipToNextPartition()
// Intermediate file and deserializer streams that read from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
var fileStream: FileInputStream = null
var deserializeStream = nextBatchStream() // Also sets fileStream
var nextItem: (K, C) = null
var finished = false
/** Construct a stream that only reads from the next batch */
def nextBatchStream(): DeserializationStream = {
// Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
// we're still in a valid batch.
if (batchId < batchOffsets.length - 1) {
if (deserializeStream != null) {
deserializeStream.close()
fileStream.close()
deserializeStream = null
fileStream = null
}
val start = batchOffsets(batchId)
fileStream = new FileInputStream(spill.file)
fileStream.getChannel.position(start)
batchId += 1
val end = batchOffsets(batchId)
assert(end >= start, "start = " + start + ", end = " + end +
", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
serInstance.deserializeStream(wrappedStream)
} else {
// No more batches left
cleanup()
null
}
}
/**
* Update partitionId if we have reached the end of our current partition, possibly skipping
* empty partitions on the way.
*/
private def skipToNextPartition() {
while (partitionId < numPartitions &&
indexInPartition == spill.elementsPerPartition(partitionId)) {
partitionId += 1
indexInPartition = 0L
}
}
/**
* Return the next (K, C) pair from the deserialization stream and update partitionId,
* indexInPartition, indexInBatch and such to match its location.
*
* If the current batch is drained, construct a stream for the next batch and read from it.
* If no more pairs are left, return null.
*/
private def readNextItem(): (K, C) = {
if (finished || deserializeStream == null) {
return null
}
val k = deserializeStream.readKey().asInstanceOf[K]
val c = deserializeStream.readValue().asInstanceOf[C]
lastPartitionId = partitionId
// Start reading the next batch if we're done with this one
indexInBatch += 1
if (indexInBatch == serializerBatchSize) {
indexInBatch = 0
deserializeStream = nextBatchStream()
}
// Update the partition location of the element we're reading
indexInPartition += 1
skipToNextPartition()
// If we've finished reading the last partition, remember that we're done
if (partitionId == numPartitions) {
finished = true
if (deserializeStream != null) {
deserializeStream.close()
}
}
(k, c)
}
var nextPartitionToRead = 0
def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] {
val myPartition = nextPartitionToRead
nextPartitionToRead += 1
override def hasNext: Boolean = {
if (nextItem == null) {
nextItem = readNextItem()
if (nextItem == null) {
return false
}
}
assert(lastPartitionId >= myPartition)
// Check that we're still in the right partition; note that readNextItem will have returned
// null at EOF above so we would've returned false there
lastPartitionId == myPartition
}
override def next(): Product2[K, C] = {
if (!hasNext) {
throw new NoSuchElementException
}
val item = nextItem
nextItem = null
item
}
}
// Clean up our open streams and put us in a state where we can't read any more data
def cleanup() {
batchId = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
deserializeStream = null
fileStream = null
if (ds != null) {
ds.close()
}
// NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop().
// This should also be fixed in ExternalAppendOnlyMap.
}
}
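SpillReader never reads a partition ID per element; it relies on the elementsPerPartition counts recorded at spill time, and on batchOffsets to locate each serializer batch in the file. The sketch below keeps only that bookkeeping and reads from an in-memory sequence instead of a file; ToySpillReader and the standalone batchOffsets function are hypothetical names for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

object SpillReaderSketch {
  // Byte offsets of the serializer batches: one more entry than there are batches.
  def batchOffsets(batchSizes: Array[Long]): Array[Long] = batchSizes.scanLeft(0L)(_ + _)

  // In-memory model of SpillReader's partition tracking: elements carry no
  // partition IDs, and elementsPerPartition alone tells the reader where
  // each partition ends (empty partitions are skipped).
  final class ToySpillReader[T](elements: IndexedSeq[T], elementsPerPartition: Array[Long]) {
    private val numPartitions = elementsPerPartition.length
    private var partitionId = 0
    private var indexInPartition = 0L
    private var position = 0
    private var nextPartitionToRead = 0

    private def skipToNextPartition(): Unit =
      while (partitionId < numPartitions &&
          indexInPartition == elementsPerPartition(partitionId)) {
        partitionId += 1
        indexInPartition = 0L
      }
    skipToNextPartition()

    // Must be called for partitions 0, 1, 2, ... in order, like the original.
    def readNextPartition(): List[T] = {
      val myPartition = nextPartitionToRead
      nextPartitionToRead += 1
      val out = ArrayBuffer.empty[T]
      while (position < elements.length && partitionId == myPartition) {
        out += elements(position)
        position += 1
        indexInPartition += 1
        skipToNextPartition()
      }
      out.toList
    }
  }
}
```

For batch sizes of 10 and 7 bytes, the offsets are 0, 10 and 17, so the second batch is the byte range [10, 17). With four elements and elementsPerPartition = (2, 0, 2), successive calls return the first two elements, an empty partition, and the last two elements.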