1. 程式人生 > >【Spark篇】---Spark調優之代碼調優,數據本地化調優,內存調優,SparkShuffle調優,Executor的堆外內存調優

【Spark篇】---Spark調優之代碼調優,數據本地化調優,內存調優,SparkShuffle調優,Executor的堆外內存調優

左右 任務調度 combiner flight 觸發 年齡 ans minor 序列化機制

一、前述

Spark中調優大致分為以下幾種 ,代碼調優,數據本地化,內存調優,SparkShuffle調優,調節Executor的堆外內存。

二、具體

1、代碼調優

1、避免創建重復的RDD,盡量使用同一個RDD

2、對多次使用的RDD進行持久化

如何選擇一種最合適的持久化策略?

默認情況下,性能最高的當然是MEMORY_ONLY,但前提是你的內存必須足夠足夠大,可以綽綽有余地存放下整個RDD的所有數據。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的後續算子操作,都是基於純內存中的數據的操作,不需要從磁盤文件中讀取數據,性能也很高;而且不需要復制一份數據副本,並遠程傳送到其他節點上。但是這裏必須要註意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中數據比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM內存溢出異常。

如果使用MEMORY_ONLY級別時發生了內存溢出,那麽建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化後再保存在內存中,此時每個partition僅僅是一個字節數組而已,大大減少了對象數量,並降低了內存占用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是後續算子可以基於純內存進行操作,因此性能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的數據量過多的話,還是可能會導致OOM內存溢出的異常。

如果純內存的級別都無法使用,那麽建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的數據量很大,內存無法完全放下。序列化後的數據比較少,可以節省內存和磁盤的空間開銷。同時該策略會優先盡量嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。

通常不建議使用DISK_ONLY和後綴為_2的級別:因為完全基於磁盤文件進行數據的讀寫,會導致性能急劇降低,有時還不如重新計算一次所有RDD。後綴為_2的級別,必須將所有數據都復制一份副本,並發送到其他節點上,數據復制以及網絡傳輸會導致較大的性能開銷,除非是要求作業的高可用性,否則不建議使用。

持久化算子:

cache:

MEMORY_ONLY

persist:

MEMORY_ONLY

MEMORY_ONLY_SER

MEMORY_AND_DISK_SER

一般不要選擇帶有_2的持久化級別。

checkpoint:

如果一個RDD的計算時間比較長或者計算起來比較復雜,一般將這個RDD的計算結果保存到HDFS上,這樣數據會更加安全。

如果一個RDD的依賴關系非常長,也會使用checkpoint,會切斷依賴關系,提高容錯的效率。

3、盡量避免使用shuffle類的算子

使用廣播變量來模擬使用join,使用情況:一個RDD比較大,一個RDD比較小。

join算子=廣播變量+filter、廣播變量+map、廣播變量+flatMap

4、使用map-side預聚合的shuffle操作

即盡量使用有combiner的shuffle類算子。

combiner概念:

map端,每一個map task計算完畢後進行的局部聚合。

combiner好處:

1) 降低shuffle write寫磁盤的數據量。

2) 降低shuffle read拉取數據量的大小。

3) 降低reduce端聚合的次數。

combiner的shuffle類算子:

1) reduceByKey:這個算子在map端是有combiner的,在一些場景中可以使用reduceByKey代替groupByKey。

2) aggregateByKey

3) combineByKey

5、盡量使用高性能的算子

使用reduceByKey替代groupByKey

使用mapPartition替代map

使用foreachPartition替代foreach

filter後使用coalesce減少分區數

使用repartitionAndSortWithinPartitions替代repartition與sort類操作

使用repartition和coalesce算子操作分區。

6、使用廣播變量

開發過程中,會遇到需要在算子函數中使用外部變量的場景(尤其是大變量,比如100M以上的大集合),那麽此時就應該使用Spark的廣播(Broadcast)功能來提升性能,函數中使用到外部變量時,默認情況 下,Spark會將該變量復制多個副本,通過網絡傳輸到task中,此時每個task都有一個變量副本。如果變量本身比較大的話(比如100M,甚至1G),那麽大量的變量副本在網絡中傳輸的性能開銷,以及在各個節點的Executor中占用過多內存導致的頻繁GC,都會極大地影響性能。如果使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進行廣播。廣播後的變量,會保證每個Executor的內存中,只駐留一份變量副本,而Executor中的task執行時共享該Executor中的那份變量副本。這樣的話,可以大大減少變量副本的數量,從而減少網絡傳輸的性能開銷,並減少對Executor內存的占用開銷,降低GC的頻率。

