Spark Memory Manager: A Source-Code Walkthrough of MemoryManager

The MemoryManager

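The memory manager is arguably one of the most important foundational modules in Spark's core. Sorting during shuffles, RDD caching, unroll memory, broadcast variables and storing task results all need memory, and every one of them has to request a quota from the memory manager first. In my view, its main job is to keep out-of-memory errors to a minimum while keeping memory utilization as high as possible. Older versions of Spark used a static memory manager, StaticMemoryManager. Starting with Spark 1.6, the default became the unified memory manager, UnifiedMemoryManager. The biggest difference between the two is that the unified manager has no fixed boundary between execution memory and storage memory: each can borrow from the other. Execution memory has the higher priority, though. If execution runs short, it takes space back from storage by evicting cached RDD blocks until enough room is freed. Evicted blocks are written to disk if their storage level allows it; otherwise they are simply dropped. Execution memory, on the other hand, is never evicted to make room for storage. That makes sense: execution memory holds the data that running shuffles, sorts, joins and aggregations are actively working on, and it cannot be taken away from a task mid-flight. A cached RDD block can always be recomputed or reread, so caching has much looser requirements.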

Several parameters control how memory is divided among these parts:

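  • spark.memory.fraction, default 0.6. This controls what fraction of memory is managed by the memory manager. More precisely, it is a fraction of (heap size - 300 MB). The 300 MB is a fixed reservation that Spark keeps for its own internal objects, not space set aside for the permanent generation. So execution and storage memory together get only 0.6 of (heap - 300 MB). The remaining 0.4 is left for memory used while user code runs, plus Spark's internal metadata and a safety margin against OOM. For example, your code might load a large file into memory or do some sorting of its own. The memory manager does not manage memory used by user code, which is why a share has to be held back for it.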
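  • spark.memory.storageFraction, default 0.5. As the name suggests, this sets the storage share. Note that it is a share of the memory the memory manager manages, not of the whole heap; the rest is execution memory. For example, by default storage memory is 0.6 * 0.5 = 0.3 of the heap (strictly speaking, 0.3 of heap - 300 MB). Under the unified manager this is a starting boundary rather than a hard limit. It is the amount of cached data that execution cannot evict, and storage can grow past it by borrowing free execution memory.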
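To make these proportions concrete, here is a small calculation. It is a sketch of the arithmetic in the two bullets, not Spark's code. `MemoryRegions` and `compute` are hypothetical names. The sketch also takes the heap size as an input, whereas Spark reads it from `Runtime.getRuntime.maxMemory`, which is usually somewhat smaller than the `-Xmx` value.

```scala
// Hypothetical helper reproducing the sizing arithmetic described above (not Spark's API).
object MemoryRegions {
  // Fixed reservation subtracted from the heap before any fraction is applied.
  val ReservedBytes: Long = 300L * 1024 * 1024

  /** Returns (unified, storage, execution) sizes in bytes for a given heap size. */
  def compute(
      heapBytes: Long,
      memoryFraction: Double = 0.6,
      storageFraction: Double = 0.5): (Long, Long, Long) = {
    require(heapBytes > ReservedBytes, "heap must be larger than the 300 MB reservation")
    val usable = heapBytes - ReservedBytes
    val unified = (usable * memoryFraction).toLong   // execution + storage together
    val storage = (unified * storageFraction).toLong // initial storage region
    (unified, storage, unified - storage)            // execution gets the rest
  }
}

val mb = 1024L * 1024
val (unified, storage, execution) = MemoryRegions.compute(4096L * mb)
// prints: unified=2277 MB, storage=1138 MB, execution=1138 MB
println(s"unified=${unified / mb} MB, storage=${storage / mb} MB, execution=${execution / mb} MB")
```

With a 4 GB heap, only about 2.2 GB is under the memory manager's control at all, and each side starts with roughly half of that.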

MemoryManager overview

Let's start with an overview of the MemoryManager class. These are its methods:

    maxOnHeapStorageMemory
    maxOffHeapStorageMemory
    setMemoryStore
    acquireStorageMemory
    acquireUnrollMemory
    acquireExecutionMemory
    releaseExecutionMemory
    releaseAllExecutionMemoryForTask
    releaseStorageMemory
    releaseAllStorageMemory
    releaseUnrollMemory
    executionMemoryUsed
    storageMemoryUsed
    getExecutionMemoryUsageForTask

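MemoryManager has few methods, and they follow a clear pattern. It divides memory by purpose into three kinds: StorageMemory, UnrollMemory and ExecutionMemory.
Each kind has a method for acquiring memory and methods for releasing it. All three acquire methods are abstract and are implemented by subclasses.
Next, let's look at MemoryManager's member variables: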

    protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
    protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
    protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
    protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

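These four members represent four memory pools. Note that the MemoryPool constructor takes an Object parameter that serves as a lock; here the MemoryManager passes itself (`this`). Some of MemoryPool's methods synchronize on that object.
Here is how the pools are initialized: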

    onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
    onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
    offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
    offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
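Because the pools only record numbers, they are easy to model. The sketch below is a simplified, hypothetical `SimplePool`, not Spark's `MemoryPool` class. Each pool keeps a size and a usage counter guarded by a shared lock object; in Spark that object is the `MemoryManager` itself. Moving capacity from one pool to another is nothing more than a decrement on one counter and an increment on the other.

```scala
// Hypothetical, simplified stand-in for Spark's MemoryPool: only counters, no real allocation.
class SimplePool(lock: Object) {
  private var _poolSize = 0L   // capacity currently assigned to this pool
  private var _memoryUsed = 0L // bytes currently handed out from this pool

  def poolSize: Long = lock.synchronized { _poolSize }
  def memoryUsed: Long = lock.synchronized { _memoryUsed }
  def memoryFree: Long = lock.synchronized { _poolSize - _memoryUsed }

  def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    _poolSize += delta
  }

  // A pool may only give up capacity that is not currently in use.
  def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0 && delta <= _poolSize - _memoryUsed)
    _poolSize -= delta
  }

  // Record usage if it fits; nothing is actually allocated.
  def acquire(bytes: Long): Boolean = lock.synchronized {
    val ok = bytes <= _poolSize - _memoryUsed
    if (ok) _memoryUsed += bytes
    ok
  }
}

val managerLock = new Object // plays the role of the MemoryManager passed as `this`
val storagePool = new SimplePool(managerLock)
val executionPool = new SimplePool(managerLock)
storagePool.incrementPoolSize(500)
executionPool.incrementPoolSize(500)
storagePool.acquire(100)
// Moving capacity between pools is pure arithmetic on the two counters.
storagePool.decrementPoolSize(200)
executionPool.incrementPoolSize(200)
println(s"storage=${storagePool.poolSize}, execution=${executionPool.poolSize}") // storage=300, execution=700
```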

MemoryManager.releaseExecutionMemory

It simply delegates to the corresponding ExecutionMemoryPool method:

  private[memory]
  def releaseExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Unit = synchronized {
    memoryMode match {
      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    }
  }

ExecutionMemoryPool.releaseMemory

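The logic is simple, so I won't say much about it.
This method does show what "memory management" means in Spark. Ultimately, Spark records and manages how much memory is in use. It does not actually allocate and reclaim memory the way an operating system or the JVM does.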

def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
  // Look up how much memory this task currently holds in the bookkeeping map
  val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
  // If asked to release more than the task actually holds, log a warning and release only what it holds
  var memoryToFree = if (curMem < numBytes) {
    logWarning(
      s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
        s"of memory from the $poolName pool")
    curMem
  } else {
    numBytes
  }
  if (memoryForTask.contains(taskAttemptId)) {
    // Update the bookkeeping
    memoryForTask(taskAttemptId) -= memoryToFree
    // If the task's usage has dropped to zero or below, stop tracking the task
    if (memoryForTask(taskAttemptId) <= 0) {
      memoryForTask.remove(taskAttemptId)
    }
  }
  // Finally wake up any waiting threads,
  // because other tasks may be blocked waiting for execution memory
  lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed
}
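The per-task bookkeeping can be modelled in a few lines. This is a hypothetical simplification, and `TaskMemoryBook` is not a Spark class. It keeps a `taskAttemptId -> bytes` map like `memoryForTask`, clamps over-releases to what the task actually holds, and stops tracking a task once it reaches zero. It also wakes waiting threads with `notifyAll` on the shared lock. `releaseAll` plays the role of `releaseAllMemoryForTask`, which looks up the task's usage, releases it and returns the amount.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of ExecutionMemoryPool's per-task bookkeeping.
class TaskMemoryBook(lock: Object) {
  private val memoryForTask = mutable.HashMap[Long, Long]()

  def record(taskAttemptId: Long, bytes: Long): Unit = lock.synchronized {
    memoryForTask(taskAttemptId) = memoryForTask.getOrElse(taskAttemptId, 0L) + bytes
  }

  def usage(taskAttemptId: Long): Long = lock.synchronized {
    memoryForTask.getOrElse(taskAttemptId, 0L)
  }

  def isTracked(taskAttemptId: Long): Boolean = lock.synchronized {
    memoryForTask.contains(taskAttemptId)
  }

  def release(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
    val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
    // Never release more than the task holds (Spark logs a warning in that case).
    val toFree = math.min(numBytes, curMem)
    if (memoryForTask.contains(taskAttemptId)) {
      val remaining = curMem - toFree
      if (remaining <= 0) memoryForTask.remove(taskAttemptId)
      else memoryForTask(taskAttemptId) = remaining
    }
    lock.notifyAll() // wake tasks blocked waiting for execution memory
  }

  // Like releaseAllMemoryForTask: free everything the task holds and report how much that was.
  def releaseAll(taskAttemptId: Long): Long = lock.synchronized {
    val bytes = usage(taskAttemptId)
    release(bytes, taskAttemptId)
    bytes
  }
}

val book = new TaskMemoryBook(new Object)
book.record(1L, 100L)
book.release(300L, 1L) // over-release: only the 100 bytes task 1 holds are freed
book.record(2L, 64L)
// prints: task1=0 tracked=false, freedForTask2=64
println(s"task1=${book.usage(1L)} tracked=${book.isTracked(1L)}, freedForTask2=${book.releaseAll(2L)}")
```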

MemoryManager.releaseAllExecutionMemoryForTask

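This releases the memory the task holds in both the on-heap and the off-heap execution pools.
onHeapExecutionMemoryPool and offHeapExecutionMemoryPool are instances of the same class. One tracks execution memory used on the heap, and the other tracks execution memory used off-heap.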

private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
    offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}

MemoryManager.releaseStorageMemory

Storage memory is tracked less finely than execution memory. The pool does not record how much memory each RDD uses, only the total.

def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  }
}
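Because only a total is kept, releasing storage memory relies entirely on the caller passing the right size. Per-block sizes live in MemoryStore, where each MemoryEntry records its own size. The hypothetical `StorageUsage` class below sketches that single-counter design. The clamp at zero is modelled on my reading of `StorageMemoryPool.releaseMemory`, which logs a warning and resets usage to zero when asked to release more than is in use. Treat that detail as an assumption rather than a guarantee.

```scala
// Hypothetical, simplified storage pool: a single counter, no per-RDD or per-block map.
class StorageUsage {
  private var memoryUsed = 0L

  def used: Long = synchronized { memoryUsed }

  def acquire(bytes: Long): Unit = synchronized { memoryUsed += bytes }

  // The caller says how many bytes to release; the pool cannot check which block they belonged to.
  def release(bytes: Long): Unit = synchronized {
    // Clamp at zero instead of going negative on an over-release.
    memoryUsed = math.max(0L, memoryUsed - bytes)
  }
}

val storage = new StorageUsage
storage.acquire(300) // block A
storage.acquire(200) // block B
storage.release(200) // the caller, which knows block B's size, reports it
println(storage.used) // 300
```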

MemoryManager.releaseUnrollMemory

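Now look at the method that releases unroll memory: unroll memory is simply storage memory. Recall from the BlockManager discussion that unroll memory is requested mainly when data is stored as a block through MemoryStore. The data has to be held in memory temporarily while it is unrolled, and that requires unroll memory.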

final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  releaseStorageMemory(numBytes, memoryMode)
}

Recap

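The release methods above make it clear that "releasing" memory only changes some bookkeeping counters inside the memory manager. Callers must therefore guarantee that they really have freed that much memory; otherwise the memory manager's view will drift far from actual usage. Fortunately, the memory manager is an internal Spark module that is not exposed to users, so user code never calls it.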

UnifiedMemoryManager

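As mentioned at the start, Spark has two memory managers. Newer versions use UnifiedMemoryManager by default, and the static memory manager is being phased out, so we focus on unified memory management here.
Above, we looked at the release methods in the parent class MemoryManager. The acquire methods there are abstract and are implemented in the subclass, UnifiedMemoryManager.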

UnifiedMemoryManager.acquireExecutionMemory

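This method acquires execution memory. It defines two local functions. maybeGrowExecutionPool reclaims storage memory to enlarge the execution pool.
computeMaxExecutionPoolSize computes the largest size the execution pool could reach.
At the end, the method calls executionPool.acquireMemory to actually acquire the execution memory.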

override private[memory] def acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long = synchronized {
// Sanity-check that the execution and storage pool sizes still add up to the total
assertInvariants()
assert(numBytes >= 0)
// Pick the pools and sizes for on-heap or off-heap memory
val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
  case MemoryMode.ON_HEAP => (
    onHeapExecutionMemoryPool,
    onHeapStorageMemoryPool,
    onHeapStorageRegionSize,
    maxHeapMemory)
  case MemoryMode.OFF_HEAP => (
    offHeapExecutionMemoryPool,
    offHeapStorageMemoryPool,
    offHeapStorageMemory,
    maxOffHeapMemory)
}

/**
 * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
 *
 * When acquiring memory for a task, the execution pool may need to make multiple
 * attempts. Each attempt must be able to evict storage in case another task jumps in
 * and caches a large block between the attempts. This is called once per attempt.
 */
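// Grow execution memory by taking space from storage memory,
// evicting cached blocks (to disk if their storage level allows) to make room for execution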
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
  if (extraMemoryNeeded > 0) {
    // There is not enough free memory in the execution pool, so try to reclaim memory from
    // storage. We can reclaim any free memory from the storage pool. If the storage pool
    // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
    // the memory that storage has borrowed from execution.
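    // We can take all of storage's free memory for execution.
    // In addition, if storage has borrowed memory from execution (i.e. the storage pool is now larger than its configured region),
    // we can evict blocks and take all of that borrowed memory back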
    val memoryReclaimableFromStorage = math.max(
      storagePool.memoryFree,
      storagePool.poolSize - storageRegionSize)
    if (memoryReclaimableFromStorage > 0) {
      // Only reclaim as much space as is necessary and available:
      // Free only as much as needed; this call evicts blocks from memory (to disk if allowed)
      val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
        math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
      // Update the bookkeeping: the storage pool shrinks by this amount and the execution pool grows by the same amount
      storagePool.decrementPoolSize(spaceToReclaim)
      executionPool.incrementPoolSize(spaceToReclaim)
    }
  }
}

/**
 * The size the execution pool would have after evicting storage memory.
 *
 * The execution memory pool divides this quantity among the active tasks evenly to cap
 * the execution memory allocation for each task. It is important to keep this greater
 * than the execution pool size, which doesn't take into account potential memory that
 * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
 *
 * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
 * in execution memory allocation across tasks, Otherwise, a task may occupy more than
 * its fair share of execution memory, mistakenly thinking that other tasks can acquire
 * the portion of storage memory that cannot be evicted.
 */
def computeMaxExecutionPoolSize(): Long = {
  maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
}

executionPool.acquireMemory(
  numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
}
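The arithmetic in the two local functions is easier to see with numbers. The sketch below restates it as pure functions over a snapshot of one memory mode. `Pools`, `GrowExecution` and their methods are hypothetical names. `toReclaim` also assumes that eviction frees everything it is asked to; in Spark, `freeSpaceToShrinkPool` can return less.

```scala
// Hypothetical snapshot of one memory mode's pools (all values in bytes).
final case class Pools(
    storagePoolSize: Long,
    storageUsed: Long,
    storageRegionSize: Long, // the eviction-protected share from spark.memory.storageFraction
    maxMemory: Long) {       // execution + storage together
  def storageFree: Long = storagePoolSize - storageUsed
}

object GrowExecution {
  // Mirrors memoryReclaimableFromStorage: free storage space, or whatever storage borrowed beyond its region.
  def reclaimable(p: Pools): Long =
    math.max(p.storageFree, p.storagePoolSize - p.storageRegionSize)

  // How much maybeGrowExecutionPool would try to move, assuming eviction frees everything asked for.
  def toReclaim(p: Pools, extraMemoryNeeded: Long): Long = {
    val r = reclaimable(p)
    if (extraMemoryNeeded > 0 && r > 0) math.min(extraMemoryNeeded, r) else 0L
  }

  // Mirrors computeMaxExecutionPoolSize: storage in use inside its region cannot be evicted.
  def maxExecutionPoolSize(p: Pools): Long =
    p.maxMemory - math.min(p.storageUsed, p.storageRegionSize)
}

// Storage borrowed 200 bytes from execution (pool 700 > region 500) and uses 650 of them.
val borrowed = Pools(storagePoolSize = 700, storageUsed = 650, storageRegionSize = 500, maxMemory = 1000)
println(GrowExecution.toReclaim(borrowed, 250))      // 200: only the borrowed part is taken back
println(GrowExecution.maxExecutionPoolSize(borrowed)) // 500
```

Compare this with a storage pool that stays inside its region but uses little of it, for example `Pools(500, 100, 500, 1000)`. There, execution may take up to 400 bytes of free storage space, and the execution pool could grow to 900, because only storage memory that is actually in use within its region is protected.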

ExecutionMemoryPool.acquireMemory

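I won't paste this method's code. It mostly computes the somewhat involved rules for how much memory each task may get and maintains the internal bookkeeping. If too little memory is currently available, it waits on the object lock until other tasks release some.
Beyond that, the most important thing it does is call the maybeGrowExecutionPool function described above, so that is what we focus on.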
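The core of those rules is a fair-share policy. According to the comments in Spark's `ExecutionMemoryPool`, with N active tasks each task is capped at 1/N of the pool, measured against the maximum size the pool could grow to. Each task is also guaranteed to be able to reach at least 1/(2N) before it has to wait. The function below is a hypothetical, pure restatement of one pass of that loop. In Spark, the method first registers a new task with 0 bytes, and each pass calls `maybeGrowPool(numBytes - memoryFree)` before this calculation. "Wait" means `lock.wait()` followed by another pass.

```scala
// Hypothetical, pure-function model of one iteration of ExecutionMemoryPool.acquireMemory's loop.
object FairShare {
  /** Returns Some(bytesGranted), or None when the task would block on the lock and retry. */
  def decide(
      numBytes: Long,      // bytes the task asks for
      curMem: Long,        // bytes the task already holds
      numActiveTasks: Int, // tasks currently tracked by the pool
      poolSize: Long,      // current execution pool size
      maxPoolSize: Long,   // computeMaxExecutionPoolSize()
      memoryFree: Long): Option[Long] = {
    val maxMemoryPerTask = maxPoolSize / numActiveTasks     // cap: 1/N
    val minMemoryPerTask = poolSize / (2L * numActiveTasks) // guaranteed floor: 1/(2N)
    val maxToGrant = math.min(numBytes, math.max(0L, maxMemoryPerTask - curMem))
    val toGrant = math.min(maxToGrant, memoryFree)
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) None // wait for others to free memory
    else Some(toGrant) // may be less than requested; the caller then has to spill
  }
}

println(FairShare.decide(300, 0, 4, 1000, 1000, 100))   // None: would stay below 1/(2N) = 125
println(FairShare.decide(300, 50, 4, 1000, 1000, 100))  // Some(100)
println(FairShare.decide(300, 250, 4, 1000, 1000, 500)) // Some(0): already at the 1/N cap of 250
```

The last case shows why a grant can be smaller than the request. A task that has already reached its 1/N share gets nothing more and is expected to spill instead of blocking.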

maybeGrowExecutionPool

This function was shown above with detailed comments, so I'll skip over its logic. Its key call is storagePool.freeSpaceToShrinkPool, which implements evicting blocks from memory.

storagePool.freeSpaceToShrinkPool

This method first gives up the storage pool's unused memory. It calls memoryStore.evictBlocksToFreeSpace only for whatever is still missing:

def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
    if (remainingSpaceToFree > 0) {
      // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
      val spaceFreedByEviction =
        memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
      // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
      // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
      spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
    } else {
      spaceFreedByReleasingUnusedMemory
    }
  }
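In other words, the storage pool gives up its unused space first and evicts blocks only for the remainder. Below is a hypothetical restatement with eviction passed in as a function. `evict` stands in for `memoryStore.evictBlocksToFreeSpace`, which either frees at least the requested amount or frees nothing and returns 0.

```scala
object ShrinkStorage {
  /**
   * Hypothetical model of StorageMemoryPool.freeSpaceToShrinkPool.
   * `evict` is asked for n bytes and returns the bytes it actually freed.
   */
  def freeSpaceToShrinkPool(spaceToFree: Long, memoryFree: Long, evict: Long => Long): Long = {
    // First hand over space that no block is using.
    val fromUnused = math.min(spaceToFree, memoryFree)
    val remaining = spaceToFree - fromUnused
    // Only if that is not enough, evict cached blocks for the rest.
    if (remaining > 0) fromUnused + evict(remaining) else fromUnused
  }
}

val evictAll: Long => Long = n => n     // eviction succeeds
val evictNothing: Long => Long = _ => 0L // e.g. every candidate block is locked
println(ShrinkStorage.freeSpaceToShrinkPool(100, 30, evictAll))     // 100
println(ShrinkStorage.freeSpaceToShrinkPool(100, 30, evictNothing)) // 30
```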

memoryStore.evictBlocksToFreeSpace

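This method looks long, but it boils down to one idea.
MemoryStore holds the actual data of every block in memory, so it knows each block's real size and can work out which blocks to evict. Along the way it handles details such as acquiring and releasing block write locks.
The code that actually removes a block from memory lives in blockEvictionHandler.dropFromMemory, i.e.
BlockManager.dropFromMemory. Essentially, it removes the block's MemoryEntry from MemoryStore's map so that nothing references it any more and the GC can reclaim it.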

private[spark] def evictBlocksToFreeSpace(
  blockId: Option[BlockId],
  space: Long,
  memoryMode: MemoryMode): Long = {
assert(space > 0)
memoryManager.synchronized {
  var freedMemory = 0L
  val rddToAdd = blockId.flatMap(getRddId)
  val selectedBlocks = new ArrayBuffer[BlockId]
  def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
    entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
  }
  // This is synchronized to ensure that the set of entries is not changed
  // (because of getValue or getBytes) while traversing the iterator, as that
  // can lead to exceptions.
  entries.synchronized {
    val iterator = entries.entrySet().iterator()
    while (freedMemory < space && iterator.hasNext) {
      val pair = iterator.next()
      val blockId = pair.getKey
      val entry = pair.getValue
      if (blockIsEvictable(blockId, entry)) {
        // We don't want to evict blocks which are currently being read, so we need to obtain
        // an exclusive write lock on blocks which are candidates for eviction. We perform a
        // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
        // The write lock is taken so that a block is never evicted while it is being read or written
        if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
          selectedBlocks += blockId
          freedMemory += pair.getValue.size
        }
      }
    }
  }

  def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
    val data = entry match {
      case DeserializedMemoryEntry(values, _, _) => Left(values)
      case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
    }
    // This call drops the block from memory, spilling it to disk if the storage level allows
    // Note that the implementation of blockEvictionHandler is BlockManager
    val newEffectiveStorageLevel =
      blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
    if (newEffectiveStorageLevel.isValid) {
      // The block is still present in at least one store, so release the lock
      // but don't delete the block info
      // The write locks taken above have not been released yet,
      // so release this block's write lock here
      blockInfoManager.unlock(blockId)
    } else {
      // The block isn't present in any store, so delete the block info so that the
      // block can be stored again
      // The block was dropped from memory and not written to disk, so remove its info from the bookkeeping entirely
      blockInfoManager.removeBlock(blockId)
    }
  }

  // Only if the selected blocks free at least the requested amount are they actually dropped
  if (freedMemory >= space) {
    var lastSuccessfulBlock = -1
    try {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
        s"(${Utils.bytesToString(freedMemory)} bytes)")
      (0 until selectedBlocks.size).foreach { idx =>
        val blockId = selectedBlocks(idx)
        val entry = entries.synchronized {
          entries.get(blockId)
        }
        // This should never be null as only one task should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          dropBlock(blockId, entry)
          // A hook left in for tests
          afterDropAction(blockId)
        }
        lastSuccessfulBlock = idx
      }
      logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
        s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
      freedMemory
    } finally {
      // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
      // with InterruptedException
      // If not every selected block was dropped successfully, some write locks are still held,
      // so release the locks on the blocks that were not processed
      if (lastSuccessfulBlock != selectedBlocks.size - 1) {
        // the blocks we didn't process successfully are still locked, so we have to unlock them
        (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
          val blockId = selectedBlocks(idx)
          blockInfoManager.unlock(blockId)
        }
      }
    }
  } else { // Not enough memory can be freed: give up and release the write locks on all selected blocks
    blockId.foreach { id =>
      logInfo(s"Will not store $id")
    }
    selectedBlocks.foreach { id =>
      blockInfoManager.unlock(id)
    }
    0L
  }
}
}
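The selection phase can be sketched on its own. Below, `CachedBlock` and `EvictionPlanner` are hypothetical. `locked = true` stands in for a failed non-blocking `lockForWriting`. The input order stands in for MemoryStore's `entries` map, a `java.util.LinkedHashMap` created in access order, so iteration starts from the least recently used block. Blocks of the RDD being stored are skipped, because evicting them would only cause a wasteful cycle of an RDD replacing its own partitions. As in Spark, the outcome is all-or-nothing: if the evictable blocks cannot cover the requested space, nothing is evicted.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a MemoryStore entry; `locked` means tryLock for writing would fail.
final case class CachedBlock(blockId: String, rddId: Option[Int], size: Long, locked: Boolean)

object EvictionPlanner {
  /** Picks blocks in iteration order until `space` bytes are covered; all-or-nothing. */
  def select(entries: Seq[CachedBlock], space: Long, rddToAdd: Option[Int]): List[String] = {
    require(space > 0)
    val selected = mutable.ListBuffer[String]()
    var freed = 0L
    val it = entries.iterator
    while (freed < space && it.hasNext) {
      val e = it.next()
      // Never evict another block of the RDD we are trying to store.
      val sameRdd = rddToAdd.isDefined && rddToAdd == e.rddId
      // Skip blocks someone is reading or writing, like the non-blocking lockForWriting.
      if (!sameRdd && !e.locked) {
        selected += e.blockId
        freed += e.size
      }
    }
    // Not enough: select nothing (Spark unlocks the chosen blocks and returns 0).
    if (freed >= space) selected.toList else Nil
  }
}

// Least recently used first, the order in which MemoryStore's access-ordered map is iterated.
val entries = Seq(
  CachedBlock("rdd_1_0", Some(1), 100, locked = false),
  CachedBlock("rdd_2_0", Some(2), 100, locked = true),
  CachedBlock("broadcast_0", None, 50, locked = false),
  CachedBlock("rdd_3_0", Some(3), 200, locked = false))
println(EvictionPlanner.select(entries, 200, rddToAdd = Some(1))) // List(broadcast_0, rdd_3_0)
println(EvictionPlanner.select(entries, 300, rddToAdd = Some(1))) // List(): only 250 bytes are evictable
```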

BlockManager.dropFromMemory

The main logic of this method is:

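  • If the storage level allows disk storage, first spill the block to disk
  • Remove the block from MemoryStore's internal map
  • Report the block update to the BlockManagerMaster on the driver (if the block's info asks for it)
  • Report the block status update to the task metrics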

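So after going around in circles, "evicting" memory comes down to dropping a reference ^_^ Of course, it is not really that simple. The whole analysis also shows that most of the work of memory management is maintaining bookkeeping for the memory each task uses. Some of that logic is fairly involved; the calculation of how much memory each task may get is a good example.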

private[storage] override def dropFromMemory[T: ClassTag](
  blockId: BlockId,
  data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
var blockIsUpdated = false
val level = info.level

// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
  logInfo(s"Writing block $blockId to disk")
  data() match {
    case Left(elements) =>
      diskStore.put(blockId) { channel =>
        val out = Channels.newOutputStream(channel)
        serializerManager.dataSerializeStream(
          blockId,
          out,
          elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
      }
    case Right(bytes) =>
      diskStore.putBytes(blockId, bytes)
  }
  blockIsUpdated = true
}

// Actually drop from memory store
val droppedMemorySize =
  if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockIsRemoved = memoryStore.remove(blockId)
if (blockIsRemoved) {
  blockIsUpdated = true
} else {
  logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
}

val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
  reportBlockStatus(blockId, status, droppedMemorySize)
}
// Report the block status update to the task metrics
if (blockIsUpdated) {
  addUpdatedBlockStatusToTaskMetrics(blockId, status)
}
status.storageLevel
}
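A toy model helps show how these steps relate to the unlock-versus-removeBlock decision in dropBlock earlier. `TinyBlockManager` and `Level` are hypothetical and only track which map holds a block's bytes. Reporting to the driver and to task metrics is left out.

```scala
import scala.collection.mutable

// Hypothetical toy model of BlockManager.dropFromMemory; not Spark's API.
final case class Level(useDisk: Boolean)

class TinyBlockManager {
  val memory = mutable.HashMap[String, Array[Byte]]()
  val disk = mutable.HashMap[String, Array[Byte]]()
  val levels = mutable.HashMap[String, Level]()

  def put(blockId: String, bytes: Array[Byte], level: Level): Unit = {
    memory.update(blockId, bytes)
    levels.update(blockId, level)
  }

  /** Returns true if the block still exists somewhere after leaving memory. */
  def dropFromMemory(blockId: String): Boolean = {
    val level = levels(blockId)
    // 1. Spill to disk first if the storage level allows it and it is not there yet.
    if (level.useDisk && !disk.contains(blockId)) {
      memory.get(blockId).foreach(bytes => disk.update(blockId, bytes))
    }
    // 2. Remove from memory; once unreferenced, the bytes are left to the GC.
    memory.remove(blockId)
    // 3-4. Spark would now report the new status to the driver and to task metrics (omitted).
    val stillStored = disk.contains(blockId)
    // Like dropBlock: keep the block's info if it survives elsewhere, otherwise forget it.
    if (!stillStored) levels.remove(blockId)
    stillStored
  }
}

val bm = new TinyBlockManager
bm.put("rdd_0_0", Array[Byte](1, 2, 3), Level(useDisk = true)) // e.g. MEMORY_AND_DISK
bm.put("rdd_0_1", Array[Byte](4, 5), Level(useDisk = false))   // e.g. MEMORY_ONLY
println(bm.dropFromMemory("rdd_0_0")) // true: spilled to disk
println(bm.dropFromMemory("rdd_0_1")) // false: gone, must be recomputed if needed again
```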

UnifiedMemoryManager.acquireStorageMemory

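Now let's look at how storage memory is acquired.
The part where storage borrows from execution is comparatively simple: it only changes the sizes of the two pools, shrinking the execution pool and growing the storage pool by the same amount. Storage can only borrow execution memory that is currently free; it never evicts execution memory.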

override def acquireStorageMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean = synchronized {
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, maxMemory) = memoryMode match {
  case MemoryMode.ON_HEAP => (
    onHeapExecutionMemoryPool,
    onHeapStorageMemoryPool,
    maxOnHeapStorageMemory)
  case MemoryMode.OFF_HEAP => (
    offHeapExecutionMemoryPool,
    offHeapStorageMemoryPool,
    maxOffHeapStorageMemory)
}
// Storage cannot evict execution memory, so a request larger than maxMemory (total memory minus execution memory in use) can never succeed
if (numBytes > maxMemory) {
  // Fail fast if the block simply won't fit
  logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
    s"memory limit ($maxMemory bytes)")
  return false
}
// If the request exceeds storage's free memory, borrow some free memory from execution
if (numBytes > storagePool.memoryFree) {
  // There is not enough free memory in the storage pool, so try to borrow free memory from
  // the execution pool.
  val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
    numBytes - storagePool.memoryFree)
  // Borrowing from execution is simple:
  // it just adjusts the sizes of the two pools,
  // shrinking the execution pool and growing the storage pool by the same amount
  executionPool.decrementPoolSize(memoryBorrowedFromExecution)
  storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
// Acquire the memory through storagePool (which evicts cached blocks if space is still short)
storagePool.acquireMemory(blockId, numBytes)
}
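The sizing part of this method reduces to a small calculation. In the sketch below, all names are hypothetical. `maxStorage` corresponds to `maxOnHeapStorageMemory` / `maxOffHeapStorageMemory`, which UnifiedMemoryManager defines as the total unified memory for that mode minus the execution memory currently in use. Storage can only borrow execution memory that is free right now. Anything still missing after borrowing is made up by evicting cached blocks inside `storagePool.acquireMemory`.

```scala
object StorageBorrow {
  /**
   * Hypothetical model of the sizing logic in UnifiedMemoryManager.acquireStorageMemory.
   * Returns None if the block can never fit; otherwise Some(bytes borrowed from execution's free space).
   */
  def borrow(numBytes: Long, maxStorage: Long, storageFree: Long, executionFree: Long): Option[Long] =
    if (numBytes > maxStorage) None // fail fast: execution memory in use cannot be evicted
    else if (numBytes > storageFree) Some(math.min(executionFree, numBytes - storageFree))
    else Some(0L) // fits in storage's own free space
}

// 1000 bytes in total and execution is using 300, so storage can reach at most 700.
println(StorageBorrow.borrow(800, 700, 100, 200)) // None
println(StorageBorrow.borrow(400, 700, 100, 200)) // Some(200): still 100 short, so blocks get evicted
```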

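StorageMemoryPool.acquireMemory

The call storagePool.acquireMemory(blockId, numBytes) above goes through a two-argument overload. That overload computes the shortfall, numBytesToFree = max(0, numBytes - memoryFree), and passes it to the three-argument version shown here: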

def acquireMemory(
  blockId: BlockId,
  numBytesToAcquire: Long,
  numBytesToFree: Long): Boolean = lock.synchronized {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
// First ask MemoryStore to evict some blocks to free memory
if (numBytesToFree > 0) {
  memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
// should have been updated.
// When the evicted blocks are released, BlockManager updates the bookkeeping through MemoryManager,
// so memoryFree has already changed (grown) by this point
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
  _memoryUsed += numBytesToAcquire
}
enoughMemory
}

As you can see, this method also calls memoryStore.evictBlocksToFreeSpace to evict some blocks from memory and make room for the new block.
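Putting the overload and this method together, the flow is:

1. Compute how much space is missing.
2. Evict that much.
3. Re-read `memoryFree`.

The re-read matters because eviction has already lowered `memoryUsed` through a callback: in Spark, `MemoryStore.remove` (reached from `BlockManager.dropFromMemory`) releases the block's storage memory. The hypothetical `MiniStoragePool` below sketches that flow, with eviction passed in as a callback.

```scala
// Hypothetical, simplified StorageMemoryPool. `evict` stands in for memoryStore.evictBlocksToFreeSpace,
// which frees space by calling back into releaseMemory on this same pool.
class MiniStoragePool(val poolSize: Long) {
  var memoryUsed: Long = 0L
  def memoryFree: Long = poolSize - memoryUsed

  def releaseMemory(bytes: Long): Unit = { memoryUsed = math.max(0L, memoryUsed - bytes) }

  // Combines the two-argument overload and the three-argument method.
  def acquireMemory(numBytes: Long, evict: Long => Unit): Boolean = {
    val numBytesToFree = math.max(0L, numBytes - memoryFree)
    if (numBytesToFree > 0) evict(numBytesToFree) // may lower memoryUsed via releaseMemory
    val enoughMemory = numBytes <= memoryFree      // re-read after any eviction
    if (enoughMemory) memoryUsed += numBytes
    enoughMemory
  }
}

val pool = new MiniStoragePool(100)
pool.memoryUsed = 80
println(pool.acquireMemory(50, n => pool.releaseMemory(n))) // true: 30 bytes evicted first
```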

UnifiedMemoryManager.acquireUnrollMemory

Finally, acquiring unroll memory simply acquires storage memory.

override def acquireUnrollMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean = synchronized {
acquireStorageMemory(blockId, numBytes, memoryMode)
}

Conclusion

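At its core, memory management is bookkeeping for the memory used by execution, such as sorting during shuffles, and by RDD caching. By recording and managing memory usage in detail and precisely, Spark avoids OOM errors as far as possible while keeping memory utilization as high as possible.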