spark on yarn模式下內存資源管理(筆記2)
1.spark 2.2內存占用計算公式
https://blog.csdn.net/lingbo229/article/details/80914283
2.spark on yarn內存分配**
本文轉自:http://blog.javachen.com/2015/06/09/memory-in-spark-on-yarn.html?utm_source=tuicool
此文解決了Spark yarn-cluster模式運行時,內存不足的問題。
Spark yarn-cluster模式運行時,註意yarn.app.mapreduce.am.resource.mb的設置。默認為1G
本文主要了解Spark On YARN部署模式下的內存分配情況,因為沒有深入研究Spark的源代碼,所以只能根據日誌去看相關的源代碼,從而了解“為什麽會這樣,為什麽會那樣”。
說明
按照Spark應用程序中的driver分布方式不同,Spark on YARN有兩種模式: yarn-client
模式、yarn-cluster
模式。
當在YARN上運行Spark作業,每個Spark executor作為一個YARN容器運行。Spark可以使得多個Tasks在同一個容器裏面運行。
下圖是yarn-cluster模式的作業執行圖,圖片來源於網絡:
關於Spark On YARN相關的配置參數,請參考Spark配置參數。本文主要討論內存分配情況,所以只需要關註以下幾個內心相關的參數:
spark.driver.memory
:默認值512mspark.executor.memory
spark.yarn.am.memory
:默認值512mspark.yarn.executor.memoryOverhead
:值為executorMemory * 0.07, with minimum of 384
spark.yarn.driver.memoryOverhead
:值為driverMemory * 0.07, with minimum of 384
spark.yarn.am.memoryOverhead
:值為AM memory * 0.07, with minimum of 384
註意:
--executor-memory/spark.executor.memory
spark.yarn.XXX.memoryOverhead
屬性決定向 YARN 請求的每個 executor 或dirver或am 的額外堆內存大小,默認值為max(384, 0.07 * spark.executor.memory
)- 在 executor 執行的時候配置過大的 memory 經常會導致過長的GC延時,64G是推薦的一個 executor 內存大小的上限。
- HDFS client 在大量並發線程時存在性能問題。大概的估計是每個 executor 中最多5個並行的 task 就可以占滿寫入帶寬。
另外,因為任務是提交到YARN上運行的,所以YARN中有幾個關鍵參數,參考YARN的內存和CPU配置:
yarn.app.mapreduce.am.resource.mb
:AM能夠申請的最大內存,默認值為1536MByarn.nodemanager.resource.memory-mb
:nodemanager能夠申請的最大內存,默認值為8192MByarn.scheduler.minimum-allocation-mb
:調度時一個container能夠申請的最小資源,默認值為1024MByarn.scheduler.maximum-allocation-mb
:調度時一個container能夠申請的最大資源,默認值為8192MB
測試
Spark集群測試環境為:
- master:64G內存,16核cpu
- worker:128G內存,32核cpu
- worker:128G內存,32核cpu
- worker:128G內存,32核cpu
- worker:128G內存,32核cpu
註意:YARN集群部署在Spark集群之上的,每一個worker節點上同時部署了一個NodeManager,並且YARN集群中的配置如下:
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>106496</value> <!-- 104G -->
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>106496</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>2048</value>
</property>
將spark的日誌基本調為DEBUG,並將log4j.logger.org.apache.hadoop設置為WARN建設不必要的輸出,修改/etc/spark/conf/log4j.properties:
# Set everything to be logged to the console
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
接下來是運行測試程序,以官方自帶的SparkPi例子為例,下面主要測試client模式,至於cluster模式請參考下面的過程
。運行下面命令:
spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 4 --driver-memory 2g --executor-memory 3g --executor-cores 4 /usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar 100000
觀察輸出日誌(無關的日誌被略去):
15/06/08 13:57:01 INFO SparkContext: Running Spark version 1.3.0
15/06/08 13:57:02 INFO SecurityManager: Changing view acls to: root
15/06/08 13:57:02 INFO SecurityManager: Changing modify acls to: root
15/06/08 13:57:03 INFO MemoryStore: MemoryStore started with capacity 1060.3 MB
15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: ClientArguments called with: --arg bj03-bi-pro-hdpnamenn:51568 --num-executors 4 --num-executors 4 --executor-memory 3g --executor-memory 3g --executor-cores 4 --executor-cores 4 --name Spark Pi
15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: [actor] handled message (24.52531 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#864850679]
15/06/08 13:57:05 INFO Client: Requesting a new application from cluster with 4 NodeManagers
15/06/08 13:57:05 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (106496 MB per container)
15/06/08 13:57:05 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/06/08 13:57:05 INFO Client: Setting up container launch context for our AM
15/06/08 13:57:07 DEBUG Client: ===============================================================================
15/06/08 13:57:07 DEBUG Client: Yarn AM launch context:
15/06/08 13:57:07 DEBUG Client: user class: N/A
15/06/08 13:57:07 DEBUG Client: env:
15/06/08 13:57:07 DEBUG Client: CLASSPATH -> <CPS>/__spark__.jar<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/*<CPS>$HADOOP_COMMON_HOME/lib/*<CPS>$HADOOP_HDFS_HOME/*<CPS>$HADOOP_HDFS_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/*<CPS>$HADOOP_MAPRED_HOME/lib/*<CPS>$HADOOP_YARN_HOME/*<CPS>$HADOOP_YARN_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*<CPS>:/usr/lib/spark/lib/spark-assembly.jar::/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
15/06/08 13:57:07 DEBUG Client: SPARK_DIST_CLASSPATH -> :/usr/lib/spark/lib/spark-assembly.jar::/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_CACHE_FILES_FILE_SIZES -> 97237208
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_STAGING_DIR -> .sparkStaging/application_1433742899916_0001
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_CACHE_FILES_VISIBILITIES -> PRIVATE
15/06/08 13:57:07 DEBUG Client: SPARK_USER -> root
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_MODE -> true
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_CACHE_FILES_TIME_STAMPS -> 1433743027399
15/06/08 13:57:07 DEBUG Client: SPARK_YARN_CACHE_FILES -> hdfs://mycluster:8020/user/root/.sparkStaging/application_1433742899916_0001/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar#__spark__.jar
15/06/08 13:57:07 DEBUG Client: resources:
15/06/08 13:57:07 DEBUG Client: __spark__.jar -> resource { scheme: "hdfs" host: "mycluster" port: 8020 file: "/user/root/.sparkStaging/application_1433742899916_0001/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar" } size: 97237208 timestamp: 1433743027399 type: FILE visibility: PRIVATE
15/06/08 13:57:07 DEBUG Client: command:
15/06/08 13:57:07 DEBUG Client: /bin/java -server -Xmx512m -Djava.io.tmpdir=/tmp ‘-Dspark.eventLog.enabled=true‘ ‘-Dspark.executor.instances=4‘ ‘-Dspark.executor.memory=3g‘ ‘-Dspark.executor.cores=4‘ ‘-Dspark.driver.port=51568‘ ‘-Dspark.serializer=org.apache.spark.serializer.KryoSerializer‘ ‘-Dspark.driver.appUIAddress=http://bj03-bi-pro-hdpnamenn:4040‘ ‘-Dspark.executor.id=<driver>‘ ‘-Dspark.kryo.classesToRegister=scala.collection.mutable.BitSet,scala.Tuple2,scala.Tuple1,org.apache.spark.mllib.recommendation.Rating‘ ‘-Dspark.driver.maxResultSize=8g‘ ‘-Dspark.jars=file:/usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar‘ ‘-Dspark.driver.memory=2g‘ ‘-Dspark.eventLog.dir=hdfs://mycluster:8020/user/spark/applicationHistory‘ ‘-Dspark.app.name=Spark Pi‘ ‘-Dspark.fileserver.uri=http://X.X.X.X:49172‘ ‘-Dspark.tachyonStore.folderName=spark-81ae0186-8325-40f2-867b-65ee7c922357‘ -Dspark.yarn.app.container.log.dir=<LOG_DIR> org.apache.spark.deploy.yarn.ExecutorLauncher --arg ‘bj03-bi-pro-hdpnamenn:51568‘ --executor-memory 3072m --executor-cores 4 --num-executors 4 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
15/06/08 13:57:07 DEBUG Client: ===============================================================================
從Will allocate AM container, with 896 MB memory including 384 MB overhead
日誌可以看到,AM占用了896 MB
內存,除掉384 MB
的overhead內存,實際上只有512 MB
,即spark.yarn.am.memory
的默認值,另外可以看到YARN集群有4個NodeManager,每個container最多有106496 MB內存。
Yarn AM launch context啟動了一個Java進程,設置的JVM內存為512m
,見/bin/java -server -Xmx512m
。
這裏為什麽會取默認值呢?查看打印上面這行日誌的代碼,見org.apache.spark.deploy.yarn.Client:
private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
logInfo("Verifying our application has not requested more than the maximum " +
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = args.executorMemory + executorMemoryOverhead
if (executorMem > maxMem) {
throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
val amMem = args.amMemory + amMemoryOverhead
if (amMem > maxMem) {
throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
amMem,
amMemoryOverhead))
}
args.amMemory來自ClientArguments類,這個類中會校驗輸出參數:
private def validateArgs(): Unit = {
if (numExecutors <= 0) {
throw new IllegalArgumentException(
"You must specify at least 1 executor!\n" + getUsageMessage())
}
if (executorCores < sparkConf.getInt("spark.task.cpus", 1)) {
throw new SparkException("Executor cores must not be less than " +
"spark.task.cpus.")
}
if (isClusterMode) {
for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
if (sparkConf.contains(key)) {
println(s"$key is set but does not apply in cluster mode.")
}
}
amMemory = driverMemory
amCores = driverCores
} else {
for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
if (sparkConf.contains(key)) {
println(s"$key is set but does not apply in client mode.")
}
}
sparkConf.getOption(amMemKey)
.map(Utils.memoryStringToMb)
.foreach { mem => amMemory = mem }
sparkConf.getOption(amCoresKey)
.map(_.toInt)
.foreach { cores => amCores = cores }
}
}
從上面代碼可以看到當 isClusterMode 為true時,則args.amMemory值為driverMemory的值;否則,則從spark.yarn.am.memory
中取,如果沒有設置該屬性,則取默認值512m。isClusterMode 為true的條件是 userClass 不為空,def isClusterMode: Boolean = userClass != null
,即輸出參數需要有--class
參數,而從下面日誌可以看到ClientArguments的輸出參數中並沒有該參數。
15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: ClientArguments called with: --arg bj03-bi-pro-hdpnamenn:51568 --num-executors 4 --num-executors 4 --executor-memory 3g --executor-memory 3g --executor-cores 4 --executor-cores 4 --name Spark Pi
故,要想設置AM申請的內存值,要麽使用cluster模式,要麽在client模式中,是有--conf
手動設置spark.yarn.am.memory
屬性,例如:
spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 4 --driver-memory 2g --executor-memory 3g --executor-cores 4 --conf spark.yarn.am.memory=1024m /usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar 100000
打開YARN管理界面,可以看到:
a. Spark Pi 應用啟動了5個Container,使用了18G內存、5個CPU core
b. YARN為AM啟動了一個Container,占用內存為2048M
c. YARN啟動了4個Container運行任務,每一個Container占用內存為4096M
為什麽會是2G +4G *4=18G
呢?第一個Container只申請了2G內存,是因為我們的程序只為AM申請了512m內存,而yarn.scheduler.minimum-allocation-mb
參數決定了最少要申請2G內存。至於其余的Container,我們設置了executor-memory內存為3G,為什麽每一個Container占用內存為4096M呢?
為了找出規律,多測試幾組數據,分別測試並收集executor-memory為3G、4G、5G、6G時每個executor對應的Container內存申請情況:
- executor-memory=3g:2G+4G * 4=18G
- executor-memory=4g:2G+6G * 4=26G
- executor-memory=5g:2G+6G * 4=26G
- executor-memory=6g:2G+8G * 4=34G
關於這個問題,我是查看源代碼,根據org.apache.spark.deploy.yarn.ApplicationMaster -> YarnRMClient -> YarnAllocator的類查找路徑找到YarnAllocator中有這樣一段代碼:
// Executor memory in MB.
protected val executorMemory = args.executorMemory
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
// Number of cores per executor.
protected val executorCores = args.executorCores
// Resource capability requested for each executors
private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
因為沒有具體的去看YARN的源代碼,所以這裏猜測Container的大小是根據executorMemory + memoryOverhead
計算出來的,大概的規則是每一個Container的大小必須為yarn.scheduler.minimum-allocation-mb
值的整數倍,當executor-memory=3g
時,executorMemory + memoryOverhead
為3G+384M=3456M,需要申請的Container大小為yarn.scheduler.minimum-allocation-mb
* 2 =4096m=4G,其他依此類推。
註意:
- Yarn always rounds up memory requirement to multiples of
yarn.scheduler.minimum-allocation-mb
, which by default is 1024 or 1GB.- Spark adds an
overhead
toSPARK_EXECUTOR_MEMORY/SPARK_DRIVER_MEMORY
before asking Yarn for the amount.
另外,需要註意memoryOverhead的計算方法,當executorMemory的值很大時,memoryOverhead的值相應會變大,這個時候就不是384m了,相應的Container申請的內存值也變大了,例如:當executorMemory設置為90G時,memoryOverhead值為math.max(0.07 * 90G, 384m)=6.3G
,其對應的Container申請的內存為98G。
回頭看看給AM對應的Container分配2G內存原因,512+384=896,小於2G,故分配2G,你可以在設置spark.yarn.am.memory
的值之後再來觀察。
打開Spark的管理界面 http://ip:4040 ,可以看到driver和Executor中內存的占用情況:
從上圖可以看到Executor占用了1566.7 MB內存,這是怎樣計算出來的?參考Spark on Yarn: Where Have All the Memory Gone?這篇文章,totalExecutorMemory的計算方式為:
//yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
val MEMORY_OVERHEAD_FACTOR = 0.07
val MEMORY_OVERHEAD_MIN = 384
//yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
......
val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
s"memory including $memoryOverhead MB overhead")
這裏我們給executor-memory設置的3G內存,memoryOverhead的值為math.max(0.07 * 3072, 384)=384
,其最大可用內存通過下面代碼來計算:
//core/src/main/scala/org/apache/spark/storage/BlockManager.scala
/** Return the total amount of storage memory available. */
private def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
即,對於executor-memory設置3G時,executor內存占用大約為 3072m * 0.6 * 0.9 = 1658.88m,註意:實際上是應該乘以Runtime.getRuntime.maxMemory
的值,該值小於3072m。
上圖中driver占用了1060.3 MB,此時driver-memory的值是位2G,故driver中存儲內存占用為:2048m * 0.6 * 0.9 =1105.92m,註意:實際上是應該乘以Runtime.getRuntime.maxMemory
的值,該值小於2048m。
這時候,查看worker節點CoarseGrainedExecutorBackend進程啟動腳本:
$ jps
46841 Worker
21894 CoarseGrainedExecutorBackend
9345
21816 ExecutorLauncher
43369
24300 NodeManager
38012 JournalNode
36929 QuorumPeerMain
22909 Jps
$ ps -ef|grep 21894
nobody 21894 21892 99 17:28 ? 00:04:49 /usr/java/jdk1.7.0_71/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms3072m -Xmx3072m -Djava.io.tmpdir=/data/yarn/local/usercache/root/appcache/application_1433742899916_0069/container_1433742899916_0069_01_000003/tmp -Dspark.driver.port=60235 -Dspark.yarn.app.container.log.dir=/data/yarn/logs/application_1433742899916_0069/container_1433742899916_0069_01_000003 org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@bj03-bi-pro-hdpnamenn:60235/user/CoarseGrainedScheduler --executor-id 2 --hostname X.X.X.X --cores 4 --app-id application_1433742899916_0069 --user-class-path file:/data/yarn/local/usercache/root/appcache/application_1433742899916_0069/container_1433742899916_0069_01_000003/__app__.jar
可以看到每個CoarseGrainedExecutorBackend進程分配的內存為3072m,如果我們想查看每個executor的jvm運行情況,可以開啟jmx。在/etc/spark/conf/spark-defaults.conf中添加下面一行代碼:
spark.executor.extraJavaOptions -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
然後,通過jconsole監控jvm堆內存運行情況,這樣方便調試內存大小。
總結
由上可知,在client模式下,AM對應的Container內存由spark.yarn.am.memory
加上spark.yarn.am.memoryOverhead
來確定,executor加上spark.yarn.executor.memoryOverhead
的值之後確定對應Container需要申請的內存大小,driver和executor的內存加上spark.yarn.driver.memoryOverhead
或spark.yarn.executor.memoryOverhead
的值之後再乘以0.54確定storage memory內存大小。在YARN中,Container申請的內存大小必須為yarn.scheduler.minimum-allocation-mb
的整數倍。
下面這張圖展示了Spark on YARN 內存結構,圖片來自How-to: Tune Your Apache Spark Jobs (Part 2):
至於cluster模式下的分析,請參考上面的過程。希望這篇文章對你有所幫助
spark on yarn模式下內存資源管理(筆記2)