Spark Source Code Analysis: TaskMemoryManager

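Overview

TaskMemoryManager manages the memory allocated to an individual task.

In off-heap mode, a memory location can be addressed with a raw 64-bit address. In on-heap mode, a memory location is addressed by the combination of a reference to a base object and a 64-bit offset within that object.

This is a problem when we want to store pointers to data inside other data structures, such as record pointers inside a hash map or a sorting buffer. Even if we used 128 bits to address memory, we could not simply store the address of the base object, because that address is not stable: objects move as the heap is reorganized by GC.

Instead, record pointers are encoded as 64-bit longs as follows: in off-heap mode, store just the raw address; in on-heap mode, use the upper 13 bits to store a page number and the lower 51 bits to store an offset within that page. The page number indexes into the page table array held by TaskMemoryManager, from which the base object can be retrieved. (The encodePageNumberAndOffset method shown later in this article applies the page number plus page-relative offset encoding in off-heap mode as well.)

This lets us address 8192 pages. In on-heap mode, the maximum page size is limited by the maximum length of a long[] array, so we can address 8192 * (2^31 - 1) * 8 bytes, which is roughly 140 terabytes of memory.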
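
The arithmetic behind these limits can be checked with a few lines of Java. This is only an illustrative sketch: the class name AddressSpaceMath and its method names are invented for this example, while the bit widths are the ones described above.

```java
final class AddressSpaceMath {
    // 13 bits for the page number, leaving 51 bits for the offset within a page.
    static final int PAGE_NUMBER_BITS = 13;
    static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;

    /** Number of pages addressable with 13 bits. */
    static long pageCount() {
        return 1L << PAGE_NUMBER_BITS;
    }

    /** Largest on-heap page: a long[] of maximum length, 8 bytes per element. */
    static long maxOnHeapPageBytes() {
        return ((1L << 31) - 1) * 8L;
    }

    /** Total on-heap bytes addressable through the page table. */
    static long maxAddressableBytes() {
        return pageCount() * maxOnHeapPageBytes();
    }

    public static void main(String[] args) {
        System.out.println("pages: " + pageCount());
        System.out.println("max page bytes: " + maxOnHeapPageBytes());
        System.out.println("addressable terabytes: " + maxAddressableBytes() / 1e12);
    }
}
```

The total works out to 2^47 - 2^16 bytes, which is where the figure of about 140 terabytes comes from.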

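TaskMemoryManager is part of Project Tungsten, which optimizes Spark's performance at the memory and CPU level.

Off-heap memory is disabled by default. It can be enabled with the spark.memory.offHeap.enabled parameter, and its size is set with the spark.memory.offHeap.size parameter.

Allocating off-heap memory with Unsafe.allocateMemory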

unsafe.java

 /**
     * Allocates a new block of native memory, of the given size in bytes.  The
     * contents of the memory are uninitialized; they will generally be
     * garbage.  The resulting native pointer will never be zero, and will be
     * aligned for all value types.  Dispose of this memory by calling {@link
     * #freeMemory}, or resize it with {@link #reallocateMemory}.
     *
     * @throws IllegalArgumentException if the size is negative or too large
     *         for the native size_t type
     *
     * @throws OutOfMemoryError if the allocation is refused by the system
     *
     * @see #getByte(long)
     * @see #putByte(long, byte)
     */
    // Allocates a new block of native memory of the given size in bytes and returns its start address
    public native long allocateMemory(long bytes);
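
To make the allocate/free contract concrete, here is a minimal sketch that calls sun.misc.Unsafe directly. It assumes a JDK on which sun.misc.Unsafe is still reachable through reflection on its private theUnsafe field (recent JDKs may print deprecation warnings). The class OffHeapDemo and its methods are hypothetical names for this illustration, not part of Spark or the JDK.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

final class OffHeapDemo {
    /** Obtains the Unsafe singleton via reflection on its private static field "theUnsafe". */
    static Unsafe unsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe is not accessible", e);
        }
    }

    /** Stores a long in freshly allocated native memory, reads it back, and frees the block. */
    static long roundTrip(long value) {
        Unsafe u = unsafe();
        long address = u.allocateMemory(8); // 8 bytes: room for one long
        try {
            u.putLong(address, value);      // write through the raw address
            return u.getLong(address);      // read it back
        } finally {
            u.freeMemory(address);          // native memory is never garbage collected
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(42L));
    }
}
```

The try/finally mirrors the rule in the Javadoc above: every block obtained from allocateMemory must eventually be passed to freeMemory.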

 unsafe.cpp

UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  // Allocate a new block of native memory of the given size in bytes; x points to the start of the block
  void* x = os::malloc(sz, mtInternal);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);  // convert the pointer to a jlong
UNSAFE_END

For reference, the malloc man page describes the C allocation functions as follows:

       void *malloc(size_t size);
       void free(void *ptr);
       void *realloc(void *ptr, size_t size);

       The malloc() function allocates size bytes and returns a pointer to
       the allocated memory.  The memory is not initialized.  If size is 0,
       then malloc() returns either NULL, or a unique pointer value that can
       later be successfully passed to free().

       The free() function frees the memory space pointed to by ptr, which
       must have been returned by a previous call to malloc(), calloc(), or
       realloc().  Otherwise, or if free(ptr) has already been called
       before, undefined behavior occurs.  If ptr is NULL, no operation is
       performed.  
  
       The realloc() function changes the size of the memory block pointed
       to by ptr to size bytes.  The contents will be unchanged in the range
       from the start of the region up to the minimum of the old and new
       sizes.  If the new size is larger than the old size, the added memory
       will not be initialized.  

The addr_to_java function converts a pointer into a jlong, and addr_from_java converts it back:

inline void* addr_from_java(jlong addr) {
  // This assert fails in a variety of ways on 32-bit systems.
  // It is impossible to predict whether native code that converts
  // pointers to longs will sign-extend or zero-extend the addresses.
  //assert(addr == (uintptr_t)addr, "must not be odd high bits");
  return (void*)(uintptr_t)addr;
}

inline jlong addr_to_java(void* p) {
  assert(p == (void*)(uintptr_t)p, "must not be odd high bits");
  return (uintptr_t)p;
}

Some background on uintptr_t:

It is an unsigned integer type that is capable of storing a pointer, which typically means that it is the same size as a pointer.

It is optionally defined in C++11 and later standards.

A common reason to want an integer type that can hold an architecture's pointer type is to perform integer-specific operations on a pointer, or to obscure the type of a pointer by providing it as an integer "handle".

A UINT_PTR (as well as the more standardized uintptr_t) is defined to be an unsigned integer that is guaranteed to be large enough to hold a pointer value. It's typically used for tricky code where pointers are put into integer values and vice versa.

The _W64 annotation is a note to the Microsoft compiler that when compiling for a 64-bit target, the variable should be 64 bits wide instead of the usual 32, since on 64-bit platforms, pointers are 64 bits, but unsigned ints are usually still 32 bits. This ensures that sizeof(UINT_PTR) >= sizeof(void*) for all target platforms.

Under the LP64 data model used by 64-bit Linux, a pointer and a long have the same size. This does not hold everywhere: under LLP64 (64-bit Windows), long stays 32 bits wide while pointers are 64 bits wide.

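The data models that fix the sizes of C's built-in types on 32-bit and 64-bit systems have names such as ILP32 and LP64. The letters and digits in these names mean:
I: the int type
L: the long type
P: the pointer type
32: the listed types are 32 bits wide (32-bit systems)
64: the listed types are 64 bits wide (64-bit systems)
For example, LP64 means that the long and pointer types are 64 bits wide.
64-bit Linux uses the LP64 model: long and pointer types are 64 bits wide, and all other types keep the size they have on a 32-bit system.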

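How Unsafe locates the fields of Java objects on the heap

jniHandles.hpp

The resolve method of JNIHandles converts an object reference (an object handle) into an oop.

After an object instance is created on the heap, it is manipulated through reference-typed data on the virtual machine stack. In the HotSpot VM, a reference stores the object's address directly.

An object reference is converted to an object address through the following chain: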

jobject handle >> oop p   >>  (address) p

// Interface for creating and resolving local/global JNI handles

class JNIHandles : AllStatic {
  friend class VMStructs;
 private:
  static JNIHandleBlock* _global_handles;             // First global handle block
  static JNIHandleBlock* _weak_global_handles;        // First weak global handle block
  static oop _deleted_handle;                         // Sentinel marking deleted handles

 public:
  // Resolve handle into oop
  inline static oop resolve(jobject handle);

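unsafe.cpp

The index_oop_from_field_offset_long function

Given the oop behind an object reference and the offset of a field within that object, this function computes the address of the field.

The field offset must not fall inside the object header: the assert requires field_offset >= oopDesc::header_size().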

inline void* index_oop_from_field_offset_long(oop p, jlong field_offset) {
  jlong byte_offset = field_offset_to_byte_offset(field_offset);
  // Don't allow unsafe to be used to read or write the header word of oops
  assert(p == NULL || field_offset >= oopDesc::header_size(), "offset must be outside of header");
#ifdef ASSERT
  if (p != NULL) {
    assert(byte_offset >= 0 && byte_offset <= (jlong)MAX_OBJECT_SIZE, "sane offset");
    if (byte_offset == (jint)byte_offset) {
      void* ptr_plus_disp = (address)p + byte_offset;
      assert((void*)p->obj_field_addr<oop>((jint)byte_offset) == ptr_plus_disp,
             "raw [ptr+disp] must be consistent with oop::field_base");
    }
    jlong p_size = HeapWordSize * (jlong)(p->size());
    assert(byte_offset < p_size, err_msg("Unsafe access: offset " INT64_FORMAT " > object's size " INT64_FORMAT, byte_offset, p_size));
  }
#endif
  if (sizeof(char*) == sizeof(jint))    // (this constant folds!)
    return (address)p + (jint) byte_offset;
  else
    return (address)p +        byte_offset;
}

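Locating fields: the objectFieldOffset method

Given a Field, this method returns the offset of that field within an object of its class. The offset depends only on the class: every instance of a class stores a given field at the same offset. Although the comment quoted below says "static field", in the JDK's sun.misc.Unsafe this method is meant for instance fields; static fields are handled by staticFieldOffset.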

/***
   * Returns the memory address offset of the given static field.
   * The offset is merely used as a means to access a particular field
   * in the other methods of this class.  The value is unique to the given
   * field and the same value should be returned on each subsequent call.
   *
   * @param field the field whose offset should be returned.
   * @return the offset of the given field.
   */
  public native long objectFieldOffset(Field field);
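
As an illustration of how a field offset is used, the sketch below reads an instance field through its offset instead of through normal field access. It assumes sun.misc.Unsafe is reachable via reflection and that objectFieldOffset is still available on the JDK in use (it is deprecated on newer JDKs). FieldOffsetDemo and Point are hypothetical classes invented for this example.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

final class FieldOffsetDemo {
    /** Hypothetical example class with two instance fields. */
    static final class Point {
        int x = 3;
        int y = 7;
    }

    /** Obtains the Unsafe singleton via reflection on its private static field "theUnsafe". */
    static Unsafe unsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe is not accessible", e);
        }
    }

    /** Offset of the instance field "y" inside any Point object. */
    static long offsetOfY() {
        try {
            return unsafe().objectFieldOffset(Point.class.getDeclaredField("y"));
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reads p.y as "base object + offset", the same addressing scheme used for on-heap pages. */
    static int readY(Point p) {
        return unsafe().getInt(p, offsetOfY());
    }

    public static void main(String[] args) {
        System.out.println("offset of y: " + offsetOfY());
        System.out.println("value of y: " + readY(new Point()));
    }
}
```

The pair (object reference, offset) stays valid even when the garbage collector moves the object, which is exactly why on-heap memory is addressed this way.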

Locating array elements: the arrayBaseOffset method

For a given array class, this method returns the offset of the first element of the array.

 /***
   * Returns the offset of the first element for a given array class.
   * To access elements of the array class, this value may be used along
   * with that returned by 
   * <a href="#arrayIndexScale"><code>arrayIndexScale</code></a>,
   * if non-zero.
   *
   * @param arrayClass the class for which the first element's address should
   *                   be obtained.
   * @return the offset of the first element of the array class.
   * @see arrayIndexScale(Class)
   */
  public native int arrayBaseOffset(Class arrayClass);
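
The way arrayBaseOffset and arrayIndexScale combine can be sketched as follows. As before, this assumes sun.misc.Unsafe is reachable via reflection. ArrayOffsetDemo is a hypothetical class for illustration only.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

final class ArrayOffsetDemo {
    /** Obtains the Unsafe singleton via reflection on its private static field "theUnsafe". */
    static Unsafe unsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe is not accessible", e);
        }
    }

    /** Size in bytes of one long[] element, as reported by the JVM. */
    static int longIndexScale() {
        return unsafe().arrayIndexScale(long[].class);
    }

    /** Reads array[index] as base object + (offset of first element + index * element size). */
    static long readElement(long[] array, int index) {
        Unsafe u = unsafe();
        long base = u.arrayBaseOffset(long[].class);
        long scale = u.arrayIndexScale(long[].class);
        return u.getLong(array, base + (long) index * scale);
    }

    public static void main(String[] args) {
        long[] data = {10L, 20L, 30L};
        System.out.println(readElement(data, 2));
    }
}
```

Unlike normal array indexing, this access is not bounds checked, so the caller is responsible for keeping index within the array.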

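Memory management in Spark Tungsten

Platform wraps Unsafe

Spark's Platform class is a thin wrapper around Unsafe.

Its allocateMemory method allocates off-heap memory, and LONG_ARRAY_OFFSET gives the offset of the first element of an on-heap long array.

Platform.java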

 public static long allocateMemory(long size) {
    return _UNSAFE.allocateMemory(size);
  }

  public static void freeMemory(long address) {
    _UNSAFE.freeMemory(address);
  }


  public static final int LONG_ARRAY_OFFSET;

  static{
    // some content omitted
    LONG_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(long[].class);
  }

UnsafeMemoryAllocator: allocating off-heap memory

The UnsafeMemoryAllocator class allocates off-heap memory. Underneath, it calls Unsafe through Platform.

UnsafeMemoryAllocator.java

/**
 * A simple {@link MemoryAllocator} that uses {@code Unsafe} to allocate off-heap memory.
 */
public class UnsafeMemoryAllocator implements MemoryAllocator {

  @Override
  public MemoryBlock allocate(long size) throws OutOfMemoryError {
    long address = Platform.allocateMemory(size);
    MemoryBlock memory = new MemoryBlock(null, address, size);
    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
    }
    return memory;
  }

  @Override
  public void free(MemoryBlock memory) {
    assert (memory.obj == null) :
      "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
      "page has already been freed";
    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
            || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
      "TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";

    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
    }
    Platform.freeMemory(memory.offset);
    // As an additional layer of defense against use-after-free bugs, we mutate the
    // MemoryBlock to reset its pointer.
    memory.offset = 0;
    // Mark the page as freed (so we can detect double-frees).
    memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
  }
}
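
The three checks in free() above amount to a small state machine over pageNumber. The following sketch isolates that logic under some assumptions: FreedMarkerDemo and Block are hypothetical stand-ins, the constants copy the values that MemoryBlock defines later in this article, and exceptions are used in place of assert so that the checks fire even when JVM assertions are disabled.

```java
final class FreedMarkerDemo {
    static final int NO_PAGE_NUMBER = -1;                 // never tracked by a TaskMemoryManager
    static final int FREED_IN_TMM_PAGE_NUMBER = -2;       // already released by TaskMemoryManager.freePage()
    static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; // already released by the allocator

    /** Hypothetical stand-in for MemoryBlock that keeps only the page number. */
    static final class Block {
        int pageNumber = NO_PAGE_NUMBER;
    }

    /** Mirrors the order of the checks in the allocator's free(). */
    static void free(Block block) {
        if (block.pageNumber == FREED_IN_ALLOCATOR_PAGE_NUMBER) {
            throw new IllegalStateException("page has already been freed");
        }
        if (block.pageNumber != NO_PAGE_NUMBER && block.pageNumber != FREED_IN_TMM_PAGE_NUMBER) {
            throw new IllegalStateException("TMM-allocated pages must be freed via TMM.freePage()");
        }
        // Mark the block so that a second free() is detected.
        block.pageNumber = FREED_IN_ALLOCATOR_PAGE_NUMBER;
    }

    /** Returns true when free() rejects the block. */
    static boolean freeFails(Block block) {
        try {
            free(block);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Block block = new Block();
        System.out.println("first free rejected: " + freeFails(block));
        System.out.println("second free rejected: " + freeFails(block));
    }
}
```

A block whose pageNumber is still a valid table index (0 or greater) is rejected as well, because it is still owned by a TaskMemoryManager.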

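HeapMemoryAllocator: allocating on-heap memory

The HeapMemoryAllocator class allocates on-heap memory. In essence, it creates a long array to serve as the memory and wraps it in a MemoryBlock.

Platform.LONG_ARRAY_OFFSET is the offset of the first element of a long array relative to the start of the array object.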

HeapMemoryAllocator.java

public MemoryBlock allocate(long size) throws OutOfMemoryError {
    if (shouldPool(size)) {
      synchronized (this) {
        // Linked list of weak references to pooled long arrays of this size
        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
        if (pool != null) {
          while (!pool.isEmpty()) {
            // Take a weak reference from the head of the list
            final WeakReference<long[]> arrayReference = pool.pop();
            // The long array it refers to, or null if it has been garbage collected
            final long[] array = arrayReference.get();
            if (array != null) {
              // The pooled array must be able to hold at least `size` bytes
              assert (array.length * 8L >= size);
              // Wrap the long array in a MemoryBlock
              MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
              if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
                memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
              }
              return memory;
            }
          }
          bufferPoolsBySize.remove(size);
        }
      }
    }
    // Create a new long array, rounding the size up to a whole number of 8-byte words
    long[] array = new long[(int) ((size + 7) / 8)];
    // Wrap the long array in a MemoryBlock
    MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
    }
    return memory;
  }
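
The pooling idea in allocate() can be reduced to a standalone sketch. LongArrayPoolDemo is a hypothetical class. Its free method is an assumption about how arrays get back into the pool, since the corresponding Spark code is not shown in this article, and the shouldPool threshold is omitted, so every size is pooled here.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

final class LongArrayPoolDemo {
    // One list of weakly referenced arrays per requested size in bytes.
    private final Map<Long, LinkedList<WeakReference<long[]>>> poolsBySize = new HashMap<>();

    /** Number of 8-byte words needed to hold sizeInBytes, rounded up. */
    static int wordsFor(long sizeInBytes) {
        return (int) ((sizeInBytes + 7) / 8);
    }

    /** Reuses a pooled array if one is still alive, otherwise creates a new one. */
    synchronized long[] allocate(long size) {
        LinkedList<WeakReference<long[]>> pool = poolsBySize.get(size);
        if (pool != null) {
            while (!pool.isEmpty()) {
                long[] array = pool.pop().get(); // null if the GC already reclaimed it
                if (array != null) {
                    return array;
                }
            }
            poolsBySize.remove(size); // every entry was stale, so drop the empty pool
        }
        return new long[wordsFor(size)];
    }

    /** Assumed counterpart of allocate(): parks the array in the pool behind a weak reference. */
    synchronized void free(long size, long[] array) {
        poolsBySize.computeIfAbsent(size, k -> new LinkedList<>()).add(new WeakReference<>(array));
    }

    public static void main(String[] args) {
        LongArrayPoolDemo pool = new LongArrayPoolDemo();
        long[] first = pool.allocate(1024);
        pool.free(1024, first);
        System.out.println("reused: " + (pool.allocate(1024) == first));
    }
}
```

Because the pool only holds weak references, pooled arrays never keep memory alive on their own: the GC can reclaim them at any time, and allocate() simply skips the cleared entries.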

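MemoryBlock: wrapping a block's start address and size in bytes

MemoryBlock extends MemoryLocation. The main difference from its parent class is that it adds two fields:

length: the size of the memory block in bytes, returned by size();

pageNumber: the index in pageTable at which TaskMemoryManager stores this MemoryBlock.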

MemoryBlock.java

/**
 * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
 */
public class MemoryBlock extends MemoryLocation {

  /** Special `pageNumber` value for pages which were not allocated by TaskMemoryManagers */
  public static final int NO_PAGE_NUMBER = -1;

  /**
   * Special `pageNumber` value for marking pages that have been freed in the TaskMemoryManager.
   * We set `pageNumber` to this value in TaskMemoryManager.freePage() so that MemoryAllocator
   * can detect if pages which were allocated by TaskMemoryManager have been freed in the TMM
   * before being passed to MemoryAllocator.free() (it is an error to allocate a page in
   * TaskMemoryManager and then directly free it in a MemoryAllocator without going through
   * the TMM freePage() call).
   */
  public static final int FREED_IN_TMM_PAGE_NUMBER = -2;

  /**
   * Special `pageNumber` value for pages that have been freed by the MemoryAllocator. This allows
   * us to detect double-frees.
   */
  public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;

  private final long length;

  /**
   * Optional page number; used when this MemoryBlock represents a page allocated by a
   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
   * which lives in a different package.
   */
  public int pageNumber = NO_PAGE_NUMBER;

 
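  // For off-heap allocations, obj is null, offset is the absolute address returned by malloc,
  // and length is the size requested from malloc.
  // For on-heap allocations, obj is a reference to the long array, offset is the offset of the
  // array's first element relative to the array object, and length is the requested size, which is
  // at most the array length times 8 (the array may be padded by a few bytes for 8-byte alignment).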
  public MemoryBlock(@Nullable Object obj, long offset, long length) {
    super(obj, offset);
    this.length = length;
  }

  /**
   * Returns the size of the memory block.
   */
  public long size() {
    return length;
  }

  /**
   * Creates a memory block pointing to the memory used by the long array.
   */
  public static MemoryBlock fromLongArray(final long[] array) {
    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
  }

  /**
   * Fills the memory block with the specified byte value.
   */
  public void fill(byte value) {
    Platform.setMemory(obj, offset, length, value);
  }
}

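MemoryLocation: storing a block's start address

For off-heap memory, obj is null and offset is the absolute address returned by malloc;

For on-heap memory, obj is a reference to the base object and offset is the offset of the data within that object.

MemoryLocation.java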

/**
 * A memory location. Tracked either by a memory address (with off-heap allocation),
 * or by an offset from a JVM object (in-heap allocation).
 */
public class MemoryLocation {

  @Nullable
  Object obj;

  long offset;

  public MemoryLocation(@Nullable Object obj, long offset) {
    this.obj = obj;
    this.offset = offset;
  }

  public MemoryLocation() {
    this(null, 0);
  }

  public void setObjAndOffset(Object newObj, long newOffset) {
    this.obj = newObj;
    this.offset = newOffset;
  }

  public final Object getBaseObject() {
    return obj;
  }

  public final long getBaseOffset() {
    return offset;
  }
}

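TaskMemoryManager: memory management

With the groundwork laid, we arrive at the main topic of this article.

TaskMemoryManager manages the memory allocated to a single task.

The class provides the following:

1. A HashSet of MemoryConsumers, tracking every MemoryConsumer that has acquired execution memory;

2. pageTable, an array of MemoryBlock, tracking the MemoryBlocks that have been allocated;

3. Memory allocation, with a unified representation for on-heap and off-heap addresses;

4. Memory release.

Member variables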

/** The number of bits used to address the page table. */
  private static final int PAGE_NUMBER_BITS = 13;

  /** The number of bits used to encode offsets in data pages. */
  @VisibleForTesting
  static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;  // 51

  /** The number of entries in the page table. */
  private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;  // 2^13 = 8192 entries

  /**
   * Maximum supported data page size (in bytes). In principle, the maximum addressable page size is
   * (1L &lt;&lt; OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's
   * maximum page size is limited by the maximum amount of data that can be stored in a long[]
   * array, which is (2^31 - 1) * 8 bytes (or about 17 gigabytes). Therefore, we cap this at 17
   * gigabytes.
   */
  public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;

  /** Bit mask for the lower 51 bits of a long. */
  private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;

  /**
   * Similar to an operating system's page table, this array maps page numbers into base object
   * pointers, allowing us to translate between the hashtable's internal 64-bit address
   * representation and the baseObject+offset representation which we use to support both in- and
   * off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`.
   * When using an in-heap allocator, the entries in this map will point to pages' base objects.
   * Entries are added to this map as new data pages are allocated.
   */
  private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];

  /**
   * Bitmap for tracking free pages.
   */
  private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);

  private final MemoryManager memoryManager;

  private final long taskAttemptId;

  /**
   * Tracks whether we're in-heap or off-heap. For off-heap, we short-circuit most of these methods
   * without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
   * this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
   */
  final MemoryMode tungstenMemoryMode;

  /**
   * Tracks spillable memory consumers.
   */
  @GuardedBy("this")
  private final HashSet<MemoryConsumer> consumers;

The allocatePage method

This method allocates a MemoryBlock from on-heap or off-heap memory and adds it to pageTable as a page.

/**
   * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
   * intended for allocating large blocks of Tungsten memory that will be shared between operators.
   *
   * Returns `null` if there was not enough memory to allocate the page. May return a page that
   * contains fewer bytes than requested, so callers should verify the size of returned pages.
   *
   * @throws TooLargePageException
   */
  public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
    assert(consumer != null);
    assert(consumer.getMode() == tungstenMemoryMode);
    if (size > MAXIMUM_PAGE_SIZE_BYTES) {
      throw new TooLargePageException(size);
    }

    long acquired = acquireExecutionMemory(size, consumer); // acquire execution memory for the consumer; may be less than requested
    if (acquired <= 0) {
      return null;
    }

    final int pageNumber;
    synchronized (this) {
      pageNumber = allocatedPages.nextClearBit(0);
      if (pageNumber >= PAGE_TABLE_SIZE) {
        releaseExecutionMemory(acquired, consumer);
        throw new IllegalStateException(
          "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
      }
      allocatedPages.set(pageNumber);
    }
    MemoryBlock page = null;
    try {
      page = memoryManager.tungstenMemoryAllocator().allocate(acquired); // allocate the MemoryBlock
    } catch (OutOfMemoryError e) {
      logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
      // there is no enough memory actually, it means the actual free memory is smaller than
      // MemoryManager thought, we should keep the acquired memory.
      synchronized (this) {
        acquiredButNotUsed += acquired;
        allocatedPages.clear(pageNumber);
      }
      // this could trigger spilling to free some pages.
      return allocatePage(size, consumer);
    }
    page.pageNumber = pageNumber;  // record the page number in the MemoryBlock
    pageTable[pageNumber] = page;  // add the MemoryBlock to the page table as a page
    if (logger.isTraceEnabled()) {
      logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
    }
    return page;
  }
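
The page-number bookkeeping inside allocatePage relies on java.util.BitSet. The sketch below isolates that part. PageNumberAllocatorDemo is a hypothetical class, and its free method is an assumption modeled on the allocatedPages.clear(pageNumber) call in the error path above.

```java
import java.util.BitSet;

final class PageNumberAllocatorDemo {
    static final int PAGE_TABLE_SIZE = 1 << 13; // 8192 entries, as in TaskMemoryManager

    // Bit i is set while page number i is in use.
    private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);

    /** Claims the lowest free page number, or fails when all 8192 are taken. */
    synchronized int allocate() {
        int pageNumber = allocatedPages.nextClearBit(0);
        if (pageNumber >= PAGE_TABLE_SIZE) {
            throw new IllegalStateException(
                "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
        }
        allocatedPages.set(pageNumber);
        return pageNumber;
    }

    /** Makes a page number available again (assumed counterpart of allocate). */
    synchronized void free(int pageNumber) {
        allocatedPages.clear(pageNumber);
    }

    public static void main(String[] args) {
        PageNumberAllocatorDemo allocator = new PageNumberAllocatorDemo();
        int a = allocator.allocate();
        int b = allocator.allocate();
        allocator.free(a);
        System.out.println(a + " " + b + " " + allocator.allocate());
    }
}
```

Since nextClearBit(0) always scans from the start, freed page numbers are reused before higher ones are handed out, which keeps the page table densely packed.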

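A uniform encoding for page addresses

As the analysis above shows, the memory behind a page may be on-heap or off-heap. Callers should not have to care about this, so TaskMemoryManager lets them pass in a page and an offset within that page and get back a single long address. Callers then simply work with the memory starting at that address, regardless of where the memory comes from.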

/**
   * Given a memory page and offset within that page, encode this address into a 64-bit long.
   * This address will remain valid as long as the corresponding page has not been freed.
   * @param page a data page allocated by {@link TaskMemoryManager#allocatePage}/
   * @param offsetInPage an offset in this page which incorporates the base offset. In other words,
   *                     this should be the value that you would pass as the base offset into an
   *                     UNSAFE call (e.g. page.baseOffset() + something).
   *                     In on-heap mode, offsetInPage is an offset relative to the base object;
   *                     in off-heap mode, it is an absolute address.
   * @return an encoded page address.
   */
  public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
    if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
      // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
      // encode. Due to our page size limitation, though, we can convert this into an offset that's
      // relative to the page's base offset; this relative offset will fit in 51 bits.
      offsetInPage -= page.getBaseOffset();
    }
    return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
  }

  @VisibleForTesting
  // Packs the page number into the upper 13 bits and the offset into the lower 51 bits.
  // The result is a logical address; decoding it recovers the page number and the offset.
  public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
    assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";	
    return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
  }

  @VisibleForTesting
  // Decodes the page number, i.e. the upper 13 bits of the encoded address
  public static int decodePageNumber(long pagePlusOffsetAddress) {
    return (int) (pagePlusOffsetAddress >>> OFFSET_BITS);
  }

  // Decodes the offset, i.e. the lower 51 bits of the encoded address
  private static long decodeOffset(long pagePlusOffsetAddress) {
    return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
  }

  /**
   * Get the page associated with an address encoded by
   * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
   */
  // In on-heap mode, returns the page's base object; in off-heap mode, returns null
  public Object getPage(long pagePlusOffsetAddress) {
    if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
      final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
      assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
      final MemoryBlock page = pageTable[pageNumber];
      assert (page != null);
      assert (page.getBaseObject() != null);
      return page.getBaseObject();
    } else {
      return null;
    }
  }

  /**
   * Get the offset associated with an address encoded by
   * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
   */
  public long getOffsetInPage(long pagePlusOffsetAddress) {
    final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
    if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
      return offsetInPage;
    } else {
      // In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
      // converted the absolute address into a relative address. Here, we invert that operation:
      final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
      assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
      final MemoryBlock page = pageTable[pageNumber];
      assert (page != null);
      return page.getBaseOffset() + offsetInPage;
    }
  }
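
The bit manipulation in these methods can be exercised in isolation. The sketch below copies the constants and expressions from the listing above into a hypothetical PageAddressCodec class. The off-heap helpers take the page's base address as an explicit parameter, which is an assumption made to keep the example free of any page table.

```java
final class PageAddressCodec {
    static final int PAGE_NUMBER_BITS = 13;
    static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; // 51
    static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;

    /** Page number in the upper 13 bits, offset in the lower 51 bits. */
    static long encode(int pageNumber, long offsetInPage) {
        return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
    }

    /** Unsigned shift, so page numbers with the top bit set decode correctly. */
    static int decodePageNumber(long address) {
        return (int) (address >>> OFFSET_BITS);
    }

    static long decodeOffset(long address) {
        return address & MASK_LONG_LOWER_51_BITS;
    }

    /** Off-heap: store the absolute address relative to the page's base address. */
    static long encodeOffHeap(int pageNumber, long pageBaseAddress, long absoluteAddress) {
        return encode(pageNumber, absoluteAddress - pageBaseAddress);
    }

    /** Off-heap: recover the absolute address from the encoded value. */
    static long decodeOffHeap(long pageBaseAddress, long address) {
        return pageBaseAddress + decodeOffset(address);
    }

    public static void main(String[] args) {
        long encoded = encode(5, 16L);
        System.out.println(decodePageNumber(encoded) + " / " + decodeOffset(encoded));
    }
}
```

Page numbers of 4096 and above set the sign bit of the encoded long, which is why decodePageNumber uses the unsigned shift >>> rather than >>.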

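Configuring whether off-heap memory is used

Off-heap memory is disabled by default. It can be enabled with the spark.memory.offHeap.enabled parameter, and its size is set with the spark.memory.offHeap.size parameter.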

MemoryManager.scala

 /**
   * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
   * sun.misc.Unsafe.
   */
  final val tungstenMemoryMode: MemoryMode = {
    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
      require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
        "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
      require(Platform.unaligned(),
        "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
      MemoryMode.OFF_HEAP
    } else {
      MemoryMode.ON_HEAP
    }
  }

When a TaskMemoryManager is constructed, it calls MemoryManager#tungstenMemoryMode() to obtain the memory mode and stores it in its tungstenMemoryMode field.

 /**
   * Construct a new TaskMemoryManager.
   */
  public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
    this.memoryManager = memoryManager;
    this.taskAttemptId = taskAttemptId;
    this.consumers = new HashSet<>();
  }

The tungstenMemoryMode field can be read through the getTungstenMemoryMode method.

/**
   * Returns Tungsten memory mode
   */
  public MemoryMode getTungstenMemoryMode() {
    return tungstenMemoryMode;
  }

Because TaskMemoryManager keeps a reference to the MemoryManager in its memoryManager field, allocatePage calls MemoryManager#tungstenMemoryAllocator to pick one of the following allocators:

  • HeapMemoryAllocator
  • UnsafeMemoryAllocator

/**
   * Allocates memory for use by Unsafe/Tungsten code.
   */
  private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
    tungstenMemoryMode match {
      case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
      case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
    }
  }
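
The decision chain from configuration to allocator can be summarized in a small Java sketch. Everything here is a simplification for illustration: MemoryModeSelectionDemo is a hypothetical class, the configuration is a plain Map, the size is assumed to be a plain number of bytes, and the Platform.unaligned() check from the Scala code is left out.

```java
import java.util.Map;

final class MemoryModeSelectionDemo {
    enum MemoryMode { ON_HEAP, OFF_HEAP }

    /** Picks the memory mode from the two off-heap settings; off-heap is disabled by default. */
    static MemoryMode modeFor(Map<String, String> conf) {
        boolean enabled =
            Boolean.parseBoolean(conf.getOrDefault("spark.memory.offHeap.enabled", "false"));
        if (!enabled) {
            return MemoryMode.ON_HEAP;
        }
        // Assumption: the size is given as a plain byte count in this sketch.
        long size = Long.parseLong(conf.getOrDefault("spark.memory.offHeap.size", "0"));
        if (size <= 0) {
            throw new IllegalArgumentException(
                "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true");
        }
        return MemoryMode.OFF_HEAP;
    }

    /** Name of the allocator class that serves each mode. */
    static String allocatorFor(MemoryMode mode) {
        switch (mode) {
            case ON_HEAP:
                return "HeapMemoryAllocator";
            case OFF_HEAP:
                return "UnsafeMemoryAllocator";
            default:
                throw new AssertionError(mode);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new java.util.HashMap<>();
        conf.put("spark.memory.offHeap.enabled", "true");
        conf.put("spark.memory.offHeap.size", "1048576");
        System.out.println(allocatorFor(modeFor(conf)));
    }
}
```

The mode is decided once and then fixed, which matches the final val in the Scala code: a TaskMemoryManager never mixes on-heap and off-heap pages.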

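Reference:

For an introduction to off-heap and on-heap memory, see "Apache Spark 記憶體管理詳解" (Apache Spark Memory Management Explained); it is not repeated here.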