1. 程式人生 > >Spark效能調優

Spark效能調優

通常我們對一個系統進行效能優化無怪乎兩個步驟——效能監控和引數調整,本文主要分享的也是這兩方面內容。

效能監控工具

【Spark監控工具】

Spark提供了一些基本的Web監控頁面,對於日常監控十分有用。

1. Application Web UI

http://master:4040(預設埠是4040,可以通過spark.ui.port修改)可獲得這些資訊:(1)stages和tasks排程情況;(2)RDD大小及記憶體使用;(3)系統環境資訊;(4)正在執行的executor資訊。

2. history server

當Spark應用退出後,仍可以獲得歷史Spark應用的stages和tasks執行資訊,便於分析程式不明原因掛掉的情況。配置方法如下:

(1)$SPARK_HOME/conf/spark-env.sh

export SPARK_HISTORY_OPTS="-Dspark.history.retainedApplications=50

Dspark.history.fs.logDirectory=hdfs://hadoop000:8020/directory"

說明:spark.history.retainedApplica-tions僅顯示最近50個應用spark.history.fs.logDirectory:Spark History Server頁面只展示該路徑下的資訊。

(2)$SPARK_HOME/conf/spark-defaults.conf

spark.eventLog.enabled true

spark.eventLog.dir hdfs://hadoop000:8020/directory #應用在執行過程中所有的資訊均記錄在該屬性指定的路徑下

3. spark.eventLog.compress true

(1)HistoryServer啟動

$SPARK_HOMR/bin/start-histrory-server.sh

(2)HistoryServer停止

$SPARK_HOMR/bin/stop-histrory-server.sh

4. ganglia

通過配置ganglia,可以分析叢集的使用狀況和資源瓶頸,但是預設情況下ganglia是未被打包的,需要在mvn編譯時新增-Pspark-ganglia-lgpl,並修改配置檔案$SPARK_HOME/conf/metrics.properties。

5. Executor logs

Standalone模式:$SPARK_HOME/logs

YARN模式:在yarn-site.xml檔案中配置了YARN日誌的存放位置:yarn.nodemanager.log-dirs,或使用命令獲取yarn logs -applicationId。

【其他監控工具】

