1. 程式人生 > >Spark常用優化方法

Spark常用優化方法

一、前言

  • 1.為什麼要優化?
    因為你的資源有限、更快速的跑完任務、防止不穩定因素導致的任務失敗。
  • 2.怎樣做優化?
    通常檢視spark的web UI,或者檢視執行中的logs
  • 3.做哪方面的優化?
    spark 應用程式 80% 的優化,都是集中在三個地方:記憶體,磁碟io,網路io

二、調優詳情

1.spark-submit命令中作為引數設定

資源引數設定的不合理,可能會導致沒有充分利用叢集資源,作業執行會極其緩慢;或者設定的資源過大,佇列沒有足夠的資源來提供,進而導致各種異常。

    --num-executors  //該引數用於設定Spark作業總共要用多少個Executor程序來執行
    --executor-memory //該引數用於設定每個Executor程序的記憶體。Executor記憶體的大小,很多時候直接決定了Spark作業的效能,而且跟常見的JVM OOM異常,也有直接的關聯。
    //每個Executor程序的記憶體設定4G~8G較為合適。num-executors乘以executor-memory,就代表了你的Spark作業申請到的總記憶體量(也就是所有Executor程序的記憶體總和)
    --executor-cores  //該引數用於設定每個Executor程序的CPU core數量。這個引數決定了每個Executor程序並行執行task執行緒的能力。同樣建議,如果是跟他人共享這個佇列,那麼num-executors * executor-cores不要超過佇列總CPU core的1/3~1/2左右比較合適
    -- driver-memory  //設定Driver程序的記憶體,預設1G即可
    -- spark.default.parallelism //該引數用於設定每個stage的預設task數量。這個引數極為重要。【如果不去設定這個引數,那麼此時就會導致Spark自己根據底層HDFS的block數量來設定task的數量,預設是一個HDFS block對應一個task。task數量偏少的話,就會導致你前面設定好的Executor的引數都前功盡棄。】
    --spark.storage.memoryFraction //用於設定RDD持久化資料在Executor記憶體中能佔的比例,預設是0.6
   --spark.shuffle.memoryFraction  //設定shuffle過程中一個task拉取到上個stage的task的輸出後,進行聚合操作時能夠使用的Executor記憶體的比例,預設是0.2
./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

1.提高並行度parallelism

如果設定了new SparkConf().set(“spark.default.parallelism”,“5”),所有的RDDpartition都會被設定為 5個,也就是每個RDD的資料都會被分為5分,每個partition都會啟動一個task來進行計算,對於所有的運算元操作,都只會使用5個task在叢集中執行。所以在這個時候,叢集中有10個cpucore,也只會使用5個來進行task,剩餘空閒。造成資源浪費。

面對10個core,我們可以設定10個甚至20、30個task,因為task之間的執行順序和時間是不一樣的,正好10個也會造成浪費。官方建議並行度設定為core數的2~3倍,可以最高效率使用資源。

2. repartition and coalesce

3.使用Kryo序列化機制

spark預設使用了java自身提供的序列化機制,基於ObjectInputStream和OBjectOutputStream的序列化機制,但是效能比較差,速度慢,序列化之後佔用空間依舊高

spark還提供了另外一種序列化機制——Kryo序列化機制,快,結果集小10倍,缺點是有些型別及時實現了Seriralizable介面,也不一定能被序列化。如果想要達到最好的效能,Kryo要求在Spark中對所有需要序列化的型別進行註冊

4.優化資料結構(屬於程式碼內優化)

5.對於多次使用的RDD進行持久化或者Checkpoint

6.設定廣播共享資料

使用Broadcast廣播,讓其在每個節點中一個副本,而不是每個task一個副本。減少節點上的記憶體佔用。
Broadcast廣播節點上使用該資料的時候不需要呼叫RDD,而是呼叫broadcastConf廣播副本,就可以節省記憶體和網路IO

7. 資料本地化(spark的內部優化機制)

資料本地化處理機制(基於資料距離程式碼的距離)
情況由好到壞:

(1)PROCESS_LOCAL:資料和計算它的程式碼在同一個JVM程序中

(2)NODE_LOCAL:資料和程式碼在同一節點上,但是不在一個程序中,比如在不容的executor程序彙總,或者資料在HDFS檔案的block中

(3)NO_PREF:資料從哪裡過來,最終的效能都是一樣的

(4)RACK_LOCAL:資料和程式碼在同一個機架上

(5)ANY:資料可能在任意地方,比如其他網路環境或者其他機架上

8.reduceByKey和groupByKey

一般情況下,reduceByKey的操作都是可以使用groupByKey().map()來進行替代操作的。但是 groupByKey不會進行本地聚合,原封不動的將ShuffleMapTask的輸出拉渠道ResultTask的記憶體中。因為reduceByKey首先會在map端進行本地的combine,可以大大減要傳輸到reduce的資料量,減少網路IO,只有在reduceByKey解決不了類似問題的時候才會使用groupByKey().map()來進行代替。


參考 https://blog.csdn.net/u012102306/article/details/51637366

一、Spark效能優化:開發調優篇
二、Spark效能優化:資源調優篇
三、Spark效能優化:資料傾斜調優
四、Spark效能優化:shuffle調優