1. 程式人生 > >【轉載】Apache Spark Jobs 性能調優(二)

【轉載】Apache Spark Jobs 性能調優(二)

放棄 instance bar 並行處理 defaults 執行 .exe nag 原則

調試資源分配

Spark 的用戶郵件郵件列表中經常會出現 “我有一個500個節點的集群,為什麽但是我的應用一次只有兩個 task 在執行”,鑒於 Spark 控制資源使用的參數的數量,這些問題不應該出現。但是在本章中,你將學會壓榨出你集群的每一分資源。推薦的配置將根據不同的集群管理系統(YARN、Mesos、Spark Standalone)而有所不同,我們將主要集中在YARN 上,因為這個Cloudera 推薦的方式。


Spark(以及YARN) 需要關心的兩項主要的資源是 CPU 和 內存, 磁盤 和 IO 當然也影響著 Spark 的性能,但是不管是 Spark 還是 Yarn 目前都沒法對他們做實時有效的管理。

在一個 Spark 應用中,每個 Spark executor 擁有固定個數的 core 以及固定大小的堆大小。core 的個數可以在執行 spark-submit 或者 pyspark 或者 spark-shell 時,通過參數 --executor-cores 指定,或者在 spark-defaults.conf 配置文件或者 SparkConf 對象中設置 spark.executor.cores 參數。同樣地,堆的大小可以通過 --executor-memory 參數或者 spark.executor.memory 配置項。core 配置項控制一個 executor 中task的並發數。 --executor-cores 5 意味著每個 executor 中最多同時可以有5個 task 運行。memory 參數影響 Spark 可以緩存的數據的大小,也就是在 group aggregate 以及 join 操作時 shuffle 的數據結構的最大值。

--num-executors 命令行參數或者spark.executor.instances 配置項控制需要的 executor 個數。從 CDH 5.4/Spark 1.3 開始,你可以避免使用這個參數,只要你通過設置 spark.dynamicAllocation.enabled 參數打開 動態分配 。動態分配可以使的 Spark 的應用在有後續積壓的在等待的 task 時請求 executor,並且在空閑時釋放這些 executor。

同時 Spark 需求的資源如何跟 YARN 中可用的資源配合也是需要著重考慮的,YARN 相關的參數有:

  • yarn.nodemanager.resource.memory-mb 控制在每個節點上 container 能夠使用的最大內存;
  • yarn.nodemanager.resource.cpu-vcores 控制在每個節點上 container 能夠使用的最大core個數;

請求5個 core 會生成向YARN 要5個虛擬core的請求。從YARN 請求內存相對比較復雜因為以下的一些原因:

--executor-memory/spark.executor.memory 控制 executor 的堆的大小,但是 JVM 本身也會占用一定的堆空間,比如內部的 String 或者直接 byte buffer,executor memory 的 spark.yarn.executor.memoryOverhead 屬性決定向YARN 請求的每個 executor 的內存大小,默認值為max(384, 0.7 * spark.executor.memory);
YARN 可能會比請求的內存高一點,YARN 的 yarn.scheduler.minimum-allocation-mb 和 yarn.scheduler.increment-allocation-mb 屬性控制請求的最小值和增加量。
下面展示的是 Spark on YARN 內存結構:

技術分享


如果這些還不夠決定Spark executor 個數,還有一些概念還需要考慮的:

  • 應用的master,是一個非 executor 的容器,它擁有特殊的從YARN 請求資源的能力,它自己本身所占的資源也需要被計算在內。在 yarn-client 模式下,它默認請求 1024MB 和 1個core。在 yarn-cluster 模式中,應用的 master 運行 driver,所以使用參數 --driver-memory 和 --driver-cores 配置它的資源常常很有用。
  • 在 executor 執行的時候配置過大的 memory 經常會導致過長的GC延時,64G是推薦的一個 executor 內存大小的上限。
  • 我們註意到 HDFS client 在大量並發線程是時性能問題。大概的估計是每個 executor 中最多5個並行的 task 就可以占滿的寫入帶寬。
  • 在運行微型 executor 時(比如只有一個core而且只有夠執行一個task的內存)扔掉在一個JVM上同時運行多個task的好處。比如 broadcast 變量需要為每個 executor 復制一遍,這麽多小executor會導致更多的數據拷貝。

為了讓以上的這些更加具體一點,這裏有一個實際使用過的配置的例子,可以完全用滿整個集群的資源。假設一個集群有6個節點有NodeManager在上面運行,每個節點有16個core以及64GB的內存。那麽 NodeManager的容量:yarn.nodemanager.resource.memory-mb 和 yarn.nodemanager.resource.cpu-vcores 可以設為 63 * 1024 = 64512 (MB) 和 15。我們避免使用 100% 的 YARN container 資源因為還要為 OS 和 hadoop 的 Daemon 留一部分資源。在上面的場景中,我們預留了1個core和1G的內存給這些進程。Cloudera Manager 會自動計算並且配置。

所以看起來我們最先想到的配置會是這樣的:--num-executors 6 --executor-cores 15 --executor-memory 63G。但是這個配置可能無法達到我們的需求,因為:
- 63GB+ 的 executor memory 塞不進只有 63GB 容量的 NodeManager;
- 應用的 master 也需要占用一個core,意味著在某個節點上,沒有15個core給 executor 使用;
- 15個core會影響 HDFS IO的吞吐量。
配置成 --num-executors 17 --executor-cores 5 --executor-memory 19G 可能會效果更好,因為:
- 這個配置會在每個節點上生成3個 executor,除了應用的master運行的機器,這臺機器上只會運行2個 executor

- --executor-memory 被分成3份(63G/每個節點3個executor)=21。 21 * (1 - 0.07) ~ 19。

調試並發

我們知道 Spark 是一套數據並行處理的引擎。但是 Spark 並不是神奇得能夠將所有計算並行化,它沒辦法從所有的並行化方案中找出最優的那個。每個 Spark stage 中包含若幹個 task,每個 task 串行地處理數據。在調試 Spark 的job時,task 的個數可能是決定程序性能的最重要的參數。

那麽這個數字是由什麽決定的呢?在之前的博文中介紹了 Spark 如何將 RDD 轉換成一組 stage。task 的個數與 stage 中上一個 RDD 的 partition 個數相同。而一個 RDD 的 partition 個數與被它依賴的 RDD 的 partition 個數相同,除了以下的情況:coalesce transformation 可以創建一個具有更少 partition 個數的 RDD,union transformation 產出的 RDD 的 partition 個數是它父 RDD 的 partition 個數之和,cartesian 返回的 RDD 的 partition 個數是它們的積。

如果一個 RDD 沒有父 RDD 呢? 由 textFile 或者 hadoopFile 生成的 RDD 的 partition 個數由它們底層使用的 MapReduce InputFormat 決定的。一般情況下,每讀到的一個 HDFS block 會生成一個 partition。通過parallelize 接口生成的 RDD 的 partition 個數由用戶指定,如果用戶沒有指定則由參數 spark.default.parallelism 決定。

要想知道 partition 的個數,可以通過接口 rdd.partitions().size() 獲得。

這裏最需要關心的問題在於 task 的個數太小。如果運行時 task 的個數比實際可用的 slot 還少,那麽程序解沒法使用到所有的 CPU 資源。

過少的 task 個數可能會導致在一些聚集操作時, 每個 task 的內存壓力會很大。任何 joincogroup*ByKey 操作都會在內存生成一個 hash-map或者 buffer 用於分組或者排序。joincogroupgroupByKey 會在 shuffle 時在 fetching 端使用這些數據結構,reduceByKeyaggregateByKey 會在 shuffle 時在兩端都會使用這些數據結構。

當需要進行這個聚集操作的 record 不能完全輕易塞進內存中時,一些問題會暴露出來。首先,在內存 hold 大量這些數據結構的 record 會增加 GC的壓力,可能會導致流程停頓下來。其次,如果數據不能完全載入內存,Spark 會將這些數據寫到磁盤,這會引起磁盤 IO和排序。在 Cloudera 的用戶中,這可能是導致 Spark Job 慢的首要原因。

