1. 程式人生 > >Spark 統一記憶體管理模型詳解

Spark 統一記憶體管理模型詳解

  • 其實 Spark UI 上面顯示的 Storage Memory 可用記憶體等於堆內記憶體和堆外記憶體之和,計算公式如下:

    堆內

    systemMemory = 17179869184 位元組

    reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800

    usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384

    totalOnHeapStorageMemory = usableMemory * spark.memory.fraction

    = 16865296384 * 0.6 = 10119177830

    堆外

    totalOffHeapStorageMemory = spark.memory.offHeap.size = 10737418240

    StorageMemory = totalOnHeapStorageMemory + totalOffHeapStorageMemory

    = (10119177830 + 10737418240) 位元組

    = (20856596070 / (1000 * 1000 * 1000)) GB

    = 20.9 GB

    注意,上面說的借用對方的記憶體需要借用方和被借用方的記憶體型別都一樣,都是堆內記憶體或者都是堆外記憶體,不存在堆內記憶體不夠去借用堆外記憶體的空間。

    Task 之間記憶體分佈

    為了更好地使用使用記憶體,Executor 內執行的 Task 之間共享著 Execution 記憶體。具體的,Spark 內部維護了一個 HashMap 用於記錄每個 Task 佔用的記憶體。當 Task 需要在 Execution 記憶體區域申請 numBytes 記憶體,其先判斷 HashMap 裡面是否維護著這個 Task 的記憶體使用情況,如果沒有,則將這個 Task 記憶體使用置為0,並且以 TaskId 為 key,記憶體使用為 value 加入到 HashMap 裡面。之後為這個 Task 申請 numBytes 記憶體,如果 Execution 記憶體區域正好有大於 numBytes 的空閒記憶體,則在 HashMap 裡面將當前 Task 使用的記憶體加上 numBytes,然後返回;如果當前 Execution 記憶體區域無法申請到每個 Task 最小可申請的記憶體,則當前 Task 被阻塞,直到有其他任務釋放了足夠的執行記憶體,該任務才可以被喚醒。每個 Task 可以使用 Execution 記憶體大小範圍為 1/2N ~ 1/N,其中 N 為當前 Executor 內正在執行的 Task 個數。一個 Task 能夠執行必須申請到最小記憶體為 (1/2N * Execution 記憶體);當 N = 1 的時候,Task 可以使用全部的 Execution 記憶體。

    比如如果 Execution 記憶體大小為 10GB,當前 Executor 內正在執行的 Task 個數為5,則該 Task 可以申請的記憶體範圍為 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的範圍。

    一個示例

    為了更好的理解上面堆內記憶體和堆外記憶體的使用情況,這裡給出一個簡單的例子。

    只用了堆內記憶體

    現在我們提交的 Spark 作業關於記憶體的配置如下:

    --executor-memory 18g

    由於沒有設定 spark.memory.fraction 和 spark.memory.storageFraction 引數,我們可以看到 Spark UI 關於 Storage Memory 的顯示如下:

    上圖很清楚地看到 Storage Memory 的可用記憶體是 10.1GB,這個數是咋來的呢?根據前面的規則,我們可以得出以下的計算:

    systemMemory = spark.executor.memory

    reservedMemory = 300MB

    usableMemory = systemMemory - reservedMemory

    StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction

    如果我們把資料代進去,得出以下的結果:

    systemMemory = 18Gb = 19327352832 位元組

    reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800

    usableMemory = systemMemory - reservedMemory = 19327352832 - 314572800 = 19012780032

    StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction

    = 19012780032 * 0.6 * 0.5 = 5703834009.6 = 5.312109375GB

    不對啊,和上面的 10.1GB 對不上啊。為什麼呢?這是因為 Spark UI 上面顯示的 Storage Memory 可用記憶體其實等於 Execution 記憶體和 Storage 記憶體之和,也就是 usableMemory * spark.memory.fraction

    StorageMemory= usableMemory * spark.memory.fraction

    = 19012780032 * 0.6 = 11407668019.2 = 10.62421GB

    還是不對,這是因為我們雖然設定了 --executor-memory 18g,但是 Spark 的 Executor 端通過 Runtime.getRuntime.maxMemory 拿到的記憶體其實沒這麼大,只有 17179869184 位元組,所以 systemMemory = 17179869184,然後計算的資料如下:

    systemMemory = 17179869184 位元組

    reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800

    usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384

    StorageMemory= usableMemory * spark.memory.fraction

    = 16865296384 * 0.6 = 9.42421875 GB

    我們通過將上面的 16865296384 * 0.6 位元組除於 1024 * 1024 * 1024 轉換成 9.42421875 GB,和 UI 上顯示的還是對不上,這是因為 Spark UI 是通過除於 1000 * 1000 * 1000 將位元組轉換成 GB,如下:

    systemMemory = 17179869184 位元組

    reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800

    usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384

    StorageMemory= usableMemory * spark.memory.fraction

    = 16865296384 * 0.6 位元組 =  16865296384 * 0.6 / (1000 * 1000 * 1000) = 10.1GB

    現在終於對上了。

    具體將位元組轉換成 GB 的計算邏輯如下(core 模組下面的 /core/src/main/resources/org/apache/spark/ui/static/utils.js):

    function formatBytes(bytes, type) {

    if (type !== 'display') return bytes;

    if (bytes == 0) return '0.0 B';

    var k = 1000;

    var dm = 1;

    var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];

    var i = Math.floor(Math.log(bytes) / Math.log(k));

    return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];

    }

    我們設定了 --executor-memory 18g,但是 Spark 的 Executor 端通過 Runtime.getRuntime.maxMemory 拿到的記憶體其實沒這麼大,只有 17179869184 位元組,這個資料是怎麼計算的?
    Runtime.getRuntime.maxMemory 是程式能夠使用的最大記憶體,其值會比實際配置的執行器記憶體的值小。這是因為記憶體分配池的堆部分分為 Eden,Survivor 和 Tenured 三部分空間,而這裡面一共包含了兩個 Survivor 區域,而這兩個 Survivor 區域在任何時候我們只能用到其中一個,所以我們可以使用下面的公式進行描述:

    ExecutorMemory = Eden + 2 * Survivor + Tenured

    Runtime.getRuntime.maxMemory =  Eden + Survivor + Tenured

    上面的 17179869184 位元組可能因為你的 GC 配置不一樣得到的資料不一樣,但是上面的計算公式是一樣的。

    用了堆內和堆外記憶體

    現在如果我們啟用了堆外記憶體,情況咋樣呢?我們的記憶體相關配置如下:

    spark.executor.memory           18g

    spark.memory.offHeap.enabled    true

    spark.memory.offHeap.size       10737418240

    從上面可以看出,堆外記憶體為 10GB,現在 Spark UI 上面顯示的 Storage Memory 可用記憶體為 20.9GB,如下: