1. 程式人生 > >Spark記憶體管理(4)—— UnifiedMemoryManager分析

Spark記憶體管理(4)—— UnifiedMemoryManager分析

acquireExecutionMemory方法

關注UnifiedMemoryManager中的accquireExecutionMemory方法: 這裡寫圖片描述

當前的任務嘗試從executor中獲取numBytes這麼大的記憶體 該方法直接向ExecutionMemoryPool索要所需記憶體,索要記憶體有以下幾個關注點:

  • 當ExecutionMemory 記憶體充足,則不會觸發向Storage申請記憶體
  • 每個Task能夠被使用的記憶體是被限制的
  • 索要記憶體的大小

我們通過原始碼來進行分析: UnifiedMemoryManager.scala中: 這裡寫圖片描述 我們點進去後會發現,會呼叫ExecutionMemoryPool.acquireMemory()方法

ExecutionMemoryPool.scala中: 這裡寫圖片描述 我們可以發現每Task能夠被使用的記憶體被限制在: poolSize / (2 * numActiveTasks) ~ maxPoolSize / numActiveTasks 之間

val maxMemoryPerTask = maxPoolSize /numActiveTasks val minMemoryPerTask = poolSize / (2 * numActiveTasks)

UnifiedMemoryManager.scala中: 這裡寫圖片描述 其中maxPoolSize = maxMemory - math.min(storageMemoryUsed, storageRegionSize) maxMemory = storage + execution的最大記憶體 poolSize = 當前這個pool的大小 maxPoolSize = execution pool的最大記憶體

UnifiedMemoryManager.scala中: 這裡寫圖片描述 從上述程式碼中我們可以知道索要記憶體的大小: val memoryReclaimableFromStorage=math.max (storageMemoryPool.memoryFree, storageMemoryPool.poolSize -storageRegionSize) 取決於StorageMemoryPool的剩餘記憶體和 storageMemoryPool 從ExecutionMemory借來的記憶體哪個大,取最大的那個,作為可以重新歸還的最大記憶體 用公式表達出來就是這一個樣子: ExecutionMemory 能借到的最大記憶體 = StorageMemory 借的記憶體 + StorageMemory 空閒記憶體 注意:

如果實際需要的小於能夠借到的最大值,則以實際需要值為準

能回收的記憶體大小為: val spaceToReclaim =storageMemoryPool.freeSpaceToShrinkPool ( math.min(extraMemoryNeeded,memoryReclaimableFromStorage))

ExecutionMemoryPool.acquireMemory()解析

    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      maybeGrowPool(numBytes - memoryFree)

      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))

      val toGrant = math.min(maxToGrant, memoryFree)

      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }

整體流程解析: 程式一直處理該task的請求,直到系統判定無法滿足該請求或者已經為該請求分配到足夠的記憶體為止;如果當前execution記憶體池剩餘記憶體不足以滿足此次請求時,會向storage部分請求釋放出被借走的記憶體以滿足此次請求

根據此刻execution記憶體池的總大小maxPoolSize,以及從memoryForTask中統計出的處於active狀態的task的個數計算出: 每個task能夠得到的最大記憶體數 maxMemoryPerTask = maxPoolSize / numActiveTasks 每個task能夠得到的最少記憶體數 minMemoryPerTask = poolSize /(2 * numActiveTasks)

根據申請記憶體的task當前使用的execution記憶體大小決定分配給該task多少記憶體,總的記憶體不能超過maxMemoryPerTask;但是如果execution記憶體池能夠分配的最大記憶體小於numBytes,並且如果把能夠分配的記憶體分配給當前task,但是該task最終得到的execution記憶體還是小於minMemoryPerTask時,該task進入等待狀態,等其他task申請記憶體時再將其喚醒,喚醒之後如果此時滿足,就會返回能夠分配的記憶體數,並且更新memoryForTask,將該task使用的記憶體調整為分配後的值 一個Task最少需要minMemoryPerTask才能開始執行

acquireStorageMemory方法

流程和acquireExecutionMemory類似,當storage的記憶體不足時,同樣會向execution借記憶體,但區別是當且僅當ExecutionMemory有空閒記憶體時,StorageMemory 才能借走該記憶體

UnifiedMemoryManager.scala中: 這裡寫圖片描述 從上述程式碼中我們可以知道能借到的記憶體數為: val memoryBorrowedFromExecution = Math.min (onHeapExecutionMemoryPool.memoryFree,numBytes) 所以StorageMemory從ExecutionMemory借走的記憶體,完全取決於當時ExecutionMemory是不是有空閒記憶體;借到記憶體後,storageMemoryPool增加借到的這部分記憶體,之後同上一樣,會呼叫StorageMemoryPool的acquireMemory()方法

StorageMemoryPool.acquireMemory

這裡寫圖片描述 整體流程解析: 在申請記憶體時,如果numBytes大於此刻storage記憶體池的剩餘記憶體,即if (numBytesToFree > 0),那麼需要storage記憶體池釋放一部分記憶體以滿足申請需求 注意:這裡的numBytesToFree可以理解為numBytes大小減去Storage記憶體池剩餘大小,大於0,即所需要申請的numBytes大於Storage記憶體池剩餘的記憶體 釋放記憶體後如果memoryFree >= numBytes,就會把這部分記憶體分配給申請記憶體的task,並且更新storage記憶體池的使用情況 同時StorageMemoryPool與ExecutionMemoryPool不同的是,他不會像前者那樣分不到資源就進行等待,acquireStorageMemory只會返回一個true或是false,告知記憶體分配是否成功