廣播大變量發送方式:Executor一開始並沒有廣播變量,而是task運行需要用到廣播變量,會找executor的blockManager要,bloackManager找Driver裏面的blockManagerMaster要。

使用廣播變量可以大大降低集群中變量的副本數。不使用廣播變量,變量的副本數和task數一致。使用廣播變量變量的副本和Executor數一致。

7、使用Kryo優化序列化性能

Spark中,主要有三個地方涉及到了序列化:

1) 在算子函數中使用到外部變量時,該變量會被序列化後進行網絡傳輸。

2) 將自定義的類型作為RDD的泛型類型時(比如JavaRDD<SXT>,SXT是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable接口。

3) 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節數組。

4) Task發送時也需要序列化。

Kryo序列化器介紹:

Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10。所以Kryo序列化優化以後,可以讓網絡傳輸的數據變少;在集群中耗費的內存資源大大減少。

對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要註冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。

Spark中使用Kryo:

Sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new Class[]{SpeedSortKey.class})

8、優化數據結構

java中有三種類型比較消耗內存:

1) 對象,每個Java對象都有對象頭、引用等額外的信息,因此比較占用內存空間。

2) 字符串,每個字符串內部都有一個字符數組以及長度等額外信息。

3) 集合類型,比如HashMap、LinkedList等,因為集合類型內部通常會使用一些內部類來封裝集合元素,比如Map.Entry。

因此Spark官方建議,在Spark編碼實現中,特別是對於算子函數中的代碼,盡量不要使用上述三種數據結構,盡量使用字符串替代對象,使用原始類型(比如Int、Long)替代字符串,使用數組替代集合類型,這樣盡可能地減少內存占用,從而降低GC頻率,提升性能。

2、數據本地化

1、數據本地化的級別:

1) PROCESS_LOCAL

task要計算的數據在本進程(Executor)的內存中。

技術分享圖片

2) NODE_LOCAL

① task所計算的數據在本節點所在的磁盤上。

② task所計算的數據在本節點其他Executor進程的內存中。

技術分享圖片

3) NO_PREF

task所計算的數據在關系型數據庫中,如mysql。

技術分享圖片

4) RACK_LOCAL

task所計算的數據在同機架的不同節點的磁盤或者Executor進程的內存中

技術分享圖片

5) ANY

跨機架

2、Spark數據本地化調優:

Spark中任務調度時,TaskScheduler在分發之前需要依據數據的位置來分發,最好將task分發到數據所在的節點上,如果TaskScheduler分發的task在默認3s依然無法執行的話,TaskScheduler會重新發送這個task到相同的Executor中去執行,會重試5次,如果依然無法執行,那麽TaskScheduler會降低一級數據本地化的級別再次發送task。

技術分享圖片

如上圖中,會先嘗試1,PROCESS_LOCAL數據本地化級別,如果重試5次每次等待3s,會默認這個Executor計算資源滿了,那麽會降低一級數據本地化級別到2,NODE_LOCAL,如果還是重試5次每次等待3s還是失敗,那麽還是會降低一級數據本地化級別到3,RACK_LOCAL。這樣數據就會有網絡傳輸,降低了執行效率。

1) 如何提高數據本地化的級別?

可以增加每次發送task的等待時間(默認都是3s),將3s倍數調大, 結合WEBUI來調節:

spark.locality.wait

spark.locality.wait.process

spark.locality.wait.node

spark.locality.wait.rack

註意:等待時間不能調大很大,調整數據本地化的級別不要本末倒置,雖然每一個task的本地化級別是最高了,但整個Application的執行時間反而加長。

2) 如何查看數據本地化的級別?

通過日誌或者WEBUI

3、內存調優

技術分享圖片

JVM堆內存分為一塊較大的Eden和兩塊較小的Survivor,每次只使用Eden和其中一塊Survivor,當回收時將Eden和Survivor中還存活著的對象一次性復制到另外一塊Survivor上,最後清理掉Eden和剛才用過的Survivor。也就是說當task創建出來對象會首先往Eden和survivor1中存放,survivor2是空閑的,當Eden和survivor1區域放滿以後就會觸發minor gc小型垃圾回收,清理掉不再使用的對象。會將存活下來的對象放入survivor2中。

如果存活下來的對象大小大於survivor2的大小,那麽JVM就會將多余的對象直接放入到老年代中。

