Memory resource management in Spark on YARN mode (notes 2)

1. Spark 2.2 memory usage calculation formula

https://blog.csdn.net/lingbo229/article/details/80914283
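The linked post covers the unified memory model used by Spark 2.2. As a quick reference, here is a rough sketch of that formula (my own summary, not code from the linked article), assuming the defaults spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5, and 300 MB of reserved memory. Note that the Spark 1.3 logs analysed in the rest of this post still follow the older spark.storage.memoryFraction model.

// Sketch of the Spark 2.x unified memory model (assumed defaults, see lead-in above).
val reservedMb = 300L

// Memory shared between execution and storage (the "unified" region).
def unifiedMemoryMb(heapMb: Long, memoryFraction: Double = 0.6): Long =
  ((heapMb - reservedMb) * memoryFraction).toLong

// Portion of the unified region that storage (cached blocks) can claim by default.
def storageMemoryMb(heapMb: Long, storageFraction: Double = 0.5): Long =
  (unifiedMemoryMb(heapMb) * storageFraction).toLong

// e.g. for --executor-memory 3g: unifiedMemoryMb(3072) == 1663, storageMemoryMb(3072) == 831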

2. Spark on YARN memory allocation

This article is reposted from: http://blog.javachen.com/2015/06/09/memory-in-spark-on-yarn.html?utm_source=tuicool

This article addresses the out-of-memory problem that occurs when running Spark in yarn-cluster mode.

When running in yarn-cluster mode, pay attention to the yarn.app.mapreduce.am.resource.mb setting (default 1536 MB).

This article mainly looks at memory allocation in the Spark on YARN deployment mode. Since I have not studied the Spark source code in depth, I can only follow the logs back to the relevant source to understand why things behave the way they do.

Overview

Depending on where the driver of the Spark application runs, Spark on YARN has two modes: yarn-client mode and yarn-cluster mode.

When a Spark job runs on YARN, each Spark executor runs as a YARN container, and Spark can run multiple tasks inside the same container.

The figure below shows job execution in yarn-cluster mode (image from the web):

[Figure: job execution flow in yarn-cluster mode]

For the Spark on YARN configuration parameters, see the Spark configuration documentation. This article focuses on memory allocation, so we only need to pay attention to the following memory-related parameters:

  • spark.driver.memory: defaults to 512m
  • spark.executor.memory: defaults to 512m
  • spark.yarn.am.memory: defaults to 512m
  • spark.yarn.executor.memoryOverhead: executorMemory * 0.07, with a minimum of 384
  • spark.yarn.driver.memoryOverhead: driverMemory * 0.07, with a minimum of 384
  • spark.yarn.am.memoryOverhead: AM memory * 0.07, with a minimum of 384

Notes:

  • --executor-memory / spark.executor.memory controls the size of the executor heap, but the JVM also needs memory beyond the heap, e.g. for interned Strings and direct byte buffers. The spark.yarn.XXX.memoryOverhead properties determine how much extra memory is requested from YARN for each executor, driver, or AM; the default is max(384, 0.07 * spark.executor.memory). A small sketch of this rule follows the list.
  • Configuring a very large executor heap often leads to long GC pauses; 64 GB is a recommended upper bound for executor memory.
  • The HDFS client has performance problems under a large number of concurrent threads. A rough estimate is that at most five parallel tasks per executor are enough to saturate the write bandwidth.
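A minimal sketch of the overhead rule from the first bullet. The factor 0.07 and the 384 MB floor are the constants quoted from YarnSparkHadoopUtil later in this post; the helper itself is hypothetical, not the actual Spark source.

// Hypothetical helper mirroring the memoryOverhead rule described above.
def memoryOverheadMb(requestedMb: Int,
                     factor: Double = 0.07,   // MEMORY_OVERHEAD_FACTOR
                     minimumMb: Int = 384     // MEMORY_OVERHEAD_MIN
                    ): Int =
  math.max((factor * requestedMb).toInt, minimumMb)

// memoryOverheadMb(3072)  == 384   (7% of 3 GB is only ~215 MB, so the 384 MB floor wins)
// memoryOverheadMb(92160) == 6451  (~6.3 GB for a 90 GB executor, as discussed later)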

In addition, because the job is submitted to YARN, several YARN parameters also matter; see the YARN memory and CPU configuration documentation:

  • yarn.app.mapreduce.am.resource.mb: maximum memory the AM can request, default 1536 MB
  • yarn.nodemanager.resource.memory-mb: maximum memory a NodeManager can offer, default 8192 MB
  • yarn.scheduler.minimum-allocation-mb: minimum resource a container can be allocated by the scheduler, default 1024 MB
  • yarn.scheduler.maximum-allocation-mb: maximum resource a container can be allocated by the scheduler, default 8192 MB

Testing

Spark集群測試環境為:

  • master:64G內存,16核cpu
  • worker:128G內存,32核cpu
  • worker:128G內存,32核cpu
  • worker:128G內存,32核cpu
  • worker:128G內存,32核cpu

Note: the YARN cluster is deployed on the same machines as the Spark cluster; each worker node also runs a NodeManager. The YARN configuration is as follows:

<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>106496</value> <!-- 104 GB -->
</property>
<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>2048</value>
</property>
<property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>106496</value>
</property>
<property>
    <name>yarn.app.mapreduce.am.resource.mb</name>
    <value>2048</value>
</property>

Set Spark's log level to DEBUG, and set log4j.logger.org.apache.hadoop to WARN to cut down on unnecessary output, by editing /etc/spark/conf/log4j.properties:

# Set everything to be logged to the console
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

Next, run a test program, using the bundled SparkPi example. The following mainly tests client mode; the cluster-mode analysis follows the same process. Run this command:

spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn-client \
    --num-executors 4 \
    --driver-memory 2g \
    --executor-memory 3g \
    --executor-cores 4 \
    /usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar \
    100000

Observe the output log (irrelevant lines omitted):

15/06/08 13:57:01 INFO SparkContext: Running Spark version 1.3.0
15/06/08 13:57:02 INFO SecurityManager: Changing view acls to: root
15/06/08 13:57:02 INFO SecurityManager: Changing modify acls to: root

15/06/08 13:57:03 INFO MemoryStore: MemoryStore started with capacity 1060.3 MB

15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: ClientArguments called with: --arg bj03-bi-pro-hdpnamenn:51568 --num-executors 4 --num-executors 4 --executor-memory 3g --executor-memory 3g --executor-cores 4 --executor-cores 4 --name Spark Pi
15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: [actor] handled message (24.52531 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#864850679]
15/06/08 13:57:05 INFO Client: Requesting a new application from cluster with 4 NodeManagers
15/06/08 13:57:05 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (106496 MB per container)
15/06/08 13:57:05 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/06/08 13:57:05 INFO Client: Setting up container launch context for our AM

15/06/08 13:57:07 DEBUG Client: ===============================================================================
15/06/08 13:57:07 DEBUG Client: Yarn AM launch context:
15/06/08 13:57:07 DEBUG Client:     user class: N/A
15/06/08 13:57:07 DEBUG Client:     env:
15/06/08 13:57:07 DEBUG Client:         CLASSPATH -> <CPS>/__spark__.jar<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/*<CPS>$HADOOP_COMMON_HOME/lib/*<CPS>$HADOOP_HDFS_HOME/*<CPS>$HADOOP_HDFS_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/*<CPS>$HADOOP_MAPRED_HOME/lib/*<CPS>$HADOOP_YARN_HOME/*<CPS>$HADOOP_YARN_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*<CPS>:/usr/lib/spark/lib/spark-assembly.jar::/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
15/06/08 13:57:07 DEBUG Client:         SPARK_DIST_CLASSPATH -> :/usr/lib/spark/lib/spark-assembly.jar::/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/*
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_CACHE_FILES_FILE_SIZES -> 97237208
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_STAGING_DIR -> .sparkStaging/application_1433742899916_0001
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_CACHE_FILES_VISIBILITIES -> PRIVATE
15/06/08 13:57:07 DEBUG Client:         SPARK_USER -> root
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_MODE -> true
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_CACHE_FILES_TIME_STAMPS -> 1433743027399
15/06/08 13:57:07 DEBUG Client:         SPARK_YARN_CACHE_FILES -> hdfs://mycluster:8020/user/root/.sparkStaging/application_1433742899916_0001/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar#__spark__.jar
15/06/08 13:57:07 DEBUG Client:     resources:
15/06/08 13:57:07 DEBUG Client:         __spark__.jar -> resource { scheme: "hdfs" host: "mycluster" port: 8020 file: "/user/root/.sparkStaging/application_1433742899916_0001/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar" } size: 97237208 timestamp: 1433743027399 type: FILE visibility: PRIVATE
15/06/08 13:57:07 DEBUG Client:     command:
15/06/08 13:57:07 DEBUG Client:         /bin/java -server -Xmx512m -Djava.io.tmpdir=/tmp '-Dspark.eventLog.enabled=true' '-Dspark.executor.instances=4' '-Dspark.executor.memory=3g' '-Dspark.executor.cores=4' '-Dspark.driver.port=51568' '-Dspark.serializer=org.apache.spark.serializer.KryoSerializer' '-Dspark.driver.appUIAddress=http://bj03-bi-pro-hdpnamenn:4040' '-Dspark.executor.id=<driver>' '-Dspark.kryo.classesToRegister=scala.collection.mutable.BitSet,scala.Tuple2,scala.Tuple1,org.apache.spark.mllib.recommendation.Rating' '-Dspark.driver.maxResultSize=8g' '-Dspark.jars=file:/usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar' '-Dspark.driver.memory=2g' '-Dspark.eventLog.dir=hdfs://mycluster:8020/user/spark/applicationHistory' '-Dspark.app.name=Spark Pi' '-Dspark.fileserver.uri=http://X.X.X.X:49172' '-Dspark.tachyonStore.folderName=spark-81ae0186-8325-40f2-867b-65ee7c922357' -Dspark.yarn.app.container.log.dir=<LOG_DIR> org.apache.spark.deploy.yarn.ExecutorLauncher --arg 'bj03-bi-pro-hdpnamenn:51568' --executor-memory 3072m --executor-cores 4 --num-executors 4 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
15/06/08 13:57:07 DEBUG Client: ===============================================================================

From the line "Will allocate AM container, with 896 MB memory including 384 MB overhead" we can see that the AM container gets 896 MB; after subtracting the 384 MB overhead, only 512 MB remains, which is the default value of spark.yarn.am.memory. The log also shows that the YARN cluster has 4 NodeManagers and that each container may use at most 106496 MB of memory.

The Yarn AM launch context starts a Java process whose JVM heap is set to 512m, see /bin/java -server -Xmx512m.

Why does it fall back to the default value? Look at the code that prints the log line above, in org.apache.spark.deploy.yarn.Client:

  private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
    logInfo("Verifying our application has not requested more than the maximum " +
      s"memory capability of the cluster ($maxMem MB per container)")
    val executorMem = args.executorMemory + executorMemoryOverhead
    if (executorMem > maxMem) {
      throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
        s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
    }
    val amMem = args.amMemory + amMemoryOverhead
    if (amMem > maxMem) {
      throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
    }
    logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
      amMem,
      amMemoryOverhead))
  }

args.amMemory comes from the ClientArguments class, which validates the input arguments:

  private def validateArgs(): Unit = {
    if (numExecutors <= 0) {
      throw new IllegalArgumentException(
        "You must specify at least 1 executor!\n" + getUsageMessage())
    }
    if (executorCores < sparkConf.getInt("spark.task.cpus", 1)) {
      throw new SparkException("Executor cores must not be less than " +
        "spark.task.cpus.")
    }
    if (isClusterMode) {
      for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
        if (sparkConf.contains(key)) {
          println(s"$key is set but does not apply in cluster mode.")
        }
      }
      amMemory = driverMemory
      amCores = driverCores
    } else {
      for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
        if (sparkConf.contains(key)) {
          println(s"$key is set but does not apply in client mode.")
        }
      }
      sparkConf.getOption(amMemKey)
        .map(Utils.memoryStringToMb)
        .foreach { mem => amMemory = mem }
      sparkConf.getOption(amCoresKey)
        .map(_.toInt)
        .foreach { cores => amCores = cores }
    }
  }

From the code above, when isClusterMode is true, args.amMemory takes the value of driverMemory; otherwise it is read from spark.yarn.am.memory, falling back to the default 512m when that property is not set. isClusterMode is true only when userClass is non-null (def isClusterMode: Boolean = userClass != null), i.e. the arguments must include --class, and the log below shows that ClientArguments was called without that argument.

15/06/08 13:57:04 DEBUG YarnClientSchedulerBackend: ClientArguments called with: --arg bj03-bi-pro-hdpnamenn:51568 --num-executors 4 --num-executors 4 --executor-memory 3g --executor-memory 3g --executor-cores 4 --executor-cores 4 --name Spark Pi

So, to control how much memory the AM requests, either use cluster mode, or in client mode set the spark.yarn.am.memory property explicitly with --conf, for example:

spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn-client \
    --num-executors 4 \
    --driver-memory 2g \
    --executor-memory 3g \
    --executor-cores 4 \
    --conf spark.yarn.am.memory=1024m \
    /usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar \
    100000

Open the YARN web UI and you can see:

a. The Spark Pi application started 5 containers, using 18 GB of memory and 5 CPU cores

b. YARN started one container for the AM, using 2048 MB of memory

c. YARN started 4 containers to run tasks, each using 4096 MB of memory

Why is it 2G + 4G * 4 = 18G? The first container requested only 2 GB because our program asked for only 512m for the AM, and yarn.scheduler.minimum-allocation-mb forces a minimum request of 2 GB. As for the remaining containers, we set executor-memory to 3 GB, so why does each container take 4096 MB?

To find the pattern, I ran several more tests, recording the container memory requested per executor with executor-memory set to 3G, 4G, 5G, and 6G:

  • executor-memory=3g: 2G + 4G * 4 = 18G
  • executor-memory=4g: 2G + 6G * 4 = 26G
  • executor-memory=5g: 2G + 6G * 4 = 26G
  • executor-memory=6g: 2G + 8G * 4 = 34G

To answer this I went back to the source code, following the class path org.apache.spark.deploy.yarn.ApplicationMaster -> YarnRMClient -> YarnAllocator, and found this code in YarnAllocator:

  // Executor memory in MB.
  protected val executorMemory = args.executorMemory
  // Additional memory overhead.
  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
  // Number of cores per executor.
  protected val executorCores = args.executorCores
  // Resource capability requested for each executors
  private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)

Since I have not looked at the YARN source in detail, my guess is that the container size is derived from executorMemory + memoryOverhead, with the rough rule that each container must be an integer multiple of yarn.scheduler.minimum-allocation-mb. With executor-memory=3g, executorMemory + memoryOverhead is 3G + 384M = 3456M, so the container request becomes yarn.scheduler.minimum-allocation-mb * 2 = 4096m = 4G; the other cases follow the same rule (a small sketch reproducing it follows the note below).

Note:

  • Yarn always rounds up memory requirement to multiples of yarn.scheduler.minimum-allocation-mb, which by default is 1024 or 1GB.
  • Spark adds an overhead to SPARK_EXECUTOR_MEMORY/SPARK_DRIVER_MEMORY before asking Yarn for the amount.
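Putting the two notes together, the observed container sizes can be reproduced with the small sketch below. This is a rule inferred from the measurements above, not YARN source code.

// Hypothetical reproduction of the observed sizing rule: round the request up to the
// next multiple of yarn.scheduler.minimum-allocation-mb (2048 MB in this cluster).
def containerSizeMb(requestedMb: Int, minAllocationMb: Int = 2048): Int = {
  val units = math.ceil(requestedMb.toDouble / minAllocationMb).toInt
  units * minAllocationMb
}

// executor-memory=3g: containerSizeMb(3072 + 384) == 4096  -> 2G + 4G * 4 = 18G
// executor-memory=4g: containerSizeMb(4096 + 384) == 6144  -> 2G + 6G * 4 = 26G
// executor-memory=5g: containerSizeMb(5120 + 384) == 6144  -> 2G + 6G * 4 = 26G
// executor-memory=6g: containerSizeMb(6144 + 430) == 8192  -> 2G + 8G * 4 = 34G
// AM in client mode:  containerSizeMb(512 + 384)  == 2048  -> the 2 GB AM container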

Also note how memoryOverhead is computed: when executorMemory is large, memoryOverhead grows with it and is no longer 384m, so the container request grows accordingly. For example, with executorMemory set to 90G, memoryOverhead is math.max(0.07 * 90G, 384m) = 6.3G, and the corresponding container request is 98G.

Coming back to why the AM container was allocated 2 GB: 512 + 384 = 896, which is less than 2 GB, so 2 GB is allocated. You can set spark.yarn.am.memory to a different value and observe again.

Open the Spark web UI at http://ip:4040 and you can see the memory usage of the driver and the executors:

[Screenshot: Spark UI Executors page showing driver and executor storage memory]

The screenshot above shows each executor using 1566.7 MB of memory. How is that number computed? Referring to the article Spark on Yarn: Where Have All the Memory Gone?, totalExecutorMemory is computed as:

//yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
  val MEMORY_OVERHEAD_FACTOR = 0.07
  val MEMORY_OVERHEAD_MIN = 384

//yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
......
      val totalExecutorMemory = executorMemory + memoryOverhead
      numPendingAllocate.addAndGet(missing)
      logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
        s"memory including $memoryOverhead MB overhead")

Here we set executor-memory to 3 GB, so memoryOverhead is math.max(0.07 * 3072, 384) = 384, and the maximum available storage memory is computed by the following code:

//core/src/main/scala/org/apache/spark/storage/BlockManager.scala
/** Return the total amount of storage memory available. */
private def getMaxMemory(conf: SparkConf): Long = {
  val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
  val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
  (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}

That is, with executor-memory set to 3 GB, the executor's storage memory is roughly 3072m * 0.6 * 0.9 = 1658.88m. Note that strictly speaking the fractions are applied to Runtime.getRuntime.maxMemory, which is somewhat less than 3072m, hence the smaller number in the UI.

In the screenshot the driver uses 1060.3 MB; driver-memory here is 2 GB, so the driver's storage memory is roughly 2048m * 0.6 * 0.9 = 1105.92m. Again, the fractions are really applied to Runtime.getRuntime.maxMemory, which is less than 2048m.
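As a quick cross-check, the storage-memory figures in the UI can be approximated with the helper below. This is an estimate only: it plugs in the configured heap size where the real code uses Runtime.getRuntime.maxMemory, so it comes out slightly higher than what the UI reports.

// Rough estimate of the legacy (Spark 1.x) storage memory shown in the UI.
def storageMemoryEstimateMb(heapMb: Long,
                            memoryFraction: Double = 0.6,  // spark.storage.memoryFraction
                            safetyFraction: Double = 0.9   // spark.storage.safetyFraction
                           ): Double =
  heapMb * memoryFraction * safetyFraction

// storageMemoryEstimateMb(3072) == 1658.88  (UI shows 1566.7 MB for executor-memory=3g)
// storageMemoryEstimateMb(2048) == 1105.92  (UI shows 1060.3 MB for driver-memory=2g)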

Next, look at how the CoarseGrainedExecutorBackend process was launched on a worker node:

$ jps
46841 Worker
21894 CoarseGrainedExecutorBackend
9345
21816 ExecutorLauncher
43369
24300 NodeManager
38012 JournalNode
36929 QuorumPeerMain
22909 Jps

$ ps -ef|grep 21894
nobody   21894 21892 99 17:28 ?        00:04:49 /usr/java/jdk1.7.0_71/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms3072m -Xmx3072m  -Djava.io.tmpdir=/data/yarn/local/usercache/root/appcache/application_1433742899916_0069/container_1433742899916_0069_01_000003/tmp -Dspark.driver.port=60235 -Dspark.yarn.app.container.log.dir=/data/yarn/logs/application_1433742899916_0069/container_1433742899916_0069_01_000003 org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@bj03-bi-pro-hdpnamenn:60235/user/CoarseGrainedScheduler --executor-id 2 --hostname X.X.X.X --cores 4 --app-id application_1433742899916_0069 --user-class-path file:/data/yarn/local/usercache/root/appcache/application_1433742899916_0069/container_1433742899916_0069_01_000003/__app__.jar

Each CoarseGrainedExecutorBackend process is given 3072m of memory (-Xms3072m -Xmx3072m). If we want to inspect the JVM of each executor, we can enable JMX by adding the following line to /etc/spark/conf/spark-defaults.conf:

spark.executor.extraJavaOptions -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false

Then connect jconsole to the worker node on port 1099 to monitor the JVM heap, which makes it easier to tune the memory settings.

Summary

To sum up: in client mode, the AM container size is determined by spark.yarn.am.memory plus spark.yarn.am.memoryOverhead; the memory each executor container requests is spark.executor.memory plus spark.yarn.executor.memoryOverhead; and the storage memory reported for the driver and executors is roughly the driver or executor heap multiplied by 0.54 (0.6 * 0.9). In YARN, the memory a container requests is rounded up to an integer multiple of yarn.scheduler.minimum-allocation-mb.

The figure below shows the Spark on YARN memory layout; the image comes from How-to: Tune Your Apache Spark Jobs (Part 2):

[Figure: Spark on YARN memory layout, from the Cloudera blog post]

The analysis for cluster mode follows the same process as above. I hope this article is helpful.