1. Nmon(http://www.ibm.com/developerworks/aix/library/au-analyze_aix/)

Nmon 輸入:c:CPU n:網路 m:記憶體 d:磁碟

圖1  視覺化的系統資訊

2. Jmeter(http://jmeter. apache.org/)

通常使用Jmeter做系統性能引數的實時展示,JMeter的安裝非常簡單,從官方網站上下載,解壓之後即可使用。執行命令在%JMETER_HOME%/bin下,對於 Windows 使用者,直接使用jmeter.bat。

啟動jmeter:建立測試計劃,設定執行緒組設定迴圈次數。

新增監聽器:[email protected] - PerfMon Metrics Collector。

圖2  jmeter初始化引數配置

設定監聽器:監聽主機埠及監聽內容,例如CPU。

圖3  監聽器設定

啟動監聽:可以實時獲得節點的CPU狀態資訊,從圖4可看出CPU已出現瓶頸。

圖4  CPU狀態顯示

3. Jprofiler(http://www.ej-technologies.com/products/jprofiler/overview.html)

JProfiler是一個全功能的Java剖析工具(profiler),專用於分析J2SE和J2EE應用程式。它把CPU、執行緒和記憶體的剖析組合在一個強大的應用中。JProfiler的GUI可以更方便地找到效能瓶頸、抓住記憶體洩漏(memory leaks),並解決多執行緒的問題。例如分析哪個物件佔用的記憶體比較多;哪個方法佔用較大的CPU資源等;我們通常使用Jprofiler來監控Spark應用在local模式下執行時的效能瓶頸和記憶體洩漏情況。

圖5  Jprofiler儀表盤

上述幾個工具可以直接通過提供的連結瞭解詳細的使用方法。

Spark調優

【Spark叢集並行度】

在Spark叢集環境下,只有足夠高的並行度才能使系統資源得到充分的利用,可以通過修改spark-env.sh來調整Executor的數量和使用資源,Standalone和YARN方式資源的排程管理是不同的。

在Standalone模式下:

1. 每個節點使用的最大記憶體數:SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY;

2. 每個節點的最大併發task數:SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES。

在YARN模式下:

1. 叢集task並行度:SPARK_ EXECUTOR_INSTANCES* SPARK_EXECUTOR_CORES;

2. 叢集記憶體總量:(executor個數) * (SPARK_EXECUTOR_MEMORY+ spark.yarn.executor.memoryOverhead)+(SPARK_DRIVER_MEMORY+spark.yarn.driver.memoryOverhead)。

重點強調:Spark對Executor和Driver額外新增堆記憶體大小,Executor端:由spark.yarn.executor.memoryOverhead設定,預設值executorMemory * 0.07與384的最大值。Driver端:由spark.yarn.driver.memoryOverhead設定,預設值driverMemory * 0.07與384的最大值。

通過調整上述引數,可以提高叢集並行度,讓系統同時執行的任務更多,那麼對於相同的任務,並行度高了,可以減少輪詢次數。舉例說明:如果一個stage有100task,並行度為50,那麼執行完這次任務,需要輪詢兩次才能完成,如果並行度為100,那麼一次就可以了。

但是在資源相同的情況,並行度高了,相應的Executor記憶體就會減少,所以需要根據實際實況協調記憶體和core。此外,Spark能夠非常有效的支援短時間任務(例如:200ms),因為會對所有的任務複用JVM,這樣能減小任務啟動的消耗,Standalone模式下,core可以允許1-2倍於物理core的數量進行超配。

【Spark任務數量調整】

Spark的任務數由stage中的起始的所有RDD的partition之和數量決定,所以需要了解每個RDD的partition的計算方法。以Spark應用從HDFS讀取資料為例,HadoopRDD的partition切分方法完全繼承於MapReduce中的FileInputFormat,具體的partition數量由HDFS的塊大小、mapred.min.split.size的大小、檔案的壓縮方式等多個因素決定,詳情需要參見FileInputFormat的程式碼。

【Spark記憶體調優】

記憶體優化有三個方面的考慮:物件所佔用的記憶體,訪問物件的消耗以及垃圾回收所佔用的開銷。

1. 物件所佔記憶體,優化資料結構

Spark 預設使用Java序列化物件,雖然Java物件的訪問速度更快,但其佔用的空間通常比其內部的屬性資料大2-5倍。為了減少記憶體的使用,減少Java序列化後的額外開銷,下面列舉一些Spark官網(http://spark.apache.org/docs/latest/tuning.html#tuning-data-structures)提供的方法。

(1)使用物件陣列以及原始型別(primitive type)陣列以替代Java或者Scala集合類(collection class)。fastutil 庫為原始資料型別提供了非常方便的集合類,且相容Java標準類庫。

(2)儘可能地避免採用含有指標的巢狀資料結構來儲存小物件。

(3)考慮採用數字ID或者列舉型別以便替代String型別的主鍵。

(4)如果記憶體少於32GB,設定JVM引數-XX:+UseCom­pressedOops以便將8位元組指標修改成4位元組。與此同時,在Java 7或者更高版本,設定JVM引數-XX:+UseC­­­­­ompressedStrings以便採用8位元來編碼每一個ASCII字元。

2. 記憶體回收

(1)獲取記憶體統計資訊:優化記憶體前需要了解叢集的記憶體回收頻率、記憶體回收耗費時間等資訊,可以在spark-env.sh中設定SPARK_JAVA_OPTS=“-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps $ SPARK_JAVA_OPTS”來獲取每一次記憶體回收的資訊。

(2)優化快取大小:預設情況Spark採用執行記憶體(spark.executor.memory)的60%來進行RDD快取。這表明在任務執行期間,有40%的記憶體可以用來進行物件建立。如果任務執行速度變慢且JVM頻繁進行記憶體回收,或者記憶體空間不足,那麼降低快取大小設定可以減少記憶體消耗,可以降低spark.storage.memoryFraction的大小。

3. 頻繁GC或者OOM

針對這種情況,首先要確定現象是發生在Driver端還是在Executor端,然後在分別處理。

Driver端:通常由於計算過大的結果集被回收到Driver端導致,需要調大Driver端的記憶體解決,或者進一步減少結果集的數量。

Executor端:

(1)以外部資料作為輸入的Stage:這類Stage中出現GC通常是因為在Map側進行map-side-combine時,由於group過多引起的。解決方法可以增加partition的數量(即task的數量)來減少每個task要處理的資料,來減少GC的可能性。

(2)以shuffle作為輸入的Stage:這類Stage中出現GC的通常原因也是和shuffle有關,常見原因是某一個或多個group的資料過多,也就是所謂的資料傾斜,最簡單的辦法就是增加shuffle的task數量,比如在SparkSQL中設定SET spark.sql.shuffle.partitions=400,如果調大shuffle的task無法解決問題,說明你的資料傾斜很嚴重,某一個group的資料遠遠大於其他的group,需要你在業務邏輯上進行調整,預先針對較大的group做單獨處理。

【修改序列化】

使用Kryo序列化,因為Kryo序列化結果比Java標準序列化更小,更快速。具體方法:spark-default.conf 裡設定spark.serializer為org.apache.spark.serializer.KryoSerializer 。

參考官方文件(http://spark.apache.org/docs/latest/tuning.html#summary):對於大多數程式而言,採用Kryo框架以及序列化能夠解決效能相關的大部分問題。

【Spark 磁碟調優】

在叢集環境下,如果資料分佈不均勻,造成節點間任務分佈不均勻,也會導致節點間源資料不必要的網路傳輸,從而大大影響系統性能,那麼對於磁碟調優最好先將資料資源分佈均勻。除此之外,還可以對源資料做一定的處理:

1. 在記憶體允許範圍內,將頻繁訪問的檔案或資料置於記憶體中;

2. 如果磁碟充裕,可以適當增加源資料在HDFS上的備份數以減少網路傳輸;

3. Spark支援多種檔案格式及壓縮方式,根據不同的應用環境進行合理的選擇。如果每次計算只需要其中的某幾列,可以使用列式檔案格式,以減少磁碟I/O,常用的列式有parquet、rcfile。如果檔案過大,將原檔案壓縮可以減少磁碟I/O,例如:gzip、snappy、lzo。

【其他】

廣播變數(broadcast)

當task中需要訪問一個Driver端較大的資料時,可以通過使用SparkContext的廣播變數來減小每一個任務的大小以及在叢集中啟動作業的消耗。參考官方文件http://spark.apache.org/docs/latest/tuning.html#broadcasting-large-variables。

開啟推測機制

推測機制後,如果叢集中,某一臺機器的幾個task特別慢,推測機制會將任務分配到其他機器執行,最後Spark會選取最快的作為最終結果。

在spark-default.conf 中新增:spark.speculation true

推測機制與以下幾個引數有關:

1. spark.speculation.interval 100:檢測週期,單位毫秒;

2. spark.speculation.quantile 0.75:完成task的百分比時啟動推測;

3. spark.speculation.multiplier 1.5:比其他的慢多少倍時啟動推測。

總結

Spark系統的效能調優是一個很複雜的過程,需要對Spark以及Hadoop有足夠的知識儲備。從業務應用平臺(Spark)、儲存(HDFS)、作業系統、硬體等多個層面都會對效能產生很大的影響。藉助於多種效能監控工具,我們可以很好地瞭解系統的效能表現,並根據上面介紹的經驗進行調整。

作者簡介:田毅,亞信科技大資料平臺部門研發經理,Spark Contributor,北京Spark Meetup發起人,主要關注SparkSQL與Spark Streaming。

 <IMG style="BORDER-BOTTOM: medium none; BORDER-LEFT: medium none; VERTICAL-ALIGN: middle; BORDER-TOP: medium none; BORDER-RIGHT: medium none" src="http://ipad-cms.csdn.net/cms/attachment/201503A/55093eea24117.jpg" div="" <="">

本文選自程式設計師電子版2015年3月A刊,該期更多文章請檢視這裡。2000年創刊至今所有文章目錄請檢視程式設計師封面秀。歡迎訂閱程式設計師電子版(含iPad版、Android版、PDF版)。

相關推薦

Spark效能調之原理分析

spark效能調優之前先明白原理,具體如下: 使用spark-submit提交一個Spark作業之後,這個作業就會啟動一個對應的Driver程序。根據使用的部署模式(deploy-mode)不同,Driver程序可能在本地啟動,也可能在叢集中某個工作節點上啟動。Driver程序本身會根

Spark效能調---fastutil優化資料格式

Spark中應用fastutil的場景: 1、如果運算元函式使用了外部變數;那麼第一,你可以使用Broadcast廣播變數優化;第二,可以使用Kryo序列化類庫,提升序列化效能和效率;第三,如果外部變數是某種比較大的集合,那麼可以考慮使用fastutil改寫外部變數,首先從源頭上就減少記憶體的佔

spark效能調---Kryo序列化

1.為啥要用Kryo序列化 Spark運算元操作的時候如果用到外部資料的話,都會對外部資料進行序列化,Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,物件輸入輸出流機制,來進行序列化這種預設序列化機制的好處在於,處理起來比較

spark效能調---廣播變數的使用

Broadcast Variables Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it w

Spark效能調 troubleshooting shuffle調 reduce端緩衝大小以避免OOM

reduce導致 記憶體溢位原因 reduce端拉取map端task,是map端寫一點資料,reduce端taskk就會拉取一小部分資料,立即進行後面的聚合、運算元函式應用 每次拉取資料量大小是有buffer決定的,而預設大小是48M,有時候,map端的資料量很大的情況下,reduce端

Spark 效能調 Rdd 之 reduceByKey 本地聚合(也就是map端聚合運算元)

簡單程式碼 val lines = sc.textFile("hdfs://") val words = lines.flatMap(_.split(" ")) val pairs = words.map((_, 1)) val counts = pairs.reduceByKey(_

spark效能調——開發調

開發調優 目錄 開發調優 概述 原則一:避免建立重複的RDD 一個簡單的例子 原則二:儘可能複用同一個RDD 一個簡單的例子 原則三:對多次使用的RDD進行持久化 對多次使用的RDD進行持久化的程式碼示例 Spark的持久化級別 如何選擇一種最合適的持久化

spark效能調:資源優化

在開發完Spark作業之後,就該為作業配置合適的資源了。Spark的資源引數,基本都可以在spark-submit命令中作為引數設定。很多Spark初學者,通常不知道該設定哪些必要的引數,以及如何設定這些引數,最後就只能胡亂設定,甚至壓根兒不設定。資源引數設定的不合理,可能

Spark效能調之廣播變數

廣播變數概述及其優勢廣播變數(groadcast varible)為只讀變數,它有執行SparkContext的driver程式建立後傳送給參與計算的節點。對那些需要讓工作節點高效地訪問相同資料的應用場景,比如機器學習。我們可以在SparkContext上呼叫broadcas

spark效能調:開發調

在大資料計算領域,Spark已經成為了越來越流行、越來越受歡迎的計算平臺之一。Spark的功能涵蓋了大資料領域的離線批處理、SQL類處理、流式/實時計算、機器學習、圖計算等各種不同型別的計算操作,應用範圍與前景非常廣泛。   然而,通過Spark開發出高效能的大資料計算作業

Spark——效能調——Shuffle

一、序引     當以分散式方式處理資料時,常常需要執行map與reduce轉換。由於巨量資料必須從一個節點傳輸到另外的節點,給叢集中的cpu、磁碟、記憶體造成沉重的負載壓力,同時也會給網路頻寬帶來壓力。所以,reduce階段進行的shuffle過程,往往是效

Spark效能調 Shuffle(二)

1.shuffle原理 什麼樣的情況下,會發生shuffle? 在spark中,主要是以下幾個運算元:groupByKey、reduceByKey、countByKey、join,等等。 什麼是shuffle? groupByKey,要把分佈在叢集各個節點上的資料中的同一個key,對

spark效能調(四)調節堆外記憶體和等待時長

調節堆外記憶體!!! executor堆外記憶體 spark底層shuffle使用netty傳輸,所以使用了堆外記憶體!1.2之前是NIO就是socket,之後預設使用netty 有時候,如果你的spark作業處理的資料量特別特別大,幾億資料量;然後spark作業一執行,時

spark效能調(三)shuffle的map端記憶體緩衝reduce端記憶體佔比

效能優化 shuffle spark.shuffle.file.buffer,預設32k spark.shuffle.memoryFraction,0.2 map端記憶體緩衝,reduce端記憶體佔比;很多資料、網上視訊,都會說,這兩個引數, 是調節shuff

spark效能調 —— 為什麼慢的總是“你”

Spark作業效能調優 —— 為什麼慢的總是“你” 背景 業務高峰期,準實時(mini batch)資料處理作業的執行時間現有一些延遲,為了保證作業的SLA,必須及時對作業執行狀況進行排查。 異常原因排查 作業層面 平臺採用的是spark on yar

spark效能調之重構RDD架構,RDD持久化

當第一次對RDD2執行運算元,獲取RDD3的時候,就會從RDD1開始計算,就是讀取HDFS檔案,然後對RDD1執行運算元,獲取到RDD2,然後再計算,得到RDD3 預設情況下,多次對一個RDD執行運算元,去獲取不同的RDD;都會對這個RDD以及之前的父RDD,全部重新計算

Spark效能調

通常我們對一個系統進行效能優化無怪乎兩個步驟——效能監控和引數調整,本文主要分享的也是這兩方面內容。 效能監控工具 【Spark監控工具】 Spark提供了一些基本的Web監控頁面,對於日常監控十分有用。 1. Application Web UI http://master:4040(預設埠是

Spark效能調之廣播大變數

    本篇blog講述在實際spark專案中可能需要注意的一個性能調優的一個點,就是broadcast大變數。    預設的在spark作業中,task執行的運算元中,使用了外部的變數,每個task都會獲取一份變數的副本,有什麼缺點呢?<br>map,本身是不小

Spark效能調-並行度調

效能調優: 並行度調節 效能調優首先是增加資源,增加Application對應的executor的數量,增加executor裡面的cpu core,然後 增加executor裡面的記憶體大小! 這節課也是非常重要的,因為分配完你所能分配的最大資源了!然後對應你的資源調節你程

Spark效能調之——在實際專案中重構RDD架構以及RDD持久化

一、RDD架構重構與優化是什麼。 儘量去複用RDD,差不多的RDD,可以抽取為一個共同的RDD,供後面的RDD計算時,反覆使用。 二、怎麼做? 快取級別: case "NONE" => NONE case "DISK_ONL