1. 程式人生 > >spark 引數調優詳解(持續更新中)

spark 引數調優詳解(持續更新中)

spark引數調優需要對各個引數充分理解,沒有一套可以借鑑的引數,因為每個叢集規模都不一樣,只有理解了引數的用途,調試出符合自己業務場景叢集環境,並且能在擴大叢集、業務的情況下,能夠跟著修改引數。這樣才算是正確的引數調優。

1、背景

使用spark-thriftserver,jdbc連線以執行hive查詢。

spark2.2.1版本,其實官方文件中,相關的引數已經寫的很詳細了,這邊仔細閱讀了官方文件,

並結合了實際情景,實踐之後,整理了這篇部落格。

官方文件:

http://spark.apache.org/docs/2.2.1/configuration.html

yarn:

http://spark.apache.org/docs/2.2.1/running-on-yarn.html#configuration

2、引數詳解

① Application Properties 應用基本屬性

spark.driver.cores  

driver端分配的核數,預設為1,thriftserver是啟動thriftserver服務的機器,資源充足的話可以儘量給多。

spark.driver.memory

driver端分配的記憶體數,預設為1g,同上。

spark.executor.memory

每個executor分配的記憶體數,預設1g,會受到yarn CDH的限制,和memoryOverhead相加 不能超過總記憶體限制。

spark.driver.maxResultSize

driver端接收的最大結果大小,預設1GB,最小1MB,設定0為無限。

這個引數不建議設定的太大,如果要做資料視覺化,更應該控制在20-30MB以內。

過大會導致OOM。

spark.extraListeners

預設none,隨著SparkContext被建立而建立,用於監聽單引數、無引數建構函式的建立,並丟擲異常。

② Runtime Environment 執行環境

主要是一些日誌,jvm引數的額外配置,jars等一些自定義的配置,一般不會用到這一塊。

③ Shuffle Behavior 

spark.reducer.maxSizeInFlight

預設48m。從每個reduce任務同時拉取的最大map數,每個reduce都會在完成任務後,需要一個堆外記憶體的緩衝區來存放結果,如果沒有充裕的記憶體就儘可能把這個調小一點。。相反,堆外記憶體充裕,調大些就能節省gc時間。

spark.reducer.maxBlocksInFlightPerAddress

限制了每個主機每次reduce可以被多少臺遠端主機拉取檔案塊,調低這個引數可以有效減輕node manager的負載。(預設值Int.MaxValue)

spark.reducer.maxReqsInFlight

限制遠端機器拉取本機器檔案塊的請求數,隨著叢集增大,需要對此做出限制。否則可能會使本機負載過大而掛掉。。(預設值為Int.MaxValue)

spark.reducer.maxReqSizeShuffleToMem

shuffle請求的檔案塊大小 超過這個引數值,就會被強行落盤,防止一大堆併發請求把記憶體佔滿。(預設Long.MaxValue)

spark.shuffle.compress

是否壓縮map輸出檔案,預設壓縮 true

spark.shuffle.spill.compress

shuffle過程中溢位的檔案是否壓縮,預設true,使用spark.io.compression.codec壓縮。

spark.shuffle.file.buffer

在記憶體輸出流中 每個shuffle檔案佔用記憶體大小,適當提高 可以減少磁碟讀寫 io次數,初始值為32k

spark.shuffle.memoryFraction

該引數代表了Executor記憶體中,分配給shuffle read task進行聚合操作的記憶體比例,預設是20%。

cache少且記憶體充足時,可以調大該引數,給shuffle read的聚合操作更多記憶體,以避免由於記憶體不足導致聚合過程中頻繁讀寫磁碟。

spark.shuffle.manager

當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小於這個閾值(預設是200),則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫資料,但是最後會將每個task產生的所有臨時磁碟檔案都合併成一個檔案,並會建立單獨的索引檔案。

