1. 程式人生 > >Spark調優之Cloudera部落格(Part 2)

Spark調優之Cloudera部落格(Part 2)

概述

Cloudera關於Spark調優方面的第二篇部落格How-to: Tune Your Apache Spark Jobs (Part 2),主要關注resource tuning(資源使用)parallelism(並行度)data representation(資料格式)這三方面。

Tuning Resource Allocation

如何合理使用現有的資源,儘可能的最大化利用,是這小節的重點。

叢集資源管理器主要有YARN、 Mesos、Spark Standalone,下面的介紹基於YARN,關於YARN的介紹及為什麼選擇YARN,參考Apache Spark Resource Management and YARN App Models

,此外,不同的叢集資源管理器,指定資源的引數有差別。

引數設定

這裡的Resource是指core和memory,通過spark-submit引數、spark-defaults.conf配置檔案或環境變數設定,否則使用預設值,引數的優先順序也是上述順序,主要引數如下

spark-submit引數 spark-defaults.conf 環境變數 預設值 含義
- -driver-memory spark.driver.memory SPARK_DRIVER_MEMORY 1G Memory for driver
- -driver-cores spark.driver.cores - 1 Number of cores used by the driver, only in cluster mode
- -executor-memory spark.executor.memory SPARK_EXECUTOR_MEMORY 1G Memory per executor
- -executor-cores spark.executor.cores SPARK_EXECUTOR_CORES 1 Number of cores per executor
- -num-executors spark.executor.instances - 2 Number of executors to launch

Spark已實現動態調整Executor數量,即dynamic allocation,參考Dynamic Resource Allocation

向YARN申請資源

YARN中兩個引數控制core和memory資源,如下

yarn.nodemanager.resource.memory-mb container可以使用的最大記憶體
yarn.nodemanager.resource.cpu-vcores container可以使用的最多core

core的申請簡單,memory的申請複雜一些,複雜的原因主要有兩方面,如下

  • spark.executor.memory/- -executor-memory引數設定了Executor(JVM)的記憶體,但是JVM執行時會用到堆外記憶體,使用spark.yarn.executor.memoryOverhead引數設定,預設大小為max(0.1 * executorMemory, 384)。YARN管理下的Executor記憶體模型如下
  • YARN記憶體最小限制和規整
yarn.scheduler.minimum-allocation-mb 單個任務可申請的最少實體記憶體量,預設是1024(MB),如果一個任務申請的實體記憶體量少於該值,則該對應的值改為這個數
yarn.scheduler.increment-allocation-mb 記憶體規整化單位,預設是1024(MB),這意味著,如果一個Container請求資源是1.5GB,則將被排程器規整化為ceiling(1.5 GB / 1GB) * 1G=2GB

小結

上面介紹了資源的配置及申請,接下來關注涉及到效能的幾個方面

  • ApplicationMaster,YARN啟動的一個程序,用於申請及管理containers,從整體考慮叢集資源時,ApplicationMaster也要考慮在內。yarn-client模式下預設1G、1core,yarn-cluster 模式下,ApplicationMaster執行driver,可以通過- -driver-memory、- -driver-cores引數調整memory和core資源。
  • 單個Executor記憶體設定過大,可能導致嚴重的gc延遲,甚至hang住,64G是一個經驗性的上限值。
  • HDFS client的併發性較差,涉及到HDFS讀取寫入的操作,每個Executor上的task數不易超過5,即- -executor-cores小於等於5。
  • tiny executor(如1core、1G)失去了一個JVM中執行多個任務的優勢,同時,廣播變數也需要被copy更多份。

舉個例子

假設有6個節點執行NodeManager,配置為16core、64G 記憶體,每個節點預留給OS 1core、1G記憶體,剩餘15core、63G,則我們首先想到的配置如下

   --num-executors 6 --executor-cores 15 --executor-memory 63G

但根據小結中的總結,下面的配置可能更好

   --num-executors 17 --executor-cores 5 --executor-memory 19G

Tuning Parallelism

Partition數量是調優Spark程式重要的引數,我的部落格Spark RDD之Partition介紹了相關內容。

task數等於DAG中最後一個RDD的Partition數,最後一個RDD的Partition數等於其父RDD的Partition數,以下三個transformation例外,coalesce(repartition)、union cartesian

coalesce(repartition) 增大或減小Partition數
union 子RDD Partition的數量等於父RDD Partition的和
cartesian 子RDD Partition的數量等於父RDD Partition的積

對於建立RDD的transformation的Partition數,需要根據資料來源具體分析,可以參考Spark RDD之Partition

task數過少,叢集資源(core)利用不充分,也可能導致部分節點記憶體壓力過大,對於task數,或者說Partition數的調整,主要從以下兩方面著手

  • 使用coalesce(repartition),或者能夠觸發Shuffle的transformation中以引數的形式指定,如reduceByKey(_ + _,X)指定Partition數為X。
  • 從資料來源考慮,如hdfs的block數,以及通過資料來源建立RDD時指定,如sc.parallelize(List(“a c”), 2)。

其中,上面提到的reduceByKey(_ + _,X)這種方式,X的值如何確定?

  • 最簡單直接的方式,得到其父RDD的Partition數,每次1.5倍遞增,直至效能沒有提升。
  • 更科學的計算方式,如下
    這裡寫圖片描述

總之,過多的Partition數總是比過少要好(too many partitions is usually better than too few partitions)。

Slimming Down Your Data Structures

Spark記憶體中處理的資料通常是Java物件,持久化到硬碟的是序列化資料,Tungsten專案的出現,使得Spark記憶體中也能夠操作序列化資料,極大的減少了記憶體使用。

spark.serializer引數配置序列化器,預設是JavaSerializerKryoSerializer是更好的選擇,效率高。

Java物件通常要佔用比其儲存的實際資料大很多的空間,這導致Spark能夠cache的資料變少(MEMORY storage level),Spark tuning guide提供了一些措施對其進行瘦身。

臃腫的序列化物件會增加網路磁碟IO,也會導致Spark能夠cache的資料變少(MEMORY_SERstorage level),解決辦法是,使用SparkConf#registerKryoClasses API註冊自定義class。

Data Formats

外部儲存,如HDFS,考慮使用Avro、Parquet、 Thrift、 Protobuf等格式,而不是text,Parquet + LZO或Parquet + snappy是不錯的組合,具體根據自身環境和資料的測試情況決定。

JSON格式是不推薦的,需要消耗CPU一遍又一遍的解析。