那麽如何增加你的 partition 的個數呢?如果你的問題 stage 是從 Hadoop 讀取數據,你可以做以下的選項:
- 使用 repartition 選項,會引發 shuffle;
- 配置 InputFormat 用戶將文件分得更小;
- 寫入 HDFS 文件時使用更小的block。

如果問題 stage 從其他 stage 中獲得輸入,引發 stage 邊界的操作會接受一個 numPartitions 的參數,比如

<span style="font-family:Microsoft YaHei;"><span style="font-family:Microsoft YaHei;font-size:14px;">val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)</span></span> 

  


X 應該取什麽值?最直接的方法就是做實驗。不停的將 partition 的個數從上次實驗的 partition 個數乘以1.5,直到性能不再提升為止。

同時也有一些原則用於計算 X,但是也不是非常的有效是因為有些參數是很難計算的。這裏寫到不是因為它們很實用,而是可以幫助理解。這裏主要的目標是啟動足夠的 task 可以使得每個 task 接受的數據能夠都塞進它所分配到的內存中。

每個 task 可用的內存通過這個公式計算:spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores 。 memoryFraction 和 safetyFractio 默認值分別 0.2 和 0.8.

在內存中所有 shuffle 數據的大小很難確定。最可行的是找出一個 stage 運行的 Shuffle Spill(memory) 和 Shuffle Spill(Disk) 之間的比例。在用所有shuffle 寫乘以這個比例。但是如果這個 stage 是 reduce 時,可能會有點復雜:

技術分享

在往上增加一點因為大多數情況下 partition 的個數會比較多。

試試在,在有所疑慮的時候,使用更多的 task 數(也就是 partition 數)都會效果更好,這與 MapRecuce 中建議 task 數目選擇盡量保守的建議相反。這個因為 MapReduce 在啟動 task 時相比需要更大的代價。

壓縮你的數據結構

Spark 的數據流由一組 record 構成。一個 record 有兩種表達形式:一種是反序列化的 Java 對象另外一種是序列化的二進制形式。通常情況下,Spark 對內存中的 record 使用反序列化之後的形式,對要存到磁盤上或者需要通過網絡傳輸的 record 使用序列化之後的形式。也有計劃在內存中存儲序列化之後的 record。

spark.serializer 控制這兩種形式之間的轉換的方式。Kryo serializer,org.apache.spark.serializer.KryoSerializer 是推薦的選擇。但不幸的是它不是默認的配置,因為 KryoSerializer 在早期的 Spark 版本中不穩定,而 Spark 不想打破版本的兼容性,所以沒有把 KryoSerializer 作為默認配置,但是 KryoSerializer 應該在任何情況下都是第一的選擇。

你的 record 在這兩種形式切換的頻率對於 Spark 應用的運行效率具有很大的影響。去檢查一下到處傳遞數據的類型,看看能否擠出一點水分是非常值得一試的。

過多的反序列化之後的 record 可能會導致數據到處到磁盤上更加頻繁,也使得能夠 Cache 在內存中的 record 個數減少。點擊這裏查看如何壓縮這些數據。

過多的序列化之後的 record 導致更多的 磁盤和網絡 IO,同樣的也會使得能夠 Cache 在內存中的 record 個數減少,這裏主要的解決方案是把所有的用戶自定義的 class 都通過 SparkConf#registerKryoClasses 的API定義和傳遞的。

數據格式

任何時候你都可以決定你的數據如何保持在磁盤上,使用可擴展的二進制格式比如:Avro,Parquet,Thrift或者Protobuf,從中選擇一種。當人們在談論在Hadoop上使用Avro,Thrift或者Protobuf時,都是認為每個 record 保持成一個 Avro/Thrift/Protobuf 結構保存成 sequence file。而不是JSON。

每次當時試圖使用JSON存儲大量數據時,還是先放棄吧...

【轉載自:http://blog.csdn.net/u012102306/article/details/51700664】

【轉載】Apache Spark Jobs 性能調優(二)