當使用SortShuffleManager時,如果的確不需要排序操作,那麼建議將這個引數調大一些,大於shuffle read task的數量。那麼此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的效能開銷。但是這種方式下,依然會產生大量的磁碟檔案,因此shuffle write效能有待提高。

spark.shuffle.consolidateFiles

如果使用HashShuffleManager,該引數有效。如果設定為true,那麼就會開啟consolidate機制,會大幅度合併shuffle write的輸出檔案,對於shuffle read task數量特別多的情況下,這種方法可以極大地減少磁碟IO開銷,提升效能。

如果的確不需要SortShuffleManager的排序機制,那麼除了使用bypass機制,還可以嘗試將spark.shuffle.manager引數手動指定為hash,使用HashShuffleManager,同時開啟consolidate機制。

spark.shuffle.io.maxRetries

shuffle read task從shuffle write task所在節點拉取屬於自己的資料時,如果因為網路異常導致拉取失敗,是會自動進行重試的。該引數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。

對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由於JVM的full gc或者網路不穩定等因素導致的資料拉取失敗。在實踐中發現,對於針對超大資料量(數十億~上百億)的shuffle過程,調節該引數可以大幅度提升穩定性。

spark.shuffle.io.retryWait

同上,預設5s,建議加大間隔時長(比如60s),以增加shuffle操作的穩定性。

spark.io.encryption.enabled + spark.io.encryption.keySizeBits + spark.io.encryption.keygen.algorithm

io加密,預設關閉

④ Spark UI

這一塊配置,是有關於spark日誌的。日誌開關,日誌輸出路徑,是否壓縮。

還有一些視覺化介面、埠的配置 

 ⑤ Compression and Serialization

spark.broadcast.compress

廣播變數前是否會先進行壓縮。預設true (spark.io.compression.codec)

spark.io.compression.codec

壓縮RDD資料、日誌、shuffle輸出等的壓縮格式 預設lz4

spark.io.compression.lz4.blockSize

使用lz4壓縮時,每個資料塊大小 預設32k

spark.rdd.compress

rdd是否壓縮 預設false,節省memory_cache大量記憶體 消耗更多的cpu資源(時間)。

spark.serializer.objectStreamReset

當使用JavaSerializer序列化時,會快取物件防止寫多餘的資料,但這些物件就不會被gc,可以輸入reset 清空快取。預設快取100個物件,修改成-1則不快取任何物件。

⑥ Memory Management

spark.memory.fraction

spark.memory.storageFraction

spark.shuffle.service.enabled=true
shuffle service的目的是儲存executor產生的檔案並提供給後續任務使用,從而使得executor可以安全的移除
spark.dynamicAllocation.enabled 資源動態分配,多用多拿,不用還回去
spark.dynamicAllocation.minExecutors 動態最小的executor數量
spark.dynamicAllocation.maxExecutors 動態最大的executor數量
spark.dynamicAllocation.schedulerBacklogTimeout 資源不夠(有task在pending的時候),間隔多久會去申請額外的資源
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 首次之後,資源不夠時,申請資源間隔
spark.dynamicAllocation.executorIdleTimeout 不用資源多少時間,會kill executor
spark.sql.hive.thriftServer.singleSession 可檢視持久化表
spark.yarn.executor.memoryOverhead 每個executor分配的堆外記憶體,解決報錯說將此引數調大。包括了VM開銷,交錯字串,其他開銷等,會隨著executor的大小變化,大小約為它的10%左右。

spark2.3.0+中,分成了2個引數spark.driver.memoryOverhead spark.executor.memoryOverhead


spark.executor.memory 每個executor的執行記憶體,spark2.0之後,動態分配storage和execution,預設切分0.6。加上堆外記憶體,不能大於yarn的container記憶體限制。
spark.yarn.executor.memoryOverhead和spark.executor.memory這兩個是比較重要的引數,配置不好,就很容易導致OOM。

spark.executor.cores 

每個executor核數

spark.driver.extraJavaOptions -Dhdp.version=2.6.0-cdh5.8.3 

配置相應的hadoop版本,不然也會kill executor