1. 程式人生 > >Spark中的記憶體管理(一)

Spark中的記憶體管理(一)

一個Spark應用執行的過程如下所示:
_2018_12_23_8_17_04

  • Driver
    使用者的主程式提交到Driver中執行,在Driver中建立SparkContext,SparkContext初始化DAGScheduler和TaskScheduler,作為coordinator負責從AppMaster申請資源,並將作業的Task排程到Executor上面執行。

在yarn-cluster模式下,AppMaster中包含了Driver,在YARN中啟動,spark-submit客戶端kill掉不影響程式的執行;
在yarn-client模式下,Driver在spark-submit的客戶端啟動(不在YARN中),跟AppMaster是分離的,spark-submit客戶端kill掉會導致Spark程式掛掉(如spark-sql/spark-shell等都是以yarn-client的方式提交)

Executor上面執行的每個MapTask結束後都會有MapStatus彙報給Driver, 當MapTask數量非常多的時候可能會導致Driver出現OOM,此時需要調整Driver的記憶體大小,通過--conf spark.driver.memory=4G或者--driver-memory 4G來進行設定。

  • Executor
    實際執行Task的節點,Executor的個數由--conf spark.executor.instances=4或者--num-executors 4來設定;每個Executor裡面併發跑的Task個數由--conf spark.executor.cores=2
    或者--executor-cores指定。

Executor的記憶體由--conf spark.executor.memory=4G或者--executor-memory 4G設定。

Spark記憶體管理

上面介紹了Spark中兩個角色(Driver/Executor),其中Executor是實際執行Task的節點,Spark記憶體管理主要在Executor上面。

Executor記憶體使用結構

_2018_12_23_9_54_06

如上圖所示, Spark on YARN模式下一個Executor的記憶體使用情況:

整個Executor是YARN的一個container,所以它的總記憶體受yarn.scheduler.maximum-allocation-mb

的引數控制;

當用戶提交作業的時候通過spark.executor.memory引數設定了executor的堆記憶體(heapsize),這部分記憶體的使用情況如上圖所示:

  • 系統預留(固定300MB)
    詳見SPARK-12081
  • spark.memory.fraction
    該引數控制executor內使用者計算(execution)和儲存(storage)總佔用多少記憶體,即(M-R)*spark.memory.fraction 大小的記憶體; 剩餘的(M-R)*(1-spark.memory.fraction)用於Spark內部的metadata以及使用者資料結構等使用

對於spark.executor.memroyOverhead,它是executor可額外使用的堆外(off-heap)記憶體,比如spark的shuffle過程使用的netty就會使用到堆外記憶體,如果程式有遇到相關的oom錯誤,可以嘗試調大該引數。該記憶體不屬於上面spark.executor.memory(on-heap),但是它們的總和不能超過yarn.scheduler.maximum-allocation-mb.

execution/storage記憶體管理

上圖中execution/storage的記憶體((M-R)*spark.memroy.fraction)是Task在executor中執行需要用到的記憶體,它們通過UnifiedMemoryManager這個統一記憶體管理器來管理。

UnifiedMemoryManager中的execution和storage的管理沒有硬性的邊界控制(比如execution固定佔比多少),它們之間是一個軟邊界,初始的邊界由spark.memory.storageFraction來設定(預設0.5),但這個並不是一個固定的邊界:
a) 當execution不夠的時候,可以從storage側借記憶體,如storage基本沒使用(如沒有cache資料等),execution可以從storage借記憶體甚至全部都借完,即使後續有storage需要用記憶體也不能強制從execution拿回,除非execution後續自己釋放了部分記憶體,storage才能拿來使用;

b) 當storage不夠的時候,如果execution有空閒多餘的記憶體,則也可以借,但是後續如果execution又需要更多記憶體了則可以強制從storage拿回記憶體(如可以將storge的資料寫到磁碟,然後釋放對應的記憶體),直到storage使用的記憶體減少到spark.memory.storageFraction的比例。

Task記憶體管理

一個Executor可以同時併發執行多個Task(通過spark.executor.cores控制),而每個Task在執行的過程中都需要從Executor申請記憶體來使用,那Executor如何將記憶體分配給併發執行的多個Task呢? 這塊留到下一篇文章來介紹。

_