1. 程式人生 > >ShuffleExternalSorter 外部排序器在Shuffle過程中的設計思路剖析-Spark商業環境實戰

ShuffleExternalSorter 外部排序器在Shuffle過程中的設計思路剖析-Spark商業環境實戰

Spark商業環境實戰及調優進階系列

1 ShuffleExternalSorter 外部排序器

1.1 ShuffleExternalSorter 外部排序器江湖地位

ShuffleExternalSorter和 ExternalSorter 外部排序器功能類似,但是也有不同的地方。不過在詳細剖析ShuffleExternalSorter之前,我們先看看ShuffleExternalSorter在下圖中所處的位置。可以看到最終的呼叫方是unsafeShuffleWriter。在下一節,我會詳細剖析UnsafeShuffleWriter。

1.2 ShuffleExternalSorter 外部排序器與眾不同的特色

相同點:

  • ShuffleExternalSorter與ExternalSorter都是將記錄插入到記憶體中。

不同點:

  • ExternalSorter除了將資料存入記憶體中,還會進行聚合操作,ShuffleExternalSorter沒有聚合功能。
  • ShuffleExternalSorter使用的是Tungsten快取(因此可能是JVM堆快取,也可能是作業系統的記憶體)
  • 溢位前排序操作:ExternalSorter是按照分割槽ID和key進行排序實現,ShuffleExternalSorter除了按照分割槽ID的排序外,也有基於基數排序(Radix Sort)的實現。
  • ShuffleExternalSorter沒有了applendOnlyMapz這種資料結構。

1.2 ShuffleExternalSorter 主要成員

  • ShuffleInMemorySorter :用於在記憶體中對插入的記錄進行排序,演算法還是TimSort。

  • spills :溢位檔案的元資料資訊列表。

  • numElementsForSpillThreshold :磁碟溢位的元素數量。可以通過spark.shuffle.spill.numElementsForceSpillThreshold屬性來進行配置,預設是1M

  • taskMemoryManager:

  • allocatedPages:已經分配的Page列表(即MemoryBlock)列表

     * Memory pages that hold the records being sorted. The pages in this list are freed when
     * spilling, although in principle we could recycle these pages across spills (on the other hand,
     * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
     * itself)。
    

1.3 ShuffleExternalSorter insertRecord 程式碼欣賞

  • 資料溢位,通過inMemSorter.numRecords() >= numElementsForSpillThreshold來判斷,若滿足直接溢位操作。

  • growPointerArrayIfNecessary:進行空間檢查和資料容量擴容。

  • acquireNewPageIfNecessary:進行空間檢查,若不滿足申請新page。

  • Platform.copyMemory:將資料拷貝到Page所代表的的記憶體塊中。

  • inMemSorter.insertRecord:將記錄的元資料存到內部的長整型陣列中,便於排序。其中高24位是儲存分割槽ID,中間13位為儲存頁號,低27位儲存偏移量。

      Write a record to the shuffle sorter.
      
      public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
          throws IOException {
          // for tests
          assert(inMemSorter != null);
          
          if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {    <= 神來之筆
          
            logger.info("Spilling data because number of spilledRecords crossed the threshold " +
              numElementsForSpillThreshold);
            spill();
          }
      
          growPointerArrayIfNecessary();                                  <= 神來之筆
          // Need 4 bytes to store the record length.
          final int required = length + 4;
          
          acquireNewPageIfNecessary(required);
      
          assert(currentPage != null);
          final Object base = currentPage.getBaseObject();
          final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
          Platform.putInt(base, pageCursor, length);
          pageCursor += 4;
          Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);  <= 神來之筆
          pageCursor += length;
          inMemSorter.insertRecord(recordAddress, partitionId);      <= 神來之筆,排序後寫入記憶體。
        }
    

1.3 ShuffleExternalSorter spill 程式碼欣賞

  • writeSortedFile:作用在於將記憶體中的記錄排序後輸出到磁碟中,排序規則有兩種: 一種:對分割槽ID進行排序。二種是採用基數排序(Radix Sort)

       public long spill(long size, MemoryConsumer trigger) throws IOException {
          if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
            return 0L;
          }
          logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
            Thread.currentThread().getId(),
            Utils.bytesToString(getMemoryUsage()),
            spills.size(),
            spills.size() > 1 ? " times" : " time");
      
          writeSortedFile(false);                              <= 神來之筆
          final long spillSize = freeMemory();
          inMemSorter.reset();
          
          // Reset the in-memory sorter's pointer array only after freeing up the memory pages
          holding the records. Otherwise, if the task is over allocated memory, then without freeing the memory pages, we might not be able to get memory for the pointer array.
          
          taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
          return spillSize;
        }
    

2 最後

本篇需要挖掘的點還有很多,鑑於可參考的資料太少,只能暫時到此結束,後續會繼續完善

秦凱新 於深圳