
Spark Source Code Analysis: ShuffleExternalSorter

Overview

ShuffleExternalSorter is an external sorter specialized for sort-based shuffle.

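Incoming records are appended to data pages. When all records have been inserted into the sorter, or when the current thread's shuffle memory reaches its limit, the in-memory records are sorted by partition id using ShuffleInMemorySorter. The sorted records are then written to a single output file (or to multiple files, if a spill has occurred). The output files use the same format as the final output file written by SortShuffleWriter: each output partition's records are written as a single serialized, compressed stream that can be read back with a new decompression and deserialization stream.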

Unlike org.apache.spark.util.collection.ExternalSorter, this sorter does not merge its spill files. Merging is instead delegated to UnsafeShuffleWriter, which uses a specialized merge procedure that avoids extra serialization and deserialization.
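To make the idea of merging without deserialization concrete, here is a minimal sketch, assuming each spill can be modeled as a byte array plus the byte length of every partition's segment. The class SpillMergeSketch and its nested Spill type are hypothetical and are not Spark's API; the real merge in UnsafeShuffleWriter works on files and has to take compression and encryption into account.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;

final class SpillMergeSketch {
  // Hypothetical stand-in for a spill file: raw bytes plus per-partition segment lengths.
  static final class Spill {
    final byte[] data;
    final long[] partitionLengths;

    Spill(byte[] data, long[] partitionLengths) {
      this.data = data;
      this.partitionLengths = partitionLengths;
    }
  }

  // For each partition, concatenates the matching segment of every spill.
  // mergedLengths receives the total byte length of each merged partition.
  static byte[] merge(List<Spill> spills, int numPartitions, long[] mergedLengths) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int[] readPos = new int[spills.size()]; // next unread byte of each spill
    for (int p = 0; p < numPartitions; p++) {
      for (int s = 0; s < spills.size(); s++) {
        Spill spill = spills.get(s);
        int len = (int) spill.partitionLengths[p];
        out.write(spill.data, readPos[s], len); // raw byte copy, nothing is deserialized
        readPos[s] += len;
        mergedLengths[p] += len;
      }
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    Spill a = new Spill(new byte[] {1, 2, 3}, new long[] {1, 2});
    Spill b = new Spill(new byte[] {4, 5, 6}, new long[] {2, 1});
    long[] lengths = new long[2];
    byte[] merged = merge(Arrays.asList(a, b), 2, lengths);
    // prints [1, 4, 5, 2, 3, 6] [3, 3]
    System.out.println(Arrays.toString(merged) + " " + Arrays.toString(lengths));
  }
}
```

The sketch only works because every spill is already sorted by partition id, so each partition occupies one contiguous segment per spill.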

Source code analysis

Member variables

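ShuffleExternalSorter has the following important member variables:

  • allocatedPages is the linked list of pages (MemoryBlock) that hold the records being sorted. The pages in this list are freed when a spill occurs.
  • spills holds the metadata of the spill files produced by writing sorted records to disk.
  • inMemSorter is the ShuffleInMemorySorter that sorts records by partition id.
  • currentPage is the page in the list that is currently receiving records. A page holds many records, and records are added to it by appending.
  • pageCursor is the start address (or index) of the free space in the current page, i.e. where the next record will be appended.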
/**
   * Memory pages that hold the records being sorted. The pages in this list are freed when
   * spilling, although in principle we could recycle these pages across spills (on the other hand,
   * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
   * itself).
   */
  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();

  private final LinkedList<SpillInfo> spills = new LinkedList<>();

  // These variables are reset after spilling:
  @Nullable private ShuffleInMemorySorter inMemSorter;
  @Nullable private MemoryBlock currentPage = null;
  private long pageCursor = -1;

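inMemSorter is initialized as shown below. The constructor call reads spark.shuffle.sort.useRadixSort from the SparkConf to decide whether records are sorted with RadixSort; the default is true.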

 this.inMemSorter = new ShuffleInMemorySorter(
      this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
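To give a feel for why a radix-style sort fits here, the following sketch sorts record pointers by partition id with a single stable counting pass. This is a deliberate simplification and not Spark's RadixSort, which is a byte-wise least-significant-digit radix sort over the packed pointer array. The class name, method name and the parallel partitionIds array are all hypothetical.

```java
import java.util.Arrays;

final class PartitionCountingSortSketch {
  // Stable counting sort of record pointers by partition id.
  // partitionIds[i] is the partition of pointers[i] (hypothetical parallel arrays).
  static long[] sortByPartition(long[] pointers, int[] partitionIds, int numPartitions) {
    int[] start = new int[numPartitions + 1];
    for (int id : partitionIds) {
      start[id + 1]++;            // histogram of partition ids
    }
    for (int p = 0; p < numPartitions; p++) {
      start[p + 1] += start[p];   // prefix sums: first output slot of each partition
    }
    long[] sorted = new long[pointers.length];
    for (int i = 0; i < pointers.length; i++) {
      sorted[start[partitionIds[i]]++] = pointers[i];
    }
    return sorted;
  }

  public static void main(String[] args) {
    long[] sorted = sortByPartition(new long[] {100, 200, 300, 400}, new int[] {2, 0, 2, 1}, 3);
    // prints [200, 400, 100, 300]
    System.out.println(Arrays.toString(sorted));
  }
}
```

Because the sort key is a small integer rather than the record contents, no comparator and no record deserialization are needed.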

insertRecord method

Inserts a record into the shuffle external sorter, that is, appends the record to the current page. Within the page, a record is stored as:

record length | (k, v)

It also inserts an entry into the ShuffleInMemorySorter, which stores the record's encoded address together with its partition id. Within inMemSorter, an entry is stored as:

partitionId | pageNumber | offset in page

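ShuffleExternalSorter stores the data in MemoryBlocks; each record consists of its length followed by the key-value pair.
ShuffleInMemorySorter uses a long array to store, for each record, its location (page number + offset) and its partition id, packed into 8 bytes.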
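The packed 8-byte entry described above can be sketched with plain bit arithmetic. This is a minimal illustration, not Spark's code: the class PackedPointerSketch and its methods are hypothetical, and the 24/13/27-bit split is assumed to match the layout used by Spark's PackedRecordPointer. It also takes the page number and offset directly, whereas Spark derives them from the address encoded by TaskMemoryManager.

```java
final class PackedPointerSketch {
  static final int PARTITION_BITS = 24; // assumed width of the partition id
  static final int PAGE_BITS = 13;      // assumed width of the page number
  static final int OFFSET_BITS = 27;    // assumed width of the offset in page

  // Packs the three fields into one long: partitionId | pageNumber | offsetInPage.
  static long pack(int partitionId, int pageNumber, long offsetInPage) {
    if (partitionId < 0 || partitionId >= (1 << PARTITION_BITS)
        || pageNumber < 0 || pageNumber >= (1 << PAGE_BITS)
        || offsetInPage < 0 || offsetInPage >= (1L << OFFSET_BITS)) {
      throw new IllegalArgumentException("field does not fit in its bit range");
    }
    return ((long) partitionId << (PAGE_BITS + OFFSET_BITS))
        | ((long) pageNumber << OFFSET_BITS)
        | offsetInPage;
  }

  static int partitionId(long packed) {
    return (int) (packed >>> (PAGE_BITS + OFFSET_BITS));
  }

  static int pageNumber(long packed) {
    return (int) ((packed >>> OFFSET_BITS) & ((1L << PAGE_BITS) - 1));
  }

  static long offsetInPage(long packed) {
    return packed & ((1L << OFFSET_BITS) - 1);
  }

  public static void main(String[] args) {
    long packed = pack(5, 3, 1024);
    // prints 5 3 1024
    System.out.println(partitionId(packed) + " " + pageNumber(packed) + " " + offsetInPage(packed));
  }
}
```

Placing the partition id in the most significant bits means that ordering entries by those bits groups records by partition, which is all the shuffle needs.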
/**
   * Write a record to the shuffle sorter.
   */
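   /*
     @param recordBase the byte array holding the record's (k, v) pair
     @param recordOffset offset of the first element of the byte array, relative to the array object
     @param length number of bytes in the record
     @param partitionId the partition id of the record
   */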
  public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {

    // for tests
    assert(inMemSorter != null);
    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
      logger.info("Spilling data because number of spilledRecords crossed the threshold " +
        numElementsForSpillThreshold);
      spill();
    }

    growPointerArrayIfNecessary();
    // Need 4 bytes to store the record length.
    final int required = length + 4;
    //Allocate a new page only if the current page has fewer than `required` bytes free
    acquireNewPageIfNecessary(required);

    assert(currentPage != null);
    final Object base = currentPage.getBaseObject();
    //Encode the current page and the record's offset within it into a single address
    final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
    //Store the record length in the current page
    Platform.putInt(base, pageCursor, length);
    //The length is an int, so advance pageCursor by 4
    pageCursor += 4;
    //Copy the record's (k, v) bytes into the current page
    Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
    pageCursor += length;
    //Insert an entry into inMemSorter holding the record's encoded address and partitionId
    inMemSorter.insertRecord(recordAddress, partitionId);
  }
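The append logic of insertRecord can be reproduced on an ordinary byte array. The sketch below is only an illustration under simplifying assumptions: PageAppendSketch is a hypothetical class, a single fixed-size byte[] stands in for a MemoryBlock, a full page is reported with -1 instead of allocating a new page or spilling, and the length is written in ByteBuffer's default big-endian order rather than through Platform.putInt.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PageAppendSketch {
  private final byte[] page;   // stands in for the current MemoryBlock
  private int pageCursor = 0;  // start of the free space in the page

  PageAppendSketch(int pageSize) {
    this.page = new byte[pageSize];
  }

  // Appends [record length | record bytes]; returns the record's start offset,
  // or -1 if the page does not have enough free space.
  int insertRecord(byte[] record) {
    int required = record.length + 4; // 4 bytes for the length prefix
    if (pageCursor + required > page.length) {
      return -1;
    }
    int recordOffset = pageCursor;
    ByteBuffer.wrap(page).putInt(pageCursor, record.length);
    pageCursor += 4;
    System.arraycopy(record, 0, page, pageCursor, record.length);
    pageCursor += record.length;
    return recordOffset;
  }

  // Reads a record back: the length prefix first, then that many bytes.
  byte[] readRecord(int recordOffset) {
    int length = ByteBuffer.wrap(page).getInt(recordOffset);
    return Arrays.copyOfRange(page, recordOffset + 4, recordOffset + 4 + length);
  }

  public static void main(String[] args) {
    PageAppendSketch page = new PageAppendSketch(16);
    int first = page.insertRecord("abc".getBytes(StandardCharsets.UTF_8));
    int second = page.insertRecord("de".getBytes(StandardCharsets.UTF_8));
    // prints 0 7 de
    System.out.println(first + " " + second + " "
        + new String(page.readRecord(second), StandardCharsets.UTF_8));
  }
}
```

The returned offset plays the role of the record address that the real method passes to inMemSorter.insertRecord.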

closeAndGetSpills method

Sorts the buffered records and writes them to a file on disk, then frees the associated memory.

/**
   * Close the sorter, causing any buffered data to be sorted and written out to disk.
   *
   * @return metadata for the spill files written by this sorter. If no records were ever inserted
   *         into this sorter, then this will return an empty array.
   * @throws IOException
   */
  public SpillInfo[] closeAndGetSpills() throws IOException {
    if (inMemSorter != null) {
      // Do not count the final file towards the spill count.
      writeSortedFile(true);
      freeMemory();
      inMemSorter.free();
      inMemSorter = null;
    }
    return spills.toArray(new SpillInfo[spills.size()]);
  }

writeSortedFile method

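Sorts the in-memory records and writes the sorted records to a file on disk.

The method works as follows:

1. Call inMemSorter's getSortedIterator method to sort the records; it returns an iterator over the sorted record pointers. When RadixSort is enabled, this internally calls RadixSort.sort to perform the actual sort.

2. Create a spill file, and a SpillInfo to hold that file's metadata.

3. Create a DiskBlockObjectWriter for writing the spill file.

4. Iterate over the sorted record pointers, decode each one, and write the corresponding record to the spill file:

  • Call TaskMemoryManager#getPage to decode the record's encoded address (recordPointer) into the baseObject of the page that holds the record. For on-heap allocation this is a reference to the long array; for off-heap allocation it is null.
  • Call TaskMemoryManager#getOffsetInPage to decode recordPointer into the record's offset. This is the offset used in Unsafe's double-register addressing mode: for on-heap allocation it is the offset of the element relative to the long array object; for off-heap allocation it is the absolute address within the memory block.
  • Call Platform.getInt, which delegates to unsafe.getInt(Object o, long offset), to read an int (4 bytes) at the given offset of the given object. Because the record length is the first thing stored when a record is inserted into a page, this returns the record's length.
  • Compute the offset of the record's (k, v) pair by skipping the 4-byte length.
  • Copy the record's (k, v) pair into writeBuffer.
  • Write writeBuffer to the spill file. The last two steps repeat in chunks of at most diskWriteBufferSize bytes until the whole record has been written.

5. Close the writer and add this spill file's metadata (spillInfo) to spills.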
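The chunked copy in step 4 is what lets the write buffer be smaller than a record. A minimal sketch of that loop follows, assuming a byte[] page with a big-endian 4-byte length prefix and a ByteArrayOutputStream in place of the DiskBlockObjectWriter. ChunkedCopySketch and copyRecord are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

final class ChunkedCopySketch {
  // Copies one length-prefixed record from the page to out through writeBuffer.
  // Returns the number of write calls that were needed.
  static int copyRecord(byte[] page, int recordOffset, byte[] writeBuffer,
                        ByteArrayOutputStream out) {
    int dataRemaining = ByteBuffer.wrap(page).getInt(recordOffset); // record length
    int readPosition = recordOffset + 4;                            // skip the length
    int writes = 0;
    while (dataRemaining > 0) {
      int toTransfer = Math.min(writeBuffer.length, dataRemaining);
      System.arraycopy(page, readPosition, writeBuffer, 0, toTransfer);
      out.write(writeBuffer, 0, toTransfer);
      readPosition += toTransfer;
      dataRemaining -= toTransfer;
      writes++;
    }
    return writes;
  }

  public static void main(String[] args) {
    byte[] payload = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    byte[] page = ByteBuffer.allocate(4 + payload.length)
        .putInt(payload.length).put(payload).array();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int writes = copyRecord(page, 0, new byte[4], out);
    // prints 3 10 (chunks of 4, 4 and 2 bytes)
    System.out.println(writes + " " + out.size());
  }
}
```

A 10-byte record passes through a 4-byte buffer in three writes, which mirrors how writeSortedFile handles records larger than diskWriteBufferSize.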

/**
   * Sorts the in-memory records and writes the sorted records to an on-disk file.
   * This method does not free the sort data structures.
   *
   * @param isLastFile if true, this indicates that we're writing the final output file and that the
   *                   bytes written should be counted towards shuffle write metrics rather than
   *                   shuffle spill metrics.
   */
  private void writeSortedFile(boolean isLastFile) {

    final ShuffleWriteMetrics writeMetricsToUse;

    if (isLastFile) {
      // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
      writeMetricsToUse = writeMetrics;
    } else {
      // We're spilling, so bytes written should be counted towards spill rather than write.
      // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
      // them towards shuffle bytes written.
      writeMetricsToUse = new ShuffleWriteMetrics();
    }

    // This call performs the actual sort.
    //It returns an iterator over the sorted record pointers.
    //When RadixSort is enabled, it internally calls RadixSort.sort to do the actual sort.
    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
      inMemSorter.getSortedIterator();

    // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
    // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
    // data through a byte array. This array does not need to be large enough to hold a single
    // record;
    final byte[] writeBuffer = new byte[diskWriteBufferSize];

    // Because this output will be read during shuffle, its compression codec must be controlled by
    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
    // createTempShuffleBlock here; see SPARK-3426 for more details.
    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
      blockManager.diskBlockManager().createTempShuffleBlock();
    //The temporary spill file
    final File file = spilledFileInfo._2();
    final TempShuffleBlockId blockId = spilledFileInfo._1();
    //Create the metadata for this spill file
    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

    // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
    // Our write path doesn't actually use this serializer (since we end up calling the `write()`
    // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
    // around this, we pass a dummy no-op serializer.
    final SerializerInstance ser = DummySerializerInstance.INSTANCE;

    //Create a DiskBlockObjectWriter for writing this spill file
    final DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);

    int currentPartition = -1;
    //Iterate over the sorted record pointers
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        currentPartition = partition;
      }
      //Get the record pointer for this iteration
      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      //Decode the record's encoded address into the baseObject of its page
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      //Decode the record's encoded address into the record's offset within its page
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      //Delegates to unsafe.getInt(Object o, long offset) to read an int at the given offset of the given object.
      //The record length is stored first when a record is inserted into a page, so this returns the record's length.
      int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
      //Offset of the record's (k, v) pair within its page
      long recordReadPosition = recordOffsetInPage + 4; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        //Copy up to diskWriteBufferSize bytes of the record's (k, v) pair into writeBuffer
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        //Write writeBuffer to the spill file
        writer.write(writeBuffer, 0, toTransfer);
        recordReadPosition += toTransfer;
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }

    final FileSegment committedSegment = writer.commitAndGet();
    writer.close();
    // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
    // then the file might be empty. Note that it might be better to avoid calling
    // writeSortedFile() in that case.
    if (currentPartition != -1) {
      spillInfo.partitionLengths[currentPartition] = committedSegment.length();
      spills.add(spillInfo);
    }

    if (!isLastFile) {  // i.e. this is a spill file
      // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
      // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
      // relies on its `recordWritten()` method being called in order to trigger periodic updates to
      // `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that
      // counter at a higher-level, then the in-progress metrics for records written and bytes
      // written would get out of sync.
      //
      // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
      // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
      // metrics to the true write metrics here. The reason for performing this copying is so that
      // we can avoid reporting spilled bytes as shuffle write bytes.
      //
      // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
      // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
      // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
      writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
    }
  }
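The bookkeeping around currentPartition in the loop above is what fills spillInfo.partitionLengths. The sketch below isolates that logic under simplifying assumptions: the records are given as two parallel arrays that are already sorted by partition id, and a running byte counter stands in for the FileSegment returned by writer.commitAndGet(). The class and method names are hypothetical.

```java
import java.util.Arrays;

final class PartitionLengthsSketch {
  // sortedPartitionIds must be in non-decreasing order; recordSizes[i] is the
  // number of bytes written for record i.
  static long[] partitionLengths(int[] sortedPartitionIds, int[] recordSizes,
                                 int numPartitions) {
    long[] lengths = new long[numPartitions];
    int currentPartition = -1;
    long bytesInSegment = 0; // stands in for the length of the committed FileSegment
    for (int i = 0; i < sortedPartitionIds.length; i++) {
      int partition = sortedPartitionIds[i];
      if (partition != currentPartition) {
        if (currentPartition != -1) {
          lengths[currentPartition] = bytesInSegment; // commit the finished segment
        }
        currentPartition = partition;
        bytesInSegment = 0;
      }
      bytesInSegment += recordSizes[i];
    }
    if (currentPartition != -1) {
      lengths[currentPartition] = bytesInSegment; // commit the last segment
    }
    return lengths;
  }

  public static void main(String[] args) {
    long[] lengths = partitionLengths(new int[] {0, 0, 2, 2, 2}, new int[] {3, 4, 1, 1, 5}, 4);
    // prints [7, 0, 7, 0]
    System.out.println(Arrays.toString(lengths));
  }
}
```

Partitions that receive no records keep a length of 0, so a reader can still locate every partition's segment from the lengths alone.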

freeMemory method

Frees allocatedPages, the list of pages that hold the records being sorted, and resets the currentPage and pageCursor state variables.

private long freeMemory() {
    updatePeakMemoryUsed();
    long memoryFreed = 0;
    for (MemoryBlock block : allocatedPages) {
      memoryFreed += block.size();
      freePage(block);
    }
    allocatedPages.clear();
    currentPage = null;
    pageCursor = 0;
    return memoryFreed;
  }