Spark 2.3 Source Code Analysis: SortShuffleWriter

SortShuffleWriter

Overview

SortShuffleWriter first decides whether a map-side combine is needed. If aggregation is required, it uses a PartitionedAppendOnlyMap; if no combine is performed, it uses a PartitionedPairBuffer to hold the records in memory. In either case it checks whether memory is sufficient; if memory runs short and no more can be acquired, the data is spilled to temporary files on local disk. Finally, the in-memory data and any spilled temporary files are merged; a merge-sort is needed only if spills occurred, otherwise everything is still in memory and no merge-sort is required. The result is a single merged data file plus an index file.
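The overall flow can be summarized by the following illustrative pseudocode (comments only, not the actual Spark source; the real logic appears in the write and insertAll methods below):

// val inMemory = if (dep.mapSideCombine) new PartitionedAppendOnlyMap[K, C]  // combine values per key
//                else new PartitionedPairBuffer[K, C]                        // just buffer raw (K, V) pairs
// insert records; if more memory cannot be acquired, spill the collection to a temporary file
// finally merge the in-memory data with any spill files (merge-sort only if spills happened),
// producing one data file plus one index file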

The write method

The method works as follows:

1. Create an ExternalSorter; whether the aggregator and keyOrdering arguments are passed in depends on whether a map-side combine is required.

2. Call insertAll on the ExternalSorter instance to insert the records.

If the in-memory collection holding the records inside the ExternalSorter reaches its size threshold, the records are spilled to a disk file in sorted order.

3. Build the final output file, whose name (with reduceId fixed to 0) is "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId (see the example after this list).

4. Append a UUID to the output file name to mark the file as being written; it is renamed when writing finishes.

5. Call writePartitionedFile on the ExternalSorter instance to sort the records inserted into the sorter and write them to the output file.

The records inserted into the sorter may reside in the in-memory collection or in spill files.

6. Write each partition's offset into the index file so that the reduce side can fetch its data.

7. Wrap the relevant information in a MapStatus and return it.
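For instance, assuming shuffleId = 0 and mapId = 5 (hypothetical values), the resolver produces names like the following for the data and index files described in steps 3 and 6:

val shuffleId = 0
val mapId = 5
val reduceId = 0  // always 0 for the sort-based shuffle output file
val dataFile  = s"shuffle_${shuffleId}_${mapId}_${reduceId}.data"   // "shuffle_0_5_0.data"
val indexFile = s"shuffle_${shuffleId}_${mapId}_${reduceId}.index"  // "shuffle_0_5_0.index"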

 /** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    // Build the final output file; its name (with reduceId fixed to 0) is
    // "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    // Append a UUID to the output file name to mark it as being written; it is renamed when writing finishes
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      // Write the sorted records to the output file
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      // Write each partition's offset into the index file so the reduce side can fetch its data
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }

ExternalSorter

Overview

Sorts, and possibly merges, a large number of (K, V) pairs to produce (K, C) key-combiner pairs. A partitioner is used to group keys into partitions, and then an optional custom comparator sorts the keys within each partition. Finally, the (K, V) pairs of each partition are written to different byte ranges of a single output file, so that they can be fetched during shuffle.

If combining is disabled, type C must equal V; the objects are simply cast at the end.

Note: although ExternalSorter is a fairly generic sorter, some of its configuration is tied to its use in sort-based shuffle. For example, block compression is controlled by "spark.shuffle.compress". If ExternalSorter is ever used in a non-shuffle context, we might want to revisit this class with different configuration settings.

The important constructor parameters of this class are:

@param aggregator optional; an Aggregator with the combine functions used to merge data
@param partitioner optional; if given, sort by partition ID first, then by key
@param ordering optional; the ordering to use when sorting keys within each partition; should be a total ordering
@param serializer the serializer to use when spilling to disk

Note that if an ordering is given, we always sort with it, so only provide it if you actually want the output keys to be sorted. In a map task without map-side aggregation, for example, you probably want to pass None as the ordering to avoid unnecessary sorting. On the other hand, if you do want to do combining, having an ordering is more efficient than not having one.

Users interact with this class as follows:

  1. Instantiate an ExternalSorter.
  2. Call insertAll with a batch of records.
  3. Either call iterator() to iterate over the sorted/aggregated records, or call writePartitionedFile() to write the sorted/aggregated records to an output file during sort-based shuffle.
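A minimal usage sketch that mirrors how SortShuffleWriter drives the sorter in the write method above; records, blockId and tmpFile are placeholders, and the generic parameters are assumed to be in scope:

val sorter = new ExternalSorter[K, V, C](
  context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)                                            // step 2: insert a batch of records
val partitionLengths = sorter.writePartitionedFile(blockId, tmpFile) // step 3: sort/aggregate and write out
sorter.stop()                                                        // clean up all intermediate files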

Internally, the class works as follows:

We repeatedly fill in-memory buffers with data, using a PartitionedAppendOnlyMap when we need to combine by key, or a PartitionedPairBuffer when we don't. Inside these buffers we sort elements by partitionId and then possibly by key. To avoid calling the partitioner multiple times for each key, we store the partitionId alongside each record.

When a buffer reaches our memory limit, we spill it to a file. The file is sorted first by partitionId and then, if we intend to aggregate, by key or by the key's hash value. For each file, we track how many objects of each partition were in memory, so we don't have to write out the partitionId for every element.

When the user requests an iterator or a file output, the spilled files are merged, together with any remaining in-memory data, using the sort order defined above (unless both sorting and aggregation are disabled). If we need to aggregate by key, we either use the total ordering from the ordering parameter, or read keys with the same hash value and compare them with each other for equality in order to merge their values.

Users are expected to call stop() at the end to delete all intermediate files.

The parent class of ExternalSorter

Spillable is the parent class of ExternalSorter, and Spillable itself is a subclass of MemoryConsumer.

The Spillable class spills the contents of the in-memory collection to disk once memory usage exceeds a threshold.
Here, the in-memory collection refers to either the PartitionedAppendOnlyMap or the PartitionedPairBuffer data structure.
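The relationship looks roughly as follows (simplified signatures for Spark 2.3; the parameter lists are abbreviated):

// private[spark] class ExternalSorter[K, V, C](context: TaskContext, ...)
//   extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())
//
// private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
//   extends MemoryConsumer(taskMemoryManager)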

Member variables

  • serializerBatchSize: the number of objects per batch when reading objects from, or writing them to, the serializer. Objects are written in batches, with each batch using its own serialization stream; this cuts down the initial size of the reference-tracking map built while deserializing a stream. Note that setting this value too low causes excessive copying during serialization, because some serializers grow their internal data structures by growing + copying every time the number of objects doubles.
  • map and buffer (PartitionedAppendOnlyMap / PartitionedPairBuffer): the in-memory collections that hold records before a spill. Which one is used depends on whether aggregation is needed: with map-side aggregation the records go into the PartitionedAppendOnlyMap, where they are combined; otherwise they go into the PartitionedPairBuffer.
  • keyComparator: a comparator for keys of type K, used to order keys within a partition so that aggregation or sorting is possible. If no comparator is supplied via the ordering parameter, a default comparator provides a partial ordering by hash code. A partial ordering means that equal keys satisfy comparator.compare(k, k) == 0, but some non-equal keys do too, so a later pass is needed to find the truly equal keys. (Keys that are equal by equals() always have equal hashCode() values, but keys with equal hashCode() values are not necessarily equal by equals(), which is why comparing hash codes only yields a partial ordering; see the example after the code below.)
  • spills: when the in-memory collection reaches its size threshold, its records are spilled to a disk file in sorted order. This ArrayBuffer[SpilledFile] holds the metadata of each spill file.
 // Size of object batches when reading/writing from serializers.
  //
  // Objects are written in batches, with each batch using its own serialization stream. This
  // cuts down on the size of reference-tracking maps constructed when deserializing a stream.
  //
  // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers
  // grow internal data structures by growing + copying every time the number of objects doubles.
  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)

  // Data structures to store in-memory objects before we spill. Depending on whether we have an
  // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
  // store them in an array buffer.
  @volatile private var map = new PartitionedAppendOnlyMap[K, C]
  @volatile private var buffer = new PartitionedPairBuffer[K, C]

 // A comparator for keys K that orders them within a partition to allow aggregation or sorting.
  // Can be a partial ordering by hash code if a total ordering is not provided through by the
  // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
  // non-equal keys also have this, so we need to do a later pass to find truly equal keys).
  // Note that we ignore this if no aggregator and no ordering are given.
  private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {
    override def compare(a: K, b: K): Int = {
      val h1 = if (a == null) 0 else a.hashCode()
      val h2 = if (b == null) 0 else b.hashCode()
      if (h1 < h2) -1 else if (h1 == h2) 0 else 1
    }
  })

 // Information about a spilled file. Includes sizes in bytes of "batches" written by the
  // serializer as we periodically reset its stream, as well as number of elements in each
  // partition, used to efficiently keep track of partitions when merging.
  private[this] case class SpilledFile(
    file: File,
    blockId: BlockId,
    serializerBatchSizes: Array[Long],
    elementsPerPartition: Array[Long])

  private val spills = new ArrayBuffer[SpilledFile]
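For example, two distinct String keys can share a hash code, so the default comparator above treats them as equal even though they are not; this is why the merge path later re-checks key equality among keys that compare as 0:

// "Aa" and "BB" are a classic JVM hash collision
val sameHash = "Aa".hashCode == "BB".hashCode   // true: both hash to 2112
val sameKey  = "Aa" == "BB"                     // false: the keys are not actually equal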

Note that keyComparator is ignored if neither an aggregator nor an ordering is given.

The comparator method returns Some(keyComparator) — the user-supplied ordering if one was given, otherwise the default hash-code comparator — whenever an ordering or an aggregator is defined; if neither is defined, it returns None.

private def comparator: Option[Comparator[K]] = {
    if (ordering.isDefined || aggregator.isDefined) {
      Some(keyComparator)
    } else {
      None
    }
  }

ExternalSorter: inserting records

The insertAll method

The method works as follows:

1. If map-side aggregation is required:

      Get the aggregator's mergeValue and createCombiner functions and build an update function from them. The update function calls mergeValue when a value already exists for the key, and createCombiner otherwise.

      Iterate over the records, compute each record's partition, and call PartitionedAppendOnlyMap#changeValue to apply the update function.

      Finally, call maybeSpillCollection to decide whether the data must be spilled to disk.

2. If map-side aggregation is not required:

     Iterate over the records, compute each record's partition, and call PartitionedPairBuffer#insert to append to the buffer.

     Finally, call maybeSpillCollection to decide whether the data must be spilled to disk.
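As a concrete illustration, consider a hypothetical aggregator that sums Int values per key (createCombiner = v => v, mergeValue = (c, v) => c + v). The sketch below shows what the update closure computes; note that the real closure captures the current record kv instead of taking the value as a parameter:

// Hypothetical aggregator functions for summing Int values per key
val createCombiner: Int => Int = v => v
val mergeValue: (Int, Int) => Int = (c, v) => c + v

// What ExternalSorter's update closure does for one incoming value v
def update(hadValue: Boolean, oldValue: Int, v: Int): Int =
  if (hadValue) mergeValue(oldValue, v) else createCombiner(v)

update(hadValue = false, oldValue = 0, v = 3)  // 3: first value seen for this key
update(hadValue = true,  oldValue = 3, v = 4)  // 7: merged into the existing combiner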

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined

    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      // Get the aggregator's mergeValue function, used to merge a new value into the aggregated record
      val mergeValue = aggregator.get.mergeValue
      // Get the aggregator's createCombiner function, used to create the initial value of the aggregation
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      // Build the update function: mergeValue if a value already exists, otherwise createCombiner
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        // Count this element as read (used for the force-spill threshold)
        addElementsRead()
        // Take the next (key, value) pair
        kv = records.next()
        // Compute the key's partition, then merge the value into the map
        map.changeValue((getPartition(kv._1), kv._1), update)
        // Spill the in-memory data to disk if necessary
        maybeSpillCollection(usingMap = true)
      }
    } else { // No map-side combine needed
      // Stick values into our buffer
      while (records.hasNext) {
        // Count this element as read (used for the force-spill threshold)
        addElementsRead()
        // Take the next (key, value) pair
        val kv = records.next()
        // Append the record to the PartitionedPairBuffer
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        // Spill the in-memory data to disk if necessary
        maybeSpillCollection(usingMap = false)
      }
    }
  }

The maybeSpillCollection method

The method works as follows:

1. If map-side aggregation is required:

   Estimate the size of the map and decide, based on that estimate, whether a spill is needed. If a spill happens, create a fresh PartitionedAppendOnlyMap afterwards.

2. If map-side aggregation is not required:

   Estimate the size of the buffer and decide, based on that estimate, whether a spill is needed. If a spill happens, create a fresh PartitionedPairBuffer afterwards.

/**
   * Spill the current in-memory collection to disk if needed.
   *
   * @param usingMap whether we're using a map or buffer as our current in-memory collection
   */
  private def maybeSpillCollection(usingMap: Boolean): Unit = {
    var estimatedSize = 0L
    if (usingMap) {  // Using the PartitionedAppendOnlyMap
      // Estimate the size of the map
      estimatedSize = map.estimateSize()
      // Decide, based on the estimated size, whether a spill is needed
      if (maybeSpill(map, estimatedSize)) {
        // After the spill, start over with a new PartitionedAppendOnlyMap
        map = new PartitionedAppendOnlyMap[K, C]
      }
    } else { // Using the PartitionedPairBuffer
      // Estimate the size of the buffer
      estimatedSize = buffer.estimateSize()
      // Call the parent class Spillable's maybeSpill method to decide, based on the estimate, whether to spill
      if (maybeSpill(buffer, estimatedSize)) {
        // After the spill, start over with a new PartitionedPairBuffer
        buffer = new PartitionedPairBuffer[K, C]
      }
    }

    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
  }

The maybeSpill method

maybeSpillCollection calls maybeSpill, which is defined in the parent class Spillable.

Based on the estimated size of the collection, this method decides whether a spill is needed, and calls spill() if so.

The method works as follows:

If the number of elements read is a multiple of 32 and the current memory usage is at or above the memory threshold (5 MB by default, spark.shuffle.spill.initialMemoryThreshold), the method first tries to acquire (2 * currentMemory - myMemoryThreshold) bytes from the TaskMemoryManager. If enough memory is granted, no spill happens and records keep accumulating in the collection; otherwise spill() is called to write the collection's data out to a disk file. A spill is also forced once the number of elements read exceeds numElementsForceSpillThreshold. A worked example follows the code below.

/**
   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
   * memory before spilling.
   *
   * @param collection collection to spill to disk
   * @param currentMemory estimated size of the collection in bytes
   * @return true if `collection` was spilled to disk; false otherwise
   */
  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      // Under the hood this calls TaskMemoryManager's acquireExecutionMemory to allocate memory
      val granted = acquireMemory(amountToRequest)
      // Update the current memory threshold
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      // Check again whether current memory exceeds the threshold; if it still does, spill
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      // Perform the spill
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }
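A worked example of the threshold arithmetic, assuming the default initial threshold of 5 MB (spark.shuffle.spill.initialMemoryThreshold); the numbers are purely illustrative:

// Suppose the collection is estimated at 6 MB while myMemoryThreshold is still the 5 MB default
val currentMemory     = 6L * 1024 * 1024
val myMemoryThreshold = 5L * 1024 * 1024
val amountToRequest   = 2 * currentMemory - myMemoryThreshold   // 7 MB requested from the TaskMemoryManager

// Case 1: all 7 MB are granted  -> threshold grows to 12 MB, 6 MB < 12 MB, no spill, keep inserting
// Case 2: nothing is granted    -> threshold stays at 5 MB, 6 MB >= 5 MB, spill the collection to disk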

The spill method

This method spills the contents of the in-memory collection to a disk file, ordered by the comparator. It is called when the in-memory collection reaches its size threshold.

The method works as follows:

1. Get the comparator. The comparator method returns Some(keyComparator) — the user-supplied ordering if one was given, otherwise the default hash-code comparator — when an ordering or an aggregator is defined; otherwise it returns None.

2. Get an iterator over the in-memory collection, sorted by that comparator.

3. Spill the in-memory collection's data to a temporary file on disk.

4. Record the spilled temporary file in the spills array.

 /**
   * Spill our in-memory collection to a sorted file that we can merge later.
   * We add this file into `spilledFiles` to find it later.
   *
   * @param collection whichever collection we're using (map or buffer)
   */
  override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    // Return an iterator over the collection, sorted by the given comparator.
    // The comparator method returns Some(keyComparator) if an ordering or aggregator is defined,
    // and None otherwise.
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    // Spill the in-memory collection's data to a temporary file on disk
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    // Record the spilled file so it can be merged later
    spills += spillFile
  }

The spillMemoryIteratorToDisk method

1. Create a temporary file.

2. Create a DiskBlockObjectWriter for writing the temporary file.

3. Iterate over the in-memory collection's inMemoryIterator and write each record with the DiskBlockObjectWriter. Whenever the number of written records reaches the batch-size threshold, flush the disk writer's buffer to disk.

/**
   * Spill contents of in-memory iterator to a temporary file on disk.
   */
  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
      : SpilledFile = {
    // Because these files may be read during shuffle, their compression must be controlled by
    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
    // createTempShuffleBlock here; see SPARK-3426 for more context.
    // Create a temporary file
    val (blockId, file) = diskBlockManager.createTempShuffleBlock()

    // These variables are reset after each flush
    var objectsWritten: Long = 0
    val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
    // Create a DiskBlockObjectWriter for writing the temporary file
    val writer: DiskBlockObjectWriter =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

    // List of batch sizes (bytes) in the order they are written to disk
    val batchSizes = new ArrayBuffer[Long]

    // How many elements we have in each partition
    val elementsPerPartition = new Array[Long](numPartitions)

    // Flush the disk writer's contents to disk, and update relevant variables.
    // The writer is committed at the end of this process.
    // Flush the disk writer's buffer contents to disk and update the relevant variables
    def flush(): Unit = {
      val segment = writer.commitAndGet()
      batchSizes += segment.length
      _diskBytesSpilled += segment.length
      objectsWritten = 0
    }

    var success = false
    try {
      // Iterate over the sorted, partition-aware iterator (WritablePartitionedIterator) of the in-memory collection
      while (inMemoryIterator.hasNext) {
        // Get the partitionId of the next record
        val partitionId = inMemoryIterator.nextPartition()
        require(partitionId >= 0 && partitionId < numPartitions,
          s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
        // Write the current record with the DiskBlockObjectWriter
        inMemoryIterator.writeNext(writer)
        // Bump the record count of the current partition
        elementsPerPartition(partitionId) += 1
        // Bump the number of records written in the current batch
        objectsWritten += 1

        // If the number of written records reaches the batch size, flush the disk writer's buffer to disk
        if (objectsWritten == serializerBatchSize) {
          flush()
        }
      }
      // After the iteration finishes, flush any records still buffered in the disk writer
      if (objectsWritten > 0) {
        flush()
      } else {
        writer.revertPartialWritesAndClose()
      }
      success = true
    } finally {
      if (success) {
        writer.close()
      } else {
        // This code path only happens if an exception was thrown above before we set success;
        // close our stuff and let the exception be thrown further
        writer.revertPartialWritesAndClose()
        if (file.exists()) {
          if (!file.delete()) {
            logWarning(s"Error deleting ${file}")
          }
        }
      }
    }
   
    // Build the SpilledFile and return it
    SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
  }

ExternalSorter: sorting the inserted records and writing them to a single disk file

The writePartitionedFile method

This method writes all records that were inserted into the ExternalSorter to a single disk file. The inserted records may reside in the in-memory collection or in spill files.

1. If there are no spill files, memory was sufficient and nothing had to be spilled: obtain an iterator over the sorted in-memory results, iterate it and write the data of each partition to the temporary data file, committing each partition as a FileSegment and recording its length in an array of per-partition lengths.

2. If there are spill files, the spilled files and the in-memory data have to be merged, which requires a merge-sort; the merged data is then written to the temporary data file, again committing each partition as a FileSegment and recording its length in the array of per-partition lengths.

3. Return the array of per-partition lengths.

/**
   * Write all the data added into this ExternalSorter into a file in the disk store. This is
   * called by the SortShuffleWriter.
   *
   * @param blockId block ID to write to. The index file will be blockId.name + ".index".
   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
   */
  def writePartitionedFile(
      blockId: BlockId,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)
    val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
      context.taskMetrics().shuffleWriteMetrics)

    // If the array of spill-file records is empty
    if (spills.isEmpty) {
      // Case where we only have in-memory data
      // Pick the in-memory collection according to whether map-side aggregation is defined
      val collection = if (aggregator.isDefined) map else buffer
      // Get the sorted, partition-aware iterator of the collection
      // (the iterator yields elements of type ((Int, K), C))
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
      // Iterate the elements
      while (it.hasNext) {
        // Get the partitionId of the current element
        val partitionId = it.nextPartition()
        // Inner loop: iterate all records of the current partitionId
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(writer)
        }
        // Commit all records of this partitionId as one file segment
        val segment = writer.commitAndGet()
        lengths(partitionId) = segment.length
      }
    } else {
      // We must perform merge-sort; get an iterator by partition and write everything directly.
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) {
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          val segment = writer.commitAndGet()
          lengths(id) = segment.length
        }
      }
    }

    writer.close()
    context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)

    lengths
  }
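The lengths array returned here is what write() later hands to writeIndexFileAndCommit. A minimal sketch of how the per-partition lengths translate into the offsets stored in the index file, assuming the cumulative-offset layout used by IndexShuffleBlockResolver (the concrete lengths are made up):

// Hypothetical byte lengths of three partitions inside the data file
val lengths = Array(120L, 0L, 340L)
// The index file stores numPartitions + 1 offsets, starting at 0
val offsets = lengths.scanLeft(0L)(_ + _)   // Array(0, 120, 120, 460)
// Reducer i then fetches the byte range [offsets(i), offsets(i + 1)) from the data file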

The partitionedIterator method

This method returns an iterator over all records inserted into the ExternalSorter. The records are grouped by partitionId and, if an aggregator is defined, aggregated with its functions.

The iterator's type is Iterator[(Int, Iterator[Product2[K, C]])]. The Int is the partitionId, and each partition has a corresponding iterator over its records. The partition iterators are meant to be consumed in order: you cannot skip ahead to a new partition before finishing the current one.
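An illustrative way to consume it, mirroring the merge-sort branch of writePartitionedFile above (sorter is a placeholder for an ExternalSorter instance):

for ((partitionId, partitionRecords) <- sorter.partitionedIterator) {
  partitionRecords.foreach { record =>
    // record._1 is the key K, record._2 is the combined value C for this partition
  }
}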

/**
   * Return an iterator over all the data written to this object, grouped by partition and
   * aggregated by the requested aggregator. For each partition we then have an iterator over its
   * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
   * partition without reading the previous one). Guaranteed to return a key-value pair for each
   * partition, in order of partition ID.
   *
   * For now, we just merge all the spilled files in once pass, but this can be modified to
   * support hierarchical merging.
   * Exposed for testing.
   */
  def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
    // Whether a map-side combine is needed
    val usingMap = aggregator.isDefined
    // Pick the in-memory collection according to whether a map-side combine is needed
    val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
    // If no spill to disk has happened
    if (spills.isEmpty) {
      // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
      // we don't even need to sort by anything other than partition ID
      if (!ordering.isDefined) {
        // The user hasn't requested sorted keys, so only sort by partition ID, not key
        // (data is sorted only by partitionId; keys are left unsorted)
        groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
      } else { // Sorting by key is required
        // We do need to sort by both partition ID and key
        // (sort by partitionId first, then sort keys within each partition)
        groupByPartition(destructiveIterator(
          collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
      }
    } else {
      // Merge spilled and in-memory data
      // A spill has happened, so the spilled files on disk must be merged with the in-memory collection's data
      merge(spills, destructiveIterator(
        collection.partitionedDestructiveSortedIterator(comparator)))
    }
  }

The merge method

Merges the data of the spilled files on disk with the data of the in-memory collection.

This method is reached when spill files exist. It returns an iterator of type Iterator[(Int, Iterator[Product2[K, C]])], which iterates over all partitions and, within each partition, over all of its records.

/**
   * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
   * inside each partition. This can be used to either write out a new file or return data to
   * the user.
   *
   * Returns an iterator over all the data written to this object, grouped by partition. For each
   * partition we then have an iterator over its contents, and these are expected to be accessed
   * in order (you can't "skip ahead" to one partition without reading the previous one).
   * Guaranteed to return a key-value pair for each partition, in order of partition ID.
   */
  private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    val readers = spills.map(new SpillReader(_))
    // buffered returns a BufferedIterator over the in-memory data
    val inMemBuffered = inMemory.buffered
    // Iterate over the partition IDs, mapping each one to a (partitionId, iterator) pair
    (0 until numPartitions).iterator.map { p =>
      // An iterator over the in-memory records belonging to the given partitionId
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)
      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
      if (aggregator.isDefined) {
        // Perform partial aggregation across partitions
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      } else if (ordering.isDefined) {
        // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
        // sort the elements without trying to merge them
        (p, mergeSort(iterators, ordering.get))
      } else {
        (p, iterators.iterator.flatten)
      }
    }
  }

The mergeWithAggregation method

  /**
   * Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each
   * iterator is sorted by key with a given comparator. If the comparator is not a total ordering
   * (e.g. when we sort objects by hash code and different keys may compare as equal although
   * they're not), we still merge them by doing equality tests for all keys that compare as equal.
   */
  private def mergeWithAggregation(
      iterators: Seq[Iterator[Product2[K, C]]],
      mergeCombiners: (C, C) => C,
      comparator: Comparator[K],
      totalOrder: Boolean)
      : Iterator[Product2[K, C]] =
  {
    if (!totalOrder) {
      // We only have a partial ordering, e.g. comparing the keys by hash code, which means that
      // multiple distinct keys might be treated as equal by the ordering. To deal with this, we
      // need to read all keys considered equal by the ordering at once and compare them.
      new Iterator[Iterator[Product2[K, C]]] {
        val sorted = mergeSort(iterators, comparator).buffered

        // Buffers reused across elements to decrease memory allocation
        val keys = new ArrayBuffer[K]
        val combiners = new ArrayBuffer[C]

        override def hasNext: Boolean = sorted.hasNext

        override def next(): Iterator[Product2[K, C]] = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          keys.clear()
          combiners.clear()
          val firstPair = sorted.next()
          keys += firstPair._1
          combiners += firstPair._2
          val key = firstPair._1
          while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) {
            val pair = sorted.next()
            var i = 0
            var foundKey = false
            while (i < keys.size && !foundKey) {
              if (keys(i) == pair._1) {
                combiners(i) = mergeCombiners(combiners(i), pair._2)
                foundKey = true
              }
              i += 1
            }
            if (!foundKey) {
              keys += pair._1
              combiners += pair._2
            }
          }

          // Note that we return an iterator of elements since we could've had many keys marked
          // equal by the partial order; we flatten this below to get a flat iterator of (K, C).
          keys.iterator.zip(combiners.iterator)
        }
      }.flatMap(i => i)
    } else {
      // We have a total ordering, so the objects with the same key are sequential.
      new Iterator[Product2[K, C]] {
        val sorted = mergeSort(iterators, comparator).buffered

        override def hasNext: Boolean = sorted.hasNext

        override def next(): Product2[K, C] = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          val elem = sorted.next()
          val k = elem._1
          var c = elem._2
          while (sorted.hasNext && sorted.head._1 == k) {
            val pair = sorted.next()
            c = mergeCombiners(c, pair._2)
          }
          (k, c)
        }
      }
    }
  }

 SpillReader

  /**
   * An internal class for reading a spilled file partition by partition. Expects all the
   * partitions to be requested in order.
   */
  private[this] class SpillReader(spill: SpilledFile) {
    // Serializer batch offsets; size will be batchSize.length + 1
    val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _)
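    // e.g. serializerBatchSizes = [8192, 8192, 4096]  =>  batchOffsets = [0, 8192, 16384, 20480]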

    // Track which partition and which batch stream we're in. These will be the indices of
    // the next element we will read. We'll also store the last partition read so that
    // readNextPartition() can figure out what partition that was from.
    var partitionId = 0
    var indexInPartition = 0L
    var batchId = 0
    var indexInBatch = 0
    var lastPartitionId = 0

    skipToNextPartition()

    // Intermediate file and deserializer streams that read from exactly one batch
    // This guards against pre-fetching and other arbitrary behavior of higher level streams
    var fileStream: FileInputStream = null
    var deserializeStream = nextBatchStream()  // Also sets fileStream

    var nextItem: (K, C) = null
    var finished = false

    /** Construct a stream that only reads from the next batch */
    def nextBatchStream(): DeserializationStream = {
      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
      // we're still in a valid batch.
      if (batchId < batchOffsets.length - 1) {
        if (deserializeStream != null) {
          deserializeStream.close()
          fileStream.close()
          deserializeStream = null
          fileStream = null
        }

        val start = batchOffsets(batchId)
        fileStream = new FileInputStream(spill.file)
        fileStream.getChannel.position(start)
        batchId += 1

        val end = batchOffsets(batchId)

        assert(end >= start, "start = " + start + ", end = " + end +
          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))

        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))

        val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
        serInstance.deserializeStream(wrappedStream)
      } else {
        // No more batches left
        cleanup()
        null
      }
    }

    /**
     * Update partitionId if we have reached the end of our current partition, possibly skipping
     * empty partitions on the way.
     */
    private def skipToNextPartition() {
      while (partitionId < numPartitions &&
          indexInPartition == spill.elementsPerPartition(partitionId)) {
        partitionId += 1
        indexInPartition = 0L
      }
    }

    /**
     * Return the next (K, C) pair from the deserialization stream and update partitionId,
     * indexInPartition, indexInBatch and such to match its location.
     *
     * If the current batch is drained, construct a stream for the next batch and read from it.
     * If no more pairs are left, return null.
     */
    private def readNextItem(): (K, C) = {
      if (finished || deserializeStream == null) {
        return null
      }
      val k = deserializeStream.readKey().asInstanceOf[K]
      val c = deserializeStream.readValue().asInstanceOf[C]
      lastPartitionId = partitionId
      // Start reading the next batch if we're done with this one
      indexInBatch += 1
      if (indexInBatch == serializerBatchSize) {
        indexInBatch = 0
        deserializeStream = nextBatchStream()
      }
      // Update the partition location of the element we're reading
      indexInPartition += 1
      skipToNextPartition()
      // If we've finished reading the last partition, remember that we're done
      if (partitionId == numPartitions) {
        finished = true
        if (deserializeStream != null) {
          deserializeStream.close()
        }
      }
      (k, c)
    }

    var nextPartitionToRead = 0

    def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] {
      val myPartition = nextPartitionToRead
      nextPartitionToRead += 1

      override def hasNext: Boolean = {
        if (nextItem == null) {
          nextItem = readNextItem()
          if (nextItem == null) {
            return false
          }
        }
        assert(lastPartitionId >= myPartition)
        // Check that we're still in the right partition; note that readNextItem will have returned
        // null at EOF above so we would've returned false there
        lastPartitionId == myPartition
      }

      override def next(): Product2[K, C] = {
        if (!hasNext) {
          throw new NoSuchElementException
        }
        val item = nextItem
        nextItem = null
        item
      }
    }

    // Clean up our open streams and put us in a state where we can't read any more data
    def cleanup() {
      batchId = batchOffsets.length  // Prevent reading any other batch
      val ds = deserializeStream
      deserializeStream = null
      fileStream = null
      if (ds != null) {
        ds.close()
      }
      // NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop().
      // This should also be fixed in ExternalAppendOnlyMap.
    }
  }