1. 程式人生 > >Spark1.6-----原始碼解讀之BlockManager元件MemoryStore

Spark1.6-----原始碼解讀之BlockManager元件MemoryStore

MemoryStore負責將沒有序列化的java物件陣列或者序列化的ByteBuffer儲存到記憶體中:

MemoryStore記憶體模型

maxUnrollMemory:當前Driver或者Executor的block最多提前佔用的記憶體的大小,每個執行緒都能佔記憶體。(類似上課佔座,人沒到,但是位置有了)

maxMemory:當前Driver或者Executor儲存所能利用最大記憶體大小。

currentMemoey:當前Driver或者Executor以及用了記憶體。

freeMemory:當前Driver或者Executor為使用的記憶體。

currentUnrollMemory:當前Driver或者Executor的block已經提前佔用的記憶體的大小,所有執行緒block已經提前佔用的記憶體的大小之和

unrollMemoryMap:都存入map中 key為執行緒id,value為每個執行緒佔用的記憶體。

private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
  extends BlockStore(blockManager) {

  // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
  // acquiring or releasing unroll memory, must be synchronized on `memoryManager`!

  private val conf = blockManager.conf
  private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)

  // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
  // All accesses of this map are assumed to have manually synchronized on `memoryManager`
  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
  // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
  // Pending unroll memory refers to the intermediate memory occupied by a task
  // after the unroll but before the actual putting of the block in the cache.
  // This chunk of memory is expected to be released *as soon as* we finish
  // caching the corresponding block as opposed to until after the task finishes.
  // This is only used if a block is successfully unrolled in its entirety in
  // memory (SPARK-4777).
  private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()

  // Initial memory to request before unrolling any block
  private val unrollMemoryThreshold: Long =
    conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
  /** Total amount of memory available for storage, in bytes. */
  private def maxMemory: Long = memoryManager.maxStorageMemory

MemoryStore繼承自BlockStore。實現了getBytes,putBytes,putArray,putIterator,getValues等方法。

資料儲存putBytes

如何Block的儲存級別為能序列化,則先進行序列化再呼叫putIterator,否則呼叫TryPut.

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
    // Work on a duplicate - since the original input might be used elsewhere.
    val bytes = _bytes.duplicate()
    bytes.rewind()
    if (level.deserialized) {
      val values = blockManager.dataDeserialize(blockId, bytes)
      putIterator(blockId, values, level, returnValues = true)
    } else {
      val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
    }
  }

Iterator寫入方法putIterator

呼叫unrollSafely測試看看能不能去佔用Block塊大小的記憶體,如果返回的資料型別為Left(array Values)說明記憶體能裝下,呼叫putArray寫入記憶體。

返回的為Right(array Values)說明記憶體不足將寫入硬碟或者拋棄。

/**
   * Attempt to put the given block in memory store.
   *
   * There may not be enough space to fully unroll the iterator in memory, in which case we
   * optionally drop the values to disk if
   *   (1) the block's storage level specifies useDisk, and
   *   (2) `allowPersistToDisk` is true.
   *
   * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
   * back from disk and attempts to cache it in memory. In this case, we should not persist the
   * block back on disk again, as it is already in disk store.
   */
  private[storage] def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      returnValues: Boolean,
      allowPersistToDisk: Boolean): PutResult = {
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
    unrolledValues match {
      case Left(arrayValues) =>
        // Values are fully unrolled in memory, so store them as an array
        val res = putArray(blockId, arrayValues, level, returnValues)
        droppedBlocks ++= res.droppedBlocks
        PutResult(res.size, res.data, droppedBlocks)
      case Right(iteratorValues) =>
        // Not enough space to unroll this block; drop to disk if applicable
        if (level.useDisk && allowPersistToDisk) {
          logWarning(s"Persisting block $blockId to disk instead.")
          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
          PutResult(res.size, res.data, droppedBlocks)
        } else {
          PutResult(0, Left(iteratorValues), droppedBlocks)
        }
    }
  }

記憶體寫入PutArray

  override def putArray(
      blockId: BlockId,
      values: Array[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    if (level.deserialized) {
      //估算物件大小
      val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
      //嘗試去寫入記憶體
      tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks)
      PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
    } else {
      val bytes = blockManager.dataSerialize(blockId, values.iterator)
      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
    }
  }

嘗試寫入記憶體方法tryToPut

  private def tryToPut(
      blockId: BlockId,
      value: () => Any,
      size: Long,
      deserialized: Boolean,
      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {

    /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
     * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
     * been released, it must be ensured that those to-be-dropped blocks are not double counted
     * for freeing up more space for another block that needs to be put. Only then the actually
     * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
    //多執行緒,必須要鎖住
    memoryManager.synchronized {
      // Note: if we have previously unrolled this block successfully, then pending unroll
      // memory should be non-zero. This is the amount that we already reserved during the
      // unrolling process. In this case, we can just reuse this space to cache our block.
      // The synchronization on `memoryManager` here guarantees that the release and acquire
      // happen atomically. This relies on the assumption that all memory acquisitions are
      // synchronized on the same lock.
      releasePendingUnrollMemoryForThisTask()
     //在測試一下,看現在記憶體還能不能放下該Block,因為多執行緒緣故,可能剛才滿足現在不滿足條件
      val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
      if (enoughMemory) {
        // We acquired enough memory for the block, so go ahead and put it
        val entry = new MemoryEntry(value(), size, deserialized)
        entries.synchronized {
          //能放下就寫入記憶體
          entries.put(blockId, entry)
        }
        val valuesOrBytes = if (deserialized) "values" else "bytes"
        logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
          blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
      } else {
        // Tell the block manager that we couldn't put it in memory so that it can drop it to
        // disk if the block allows disk storage.
        lazy val data = if (deserialized) {
          Left(value().asInstanceOf[Array[Any]])
        } else {
          Right(value().asInstanceOf[ByteBuffer].duplicate())
        }
        val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
        droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
      }
      enoughMemory
    }
  }