1. 程式人生 > >spark性能調優點(逐步完善)

spark性能調優點(逐步完善)

放大 虛擬 垃圾 數據 oca 計算 構建 lis 運行

1、使用高性能序列化類庫
2、優化數據結構
3、對多次使用的RDD進行持久化/CheckPoint
4、使用序列化的持久化級別
5、Java虛擬機垃圾回收調優
降低RDD緩存占用空間的比例:new SparkConf().set("spark.storage.memoryFraction","0.5"),從而提高task使用的內存比例。
6、提高並行度
new SparkConf().set("spark.default.parallelism","5")這個參數一旦設定,每個RDD的數據,都會被拆分為5份,針對RDD的partition,一個partition會自動的來進行計算,所以對於所的算子操作,都會創建5個task在集群中運行。
spark官方推薦,設置集群總cpu的數量的兩到三倍的並行度,每個cpu core可能分配到並發運行2-3個task線程,這樣集群的狀態就不太可能出現空閑的狀態。
小知識:spark會自動設置以文件作為輸入源的RDD的並行度,依據其大小,比如:HDFS,就會給每個block創建一個partition,也依據這個設置並行度,對於reduceByKey等會發生shuffle的操作,就使用並行度最大的父RDD的並行度即可。

7、廣播共享數據
默認情況下,算子函數使用到的外部數據,會被拷貝到時每個task中。如果共享的數據較大,那麽每個task都會把這個較大的數據拷貝至自己的節點上。

8、數據本地化
數據本地化對spark job性能有著巨大的影響。如果數據以及要計算它的代碼是在一起的,那麽性能會非常高。但是,如果數據和計算它的代碼是分開的,那麽其中之一必須到另外一方的機器上。通常來說,移動代碼到其他節點,會比移動數據到代碼所在的節點上去速度要快很多。因為代碼比較小,spark也正是基於這個數據本地化的原則來構建task調度算法的。
本地化,指的是,數據離計算它的代碼有多近,基於數據距離代碼的距離,有幾種數據本地化級別:
1)PROCESS_LOCAL:數據和計算它的代碼在同一個jvm進程中。
2)NODE_LOCA:數據和計算它的代碼在一個節點上,但是不在一個進程中,比如在不同的executor進程中,或者是數據在HDFS文件的block中。
3)NO_PREF:數據從哪裏過來,性能都是一樣的。
4)RACK_LOCAL:數據和計算它的代碼在一個機架上。
5)ANY:數據可能在任意地方,比如其他網絡環境內,或是在其他機架上。
spark處理partition數據時,首先會盡量的使用最好的本地化的級別去啟動task,若啟動的節點一直處於忙碌狀態,spark會待一會,如果executor有空閑資源了,便會啟動task。若等待一會(時間可以通過參數設置),發現沒有executor的core釋放,那麽會放大一個級別去啟動這個Task。
可以設置參數,spark.locality系列參數,來調節spark等待task可以進行數據 本地化的時間。spark.locality.wait(3000ms)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack


9、reduceByKey和groupByKey的合理使用
如果能使用reduceByKey就使用reduceByKey,因為它會在map端先進行本地combine,可以大大減少傳輸reduce端的數據量,減少網絡傳輸的開銷。只有在ReduceByKey處理不了的時候,才會用groupByKey().map()來替代。


10、shuffle調優(重中之重)
spark.shuffle.con.solidateFiles:是否開啟shuffle block file的合並,默認是false
spark.reducer.maxSizeInFlight:reduce task的摘取緩存,默認是48m
spark.shuffle.file.buffer:map task的寫磁盤緩存,默認32K
spark.shuffle.io.maxRetries:拉取失敗的最大重試次數,默認是3次
spark.shuffle.io.retryWait:拉取失敗的重試間隔,默認5s
spark.shuffle.memoryFraction:用於reduce端聚合的內存比例,默認為0.2,超過比例就會溢出到磁盤上。

spark性能調優點(逐步完善)