如果這個時候年輕代的內存不是很大的話,就會經常的進行minor gc,頻繁的minor gc會導致短時間內有些存活的對象(多次垃圾回收都沒有回收掉,一直在用的又不能被釋放,這種對象每經過一次minor gc都存活下來)頻繁的倒來倒去,會導致這些短生命周期的對象(不一定長期使用)每進行一次垃圾回收就會長一歲。年齡過大,默認15歲,垃圾回收還是沒有回收回去就會跑到老年代裏面去了。

這樣會導致在老年代中存放大量的短生命周期的對象,老年代應該存放的是數量比較少並且會長期使用的對象,比如數據庫連接池對象。這樣的話,老年代就會滿溢(full gc 因為本來老年代中的對象很少,很少進行full gc 因此采取了不太復雜但是消耗性能和時間的垃圾回收算法)。不管minor gc 還是 full gc都會導致JVM的工作線程停止。

總結:

堆內存不足造成的影響

1) 頻繁的minor gc。

2) 老年代中大量的短生命周期的對象會導致full gc。

3) gc 多了就會影響Spark的性能和運行的速度。

Spark JVM調優主要是降低gc時間,可以修改Executor內存的比例參數。

RDD緩存、task定義運行的算子函數,可能會創建很多對象,這樣會占用大量的堆內存。堆內存滿了之後會頻繁的GC,如果GC還不能夠滿足內存的需要的話就會報OOM。比如一個task在運行的時候會創建N個對象,這些對象首先要放入到JVM年輕代中。比如在存數據的時候我們使用了foreach來將數據寫入到內存,每條數據都會封裝到一個對象中存入數據庫中,那麽有多少條數據就會在JVM中創建多少個對象。

Spark中如何內存調優?

Spark Executor堆內存中存放(以靜態內存管理為例):RDD的緩存數據和廣播變量(spark.storage.memoryFraction 0.6),shuffle聚合內存(spark.shuffle.memoryFraction 0.2),task的運行(0.2)那麽如何調優呢?

1) 提高Executor總體內存的大小

2) 降低儲存內存比例或者降低聚合內存比例

如何查看gc?

Spark WEBUI中job->stage->task

4、Spark Shuffle調優

spark.shuffle.file.buffer 32k buffer大小 默認是32K maptask端的shuffle 降低磁盤IO .

spark.reducer.MaxSizeFlight 48M shuffle read拉取數據量的大小

spark.shuffle.memoryFraction 0.2 shuffle聚合內存的比例

spark.shuffle.io.maxRetries 3 拉取數據重試次數

spark.shuffle.io.retryWait 5s 調整到重試間隔時間60s

spark.shuffle.manager hash|sort Spark Shuffle的種類

spark.shuffle.consolidateFiles false----針對HashShuffle HashShuffle 合並機制

spark.shuffle.sort.bypassMergeThreshold 200----針對SortShuffle SortShuffle bypass機制 200次

5、調節Executor的堆外內存

原因:

Spark底層shuffle的傳輸方式是使用netty傳輸,netty在進行網絡傳輸的過程會申請堆外內存(netty是零拷貝),所以使用了堆外內存。默認情況下,這個堆外內存上限默認是每一個executor的內存大小的10%;真正處理大數據的時候,這裏都會出現問題,導致spark作業反復崩潰,無法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G。

executor在進行shuffle write,優先從自己本地關聯的mapOutPutWorker中獲取某份數據,如果本地block manager沒有的話,那麽會通過TransferService,去遠程連接其他節點上executor的block manager去獲取,嘗試建立遠程的網絡連接,並且去拉取數據。頻繁創建對象讓JVM堆內存滿溢,進行垃圾回收。正好碰到那個exeuctor的JVM在垃圾回收。處於垃圾回過程中,所有的工作線程全部停止;相當於只要一旦進行垃圾回收,spark / executor停止工作,無法提供響應,spark默認的網絡連接的超時時長是60s;如果卡住60s都無法建立連接的話,那麽這個task就失敗了task失敗了就會出現shuffle file cannot find的錯誤。

解決方法:

1.調節等待時長。

./spark-submit提交任務的腳本裏面添加:

--conf spark.core.connection.ack.wait.timeout=300

Executor由於內存不足或者堆外內存不足了,掛掉了,對應的Executor上面的block manager也掛掉了,找不到對應的shuffle map output文件,Reducer端不能夠拉取數據。

2.調節堆外內存大小

./spark-submit提交任務的腳本裏面添加

yarn下:

--conf spark.yarn.executor.memoryOverhead=2048 單位M

standalone下:

--conf spark.executor.memoryOverhead=2048單位M

【Spark篇】---Spark調優之代碼調優,數據本地化調優,內存調優,SparkShuffle調優,Executor的堆外內存調優