1. 程式人生 > >Spark效能相關引數配置詳解

Spark效能相關引數配置詳解

隨著Spark的逐漸成熟完善, 越來越多的可配置引數被新增到Spark中來, 本文試圖通過闡述這其中部分引數的工作原理和配置思路, 和大家一起探討一下如何根據實際場合對Spark進行配置優化。

schedule排程相關

排程相關的引數設定,大多數內容都很直白,其實無須過多的額外解釋,不過基於這些引數的常用性(大概會是你針對自己的叢集第一步就會配置的引數),這裡多少就其內部機制做一些解釋。

spark.cores.max

一個叢集最重要的引數之一,當然就是CPU計算資源的數量。spark.cores.max這個引數決定了在Standalone和Mesos模式下,一個Spark應用程式所能申請的CPU Core的數量。如果你沒有併發跑多個Spark應用程式的需求,那麼可以不需要設定這個引數,預設會使用spark.deploy.defaultCores的值(而spark.deploy.defaultCores的值預設為Int.Max,也就是不限制的意思)從而應用程式可以使用所有當前可以獲得的CPU資源。

針對這個引數需要注意的是,這個引數對Yarn模式不起作用,YARN模式下,資源由Yarn統一排程管理,一個應用啟動時所申請的CPU資源的數量由另外兩個直接配置Executor的數量和每個Executor中core數量的引數決定。(歷史原因造成,不同執行模式下的一些啟動引數個人認為還有待進一步整合)

此外,在Standalone模式等後臺分配CPU資源時,目前的實現中,在spark.cores.max允許的範圍內,基本上是優先從每個Worker中申請所能得到的最大數量的CPU core給每個Executor,因此如果人工限制了所申請的Max Core的數量小於Standalone和Mesos模式所管理的CPU數量,可能發生應用只執行在叢集中部分節點上的情況(因為部分節點所能提供的最大CPU資源數量已經滿足應用的要求),而不是平均分佈在叢集中。通常這不會是太大的問題,但是如果涉及資料本地性的場合,有可能就會帶來一定的必須進行遠端資料讀取的情況發生。理論上,這個問題可以通過兩種途徑解決:一是Standalone和Mesos的資源管理模組自動根據節點資源情況,均勻分配和啟動Executor,二是和Yarn模式一樣,允許使用者指定和限制每個Executor的Core的數量。社群中有一個PR試圖走第二種途徑來解決類似的問題,不過截至我寫下這篇文件為止(2014.8),還沒有被Merge。

spark.task.cpus

這個引數在字面上的意思就是分配給每個任務的CPU的數量,預設為1。實際上,這個引數並不能真的控制每個任務實際執行時所使用的CPU的數量,比如你可以通過在任務內部建立額外的工作執行緒來使用更多的CPU(至少目前為止,將來任務的執行環境是否能通過LXC等技術來控制還不好說)。它所發揮的作用,只是在作業排程時,每分配出一個任務時,對已使用的CPU資源進行計數。也就是說只是理論上用來統計資源的使用情況,便於安排排程。因此,如果你期望通過修改這個引數來加快任務的執行,那還是趕緊換個思路吧。這個引數的意義,個人覺得還是在你真的在任務內部自己通過任何手段,佔用了更多的CPU資源時,讓排程行為更加準確的一個輔助手段。

spark.scheduler.mode

這個引數決定了單個Spark應用內部排程的時候使用FIFO模式還是Fair模式。是的,你沒有看錯,這個引數只管理一個Spark應用內部的多個沒有依賴關係的Job作業的排程策略。

如果你需要的是多個Spark應用之間的排程策略,那麼在Standalone模式下,這取決於每個應用所申請和獲得的CPU資源的數量(暫時沒有獲得資源的應用就Pending在那裡了),基本上就是FIFO形式的,誰先申請和獲得資源,誰就佔用資源直到完成。而在Yarn模式下,則多個Spark應用間的排程策略由Yarn自己的策略配置檔案所決定。

那麼這個內部的排程邏輯有什麼用呢?如果你的Spark應用是通過服務的形式,為多個使用者提交作業的話,那麼可以通過配置Fair模式相關引數來調整不同使用者作業的排程和資源分配優先順序。

spark.locality.wait

spark.locality.wait和spark.locality.wait.process,spark.locality.wait.node,spark.locality.wait.rack這幾個引數影響了任務分配時的本地性策略的相關細節。

Spark中任務的處理需要考慮所涉及的資料的本地性的場合,基本就兩種,一是資料的來源是HadoopRDD;二是RDD的資料來源來自於RDD Cache(即由CacheManager從BlockManager中讀取,或者Streaming資料來源RDD)。其它情況下,如果不涉及shuffle操作的RDD,不構成劃分Stage和Task的基準,不存在判斷Locality本地性的問題,而如果是ShuffleRDD,其本地性始終為No Prefer,因此其實也無所謂Locality。

在理想的情況下,任務當然是分配在可以從本地讀取資料的節點上時(同一個JVM內部或同一臺物理機器內部)的執行時效能最佳。但是每個任務的執行速度無法準確估計,所以很難在事先獲得全域性最優的執行策略,當Spark應用得到一個計算資源的時候,如果沒有可以滿足最佳本地性需求的任務可以執行時,是退而求其次,執行一個本地性條件稍差一點的任務呢,還是繼續等待下一個可用的計算資源已期望它能更好的匹配任務的本地性呢?

這幾個引數一起決定了Spark任務排程在得到分配任務時,選擇暫時不分配任務,而是等待獲得滿足程序內部/節點內部/機架內部這樣的不同層次的本地性資源的最長等待時間。預設都是3000毫秒。

基本上,如果你的任務數量較大和單個任務執行時間比較長的情況下,單個任務是否在資料本地執行,代價區別可能比較顯著,如果資料本地性不理想,那麼調大這些引數對於效能優化可能會有一定的好處。反之如果等待的代價超過帶來的收益,那就不要考慮了。

特別值得注意的是:在處理應用剛啟動後提交的第一批任務時,由於當作業排程模組開始工作時,處理任務的Executors可能還沒有完全註冊完畢,因此一部分的任務會被放置到No Prefer的佇列中,這部分任務的優先順序僅次於資料本地性滿足Process級別的任務,從而被優先分配到非本地節點執行,如果的確沒有Executors在對應的節點上執行,或者的確是No Prefer的任務(如shuffleRDD),這樣做確實是比較優化的選擇,但是這裡的實際情況只是這部分Executors還沒來得及註冊上而已。這種情況下,即使加大本節中這幾個引數的數值也沒有幫助。針對這個情況,有一些已經完成的和正在進行中的PR通過例如動態調整No Prefer佇列,監控節點註冊比例等等方式試圖來給出更加智慧的解決方案。不過,你也可以根據自身叢集的啟動情況,通過在建立SparkContext之後,主動Sleep幾秒的方式來簡單的解決這個問題。

spark.speculation

spark.speculation以及spark.speculation.interval,spark.speculation.quantile, spark.speculation.multiplier等引數調整Speculation行為的具體細節,Speculation是在任務排程的時候,如果沒有適合當前本地性要求的任務可供執行,將跑得慢的任務在空閒計算資源上再度排程的行為,這些引數調整這些行為的頻率和判斷指標,預設是不使用Speculation的。

通常來說很難正確的判斷是否需要Speculation,能真正發揮Speculation用處的場合,往往是某些節點由於執行環境原因,比如CPU資源由於某種原因被佔用,磁碟損壞導致IO緩慢造成任務執行速度異常的情況,當然前提是你的分割槽任務不存在僅能被執行一次,或者不能同時執行多個拷貝等情況。Speculation任務參照的指標通常是其它任務的執行時間,而實際的任務可能由於分割槽資料尺寸不均勻,本來就會有時間差異,加上一定的排程和IO的隨機性,所以如果一致性指標定得過嚴,Speculation可能並不能真的發現問題,反而增加了不必要的任務開銷,定得過寬,大概又基本相當於沒用。

個人覺得,如果你的叢集規模比較大,執行環境複雜,的確可能經常發生執行異常,加上資料分割槽尺寸差異不大,為了程式執行時間的穩定性,那麼可以考慮仔細調整這些引數。否則還是考慮如何排除造成任務執行速度異常的因數比較靠鋪一些。

當然,我沒有實際在很大規模的叢集上執行過Spark,所以如果看法有些偏頗,還請有實際經驗的XD指正。

壓縮和序列化相關

spark.serializer

預設為org.apache.spark.serializer.JavaSerializer,可選org.apache.spark.serializer.KryoSerializer,實際上只要是org.apache.spark.serializer的子類就可以了,不過如果只是應用,大概你不會自己去實現一個的。

序列化對於spark應用的效能來說,還是有很大影響的,在特定的資料格式的情況下,KryoSerializer的效能可以達到JavaSerializer的10倍以上,當然放到整個Spark程式中來考量,比重就沒有那麼大了,但是以Wordcount為例,通常也很容易達到30%以上的效能提升。而對於一些Int之類的基本型別資料,效能的提升就幾乎可以忽略了。KryoSerializer依賴Twitter的Chill庫來實現,相對於JavaSerializer,主要的問題在於不是所有的Java Serializable物件都能支援。

需要注意的是,這裡可配的Serializer針對的物件是Shuffle資料,以及RDD Cache等場合,而Spark Task的序列化是通過spark.closure.serializer來配置,但是目前只支援JavaSerializer,所以等於沒法配置啦。

spark.rdd.compress

這個引數決定了RDD Cache的過程中,RDD資料在序列化之後是否進一步進行壓縮再儲存到記憶體或磁碟上。當然是為了進一步減小Cache資料的尺寸,對於Cache在磁碟上而言,絕對大小大概沒有太大關係,主要是考慮Disk的IO頻寬。而對於Cache在記憶體中,那主要就是考慮尺寸的影響,是否能夠Cache更多的資料,是否能減小Cache資料對GC造成的壓力等。

這兩者,前者通常不會是主要問題,尤其是在RDD Cache本身的目的就是追求速度,減少重算步驟,用IO換CPU的情況下。而後者,GC問題當然是需要考量的,資料量小,佔用空間少,GC的問題大概會減輕,但是是否真的需要走到RDDCache壓縮這一步,或許用其它方式來解決可能更加有效。

所以這個值預設是關閉的,但是如果在磁碟IO的確成為問題或者GC問題真的沒有其它更好的解決辦法的時候,可以考慮啟用RDD壓縮。

spark.broadcast.compress

是否對Broadcast的資料進行壓縮,預設值為True。

Broadcast機制是用來減少執行每個Task時,所需要傳送給TASK的RDD所使用到的相關資料的尺寸,一個Executor只需要在第一個Task啟動時,獲得一份Broadcast資料,之後的Task都從本地的BlockManager中獲取相關資料。在1.1最新版本的程式碼中,RDD本身也改為以Broadcast的形式傳送給Executor(之前的實現RDD本身是隨每個任務傳送的),因此基本上不太需要顯式的決定哪些資料需要broadcast了。

因為Broadcast的資料需要通過網路傳送,而在Executor端又需要儲存在本地BlockMananger中,加上最新的實現,預設RDD通過Boradcast機制傳送,因此大大增加了Broadcast變數的比重,所以通過壓縮減小尺寸,來減少網路傳輸開銷和記憶體佔用,通常都是有利於提高整體效能的。

什麼情況可能不壓縮更好呢,大致上個人覺得同樣還是在網路頻寬和記憶體不是問題的時候,如果Driver端CPU資源很成問題(畢竟壓縮的動作基本都在Driver端執行),那或許有調整的必要。

spark.io.compression.codec

RDD Cache和Shuffle資料壓縮所採用的演算法Codec,預設值曾經是使用LZF作為預設Codec,最近因為LZF的記憶體開銷的問題,預設的Codec已經改為Snappy。

LZF和Snappy相比較,前者壓縮率比較高(當然要看具體資料內容了,通常要高20%左右),但是除了記憶體問題以外,CPU代價也大一些(大概也差20%~50%?)

在用於Shuffle資料的場合下,記憶體方面,應該主要是在使用HashShuffleManager的時候有可能成為問題,因為如果Reduce分割槽數量巨大,需要同時開啟大量的壓縮資料流用於寫檔案,進而在Codec方面需要大量的buffer。但是如果使用SortShuffleManager,由於shuffle檔案數量大大減少,不會產生大量的壓縮資料流,所以記憶體開銷大概不會成為主要問題。

剩下的就是CPU和壓縮率的權衡取捨,和前面一樣,取決於CPU/網路/磁碟的能力和負載,個人認為CPU通常更容易成為瓶頸。所以要調整效能,要不不壓縮,要不使用Snappy可能性大一些?

對於RDD Cache的場合來說,絕大多數場合都是記憶體操作或者本地IO,所以CPU負載的問題可能比IO的問題更加突出,這也是為什麼spark.rdd.compress本身預設為不壓縮,如果要壓縮,大概也是Snappy合適一些?

Storage相關配置引數

spark.local.dir

這個看起來很簡單,就是Spark用於寫中間資料,如RDD Cache,Shuffle,Spill等資料的位置,那麼有什麼可以注意的呢。

首先,最基本的當然是我們可以配置多個路徑(用逗號分隔)到多個磁碟上增加整體IO頻寬,這個大家都知道。

其次,目前的實現中,Spark是通過對檔名採用hash演算法分佈到多個路徑下的目錄中去,如果你的儲存裝置有快有慢,比如SSD+HDD混合使用,那麼你可以通過在SSD上配置更多的目錄路徑來增大它被Spark使用的比例,從而更好地利用SSD的IO頻寬能力。當然這只是一種變通的方法,終極解決方案還是應該像目前HDFS的實現方向一樣,讓Spark能夠感知具體的儲存裝置型別,針對性的使用。

需要注意的是,在Spark 1.0以後,SPARK_LOCAL_DIRS(Standalone, Mesos) or LOCAL_DIRS (YARN)引數會覆蓋這個配置。比如Spark On YARN的時候,Spark Executor的本地路徑依賴於Yarn的配置,而不取決於這個引數。

spark.executor.memory

Executor 記憶體的大小,和效能本身當然並沒有直接的關係,但是幾乎所有執行時效能相關的內容都或多或少間接和記憶體大小相關。這個引數最終會被設定到Executor的JVM的heap尺寸上,對應的就是Xmx和Xms的值

理論上Executor 記憶體當然是多多益善,但是實際受機器配置,以及執行環境,資源共享,JVM GC效率等因素的影響,還是有可能需要為它設定一個合理的大小。多大算合理,要看實際情況

Executor的記憶體基本上是Executor內部所有任務共享的,而每個Executor上可以支援的任務的數量取決於Executor所管理的CPU Core資源的多少,因此你需要了解每個任務的資料規模的大小,從而推算出每個Executor大致需要多少記憶體即可滿足基本的需求。

如何知道每個任務所需記憶體的大小呢,這個很難統一的衡量,因為除了資料集本身的開銷,還包括演算法所需各種臨時記憶體空間的使用,而根據具體的程式碼演算法等不同,臨時記憶體空間的開銷也不同。但是資料集本身的大小,對最終所需記憶體的大小還是有一定的參考意義的。

通常來說每個分割槽的資料集在記憶體中的大小,可能是其在磁碟上源資料大小的若干倍(不考慮源資料壓縮,Java物件相對於原始裸資料也還要算上用於管理資料的資料結構的額外開銷),需要準確的知道大小的話,可以將RDD cache在記憶體中,從BlockManager的Log輸出可以看到每個Cache分割槽的大小(其實也是估算出來的,並不完全準確)

如: BlockManagerInfo: Added rdd_0_1 on disk on sr438:41134(size: 495.3 MB)

反過來說,如果你的Executor的數量和記憶體大小受機器物理配置影響相對固定,那麼你就需要合理規劃每個分割槽任務的資料規模,例如採用更多的分割槽,用增加任務數量(進而需要更多的批次來運算所有的任務)的方式來減小每個任務所需處理的資料大小。

spark.storage.memoryFraction

如前面所說spark.executor.memory決定了每個Executor可用記憶體的大小,而spark.storage.memoryFraction則決定了在這部分記憶體中有多少可以用於Memory Store管理RDD Cache資料,剩下的記憶體用來保證任務執行時各種其它記憶體空間的需要。

spark.executor.memory預設值為0.6,官方文件建議這個比值不要超過JVM Old Gen區域的比值。這也很容易理解,因為RDD Cache資料通常都是長期駐留記憶體的,理論上也就是說最終會被轉移到Old Gen區域(如果該RDD還沒有被刪除的話),如果這部分資料允許的尺寸太大,勢必把Old Gen區域佔滿,造成頻繁的FULL GC。

如何調整這個比值,取決於你的應用對資料的使用模式和資料的規模,粗略的來說,如果頻繁發生Full GC,可以考慮降低這個比值,這樣RDD Cache可用的記憶體空間減少(剩下的部分Cache資料就需要通過Disk Store寫到磁碟上了),會帶來一定的效能損失,但是騰出更多的記憶體空間用於執行任務,減少Full GC發生的次數,反而可能改善程式執行的整體效能

spark.streaming.blockInterval

這個引數用來設定Spark Streaming裡Stream Receiver生成Block的時間間隔,預設為200ms。具體的行為表現是具體的Receiver所接收的資料,每隔這裡設定的時間間隔,就從Buffer中生成一個StreamBlock放進佇列,等待進一步被儲存到BlockManager中供後續計算過程使用。理論上來說,為了每個StreamingBatch間隔裡的資料是均勻的,這個時間間隔當然應該能被Batch的間隔時間長度所整除。總體來說,如果記憶體大小夠用,Streaming的資料來得及處理,這個blockInterval時間間隔的影響不大,當然,如果資料Cache Level是Memory+Ser,即做了序列化處理,那麼BlockInterval的大小會影響序列化後資料塊的大小,對於Java的GC的行為會有一些影響。

此外spark.streaming.blockQueueSize決定了在StreamBlock被儲存到BlockMananger之前,佇列中最多可以容納多少個StreamBlock。預設為10,因為這個佇列Poll的時間間隔是100ms,所以如果CPU不是特別繁忙的話,基本上應該沒有問題。

Shuffle相關

Shuffle操作大概是對Spark效能影響最大的步驟之一(因為可能涉及到排序,磁碟IO,網路IO等眾多CPU或IO密集的操作),這也是為什麼在Spark 1.1的程式碼中對整個Shuffle框架程式碼進行了重構,將Shuffle相關讀寫操作抽象封裝到Pluggable的Shuffle Manager中,便於試驗和實現不同的Shuffle功能模組。例如為了解決Hash Based的Shuffle Manager在檔案讀寫效率方面的問題而實現的Sort Base的Shuffle Manager。

spark.shuffle.manager

用來配置所使用的Shuffle Manager,目前可選的Shuffle Manager包括預設的org.apache.spark.shuffle.sort.HashShuffleManager(配置引數值為hash)和新的org.apache.spark.shuffle.sort.SortShuffleManager(配置引數值為sort)。

這兩個ShuffleManager如何選擇呢,首先需要了解他們在實現方式上的區別。

HashShuffleManager,故名思義也就是在Shuffle的過程中寫資料時不做排序操作,只是將資料根據Hash的結果,將各個Reduce分割槽的資料寫到各自的磁碟檔案中。帶來的問題就是如果Reduce分割槽的數量比較大的話,將會產生大量的磁碟檔案。如果檔案數量特別巨大,對檔案讀寫的效能會帶來比較大的影響,此外由於同時開啟的檔案控制代碼數量眾多,序列化,以及壓縮等操作需要分配的臨時記憶體空間也可能會迅速膨脹到無法接受的地步,對記憶體的使用和GC帶來很大的壓力,在Executor記憶體比較小的情況下尤為突出,例如Spark on Yarn模式。

SortShuffleManager,是1.1版本之後實現的一個試驗性(也就是一些功能和介面還在開發演變中)的ShuffleManager,它在寫入分割槽資料的時候,首先會根據實際情況對資料採用不同的方式進行排序操作,底線是至少按照Reduce分割槽Partition進行排序,這樣來至於同一個Map任務Shuffle到不同的Reduce分割槽中去的所有資料都可以寫入到同一個外部磁碟檔案中去,用簡單的Offset標誌不同Reduce分割槽的資料在這個檔案中的偏移量。這樣一個Map任務就只需要生成一個shuffle檔案,從而避免了上述HashShuffleManager可能遇到的檔案數量巨大的問題

兩者的效能比較,取決於記憶體,排序,檔案操作等因素的綜合影響。

對於不需要進行排序的Shuffle操作來說,如repartition等,如果檔案數量不是特別巨大,HashShuffleManager面臨的記憶體問題不大,而SortShuffleManager需要額外的根據Partition進行排序,顯然HashShuffleManager的效率會更高。

而對於本來就需要在Map端進行排序的Shuffle操作來說,如ReduceByKey等,使用HashShuffleManager雖然在寫資料時不排序,但在其它的步驟中仍然需要排序,而SortShuffleManager則可以將寫資料和排序兩個工作合併在一起執行,因此即使不考慮HashShuffleManager的記憶體使用問題,SortShuffleManager依舊可能更快。

spark.shuffle.sort.bypassMergeThreshold

這個引數僅適用於SortShuffleManager,如前所述,SortShuffleManager在處理不需要排序的Shuffle操作時,由於排序帶來效能的下降。這個引數決定了在這種情況下,當Reduce分割槽的數量小於多少的時候,在SortShuffleManager內部不使用Merge Sort的方式處理資料,而是與Hash Shuffle類似,直接將分割槽檔案寫入單獨的檔案,不同的是,在最後一步還是會將這些檔案合併成一個單獨的檔案。這樣通過去除Sort步驟來加快處理速度,代價是需要併發開啟多個檔案,所以記憶體消耗量增加,本質上是相對HashShuffleMananger一個折衷方案。這個引數的預設值是200個分割槽,如果記憶體GC問題嚴重,可以降低這個值。

spark.shuffle.consolidateFiles

這個配置引數僅適用於HashShuffleMananger的實現,同樣是為了解決生成過多檔案的問題,採用的方式是在不同批次執行的Map任務之間重用Shuffle輸出檔案,也就是說合並的是不同批次的Map任務的輸出資料,但是每個Map任務所需要的檔案還是取決於Reduce分割槽的數量,因此,它並不減少同時開啟的輸出檔案的數量,因此對記憶體使用量的減少並沒有幫助。只是HashShuffleManager裡的一個折中的解決方案。

需要注意的是,這部分的程式碼實現儘管原理上說很簡單,但是涉及到底層具體的檔案系統的實現和限制等因素,例如在併發訪問等方面,需要處理的細節很多,因此一直存在著這樣那樣的bug或者問題,導致在例如EXT3上使用時,特定情況下效能反而可能下降,因此從Spark 0.8的程式碼開始,一直到Spark 1.1的程式碼為止也還沒有被標誌為Stable,不是預設採用的方式。此外因為並不減少同時開啟的輸出檔案的數量,因此對效能具體能帶來多大的改善也取決於具體的檔案數量的情況。所以即使你面臨著Shuffle檔案數量巨大的問題,這個配置引數是否使用,在什麼版本中可以使用,也最好還是實際測試以後再決定。

spark.shuffle.spill

shuffle的過程中,如果涉及到排序,聚合等操作,勢必會需要在記憶體中維護一些資料結構,進而佔用額外的記憶體。如果記憶體不夠用怎麼辦,那只有兩條路可以走,一就是out of memory出錯了,二就是將部分資料臨時寫到外部儲存裝置中去,最後再合併到最終的Shuffle輸出檔案中去。

這裡spark.shuffle.spill決定是否Spill到外部儲存裝置(預設開啟),如果你的記憶體足夠使用,或者資料集足夠小,當然也就不需要Spill,畢竟Spill帶來了額外的磁碟操作。

spark.shuffle.memoryFraction/ spark.shuffle.safetyFraction

在啟用Spill的情況下,spark.shuffle.memoryFraction(1.1後預設為0.2)決定了當Shuffle過程中使用的記憶體達到總記憶體多少比例的時候開始Spill。

通過spark.shuffle.memoryFraction可以調整Spill的觸發條件,即Shuffle佔用記憶體的大小,進而調整Spill的頻率和GC的行為。總的來說,如果Spill太過頻繁,可以適當增加spark.shuffle.memoryFraction的大小,增加用於Shuffle的記憶體,減少Spill的次數。當然這樣一來為了避免記憶體溢位,對應的可能需要減少RDD cache佔用的記憶體,即減小spark.storage.memoryFraction的值,這樣RDD cache的容量減少,有可能帶來效能影響,因此需要綜合考慮。

由於Shuffle資料的大小是估算出來的,一來為了降低開銷,並不是每增加一個數據項都完整的估算一次,二來估算也會有誤差,所以實際暫用的記憶體可能比估算值要大,這裡spark.shuffle.safetyFraction(預設為0.8)用來作為一個保險係數,降低實際Shuffle使用的記憶體閥值,增加一定的緩衝,降低實際記憶體佔用超過使用者配置值的概率。

spark.shuffle.spill.compress/ spark.shuffle.compress

這兩個配置引數都是用來設定Shuffle過程中是否使用壓縮演算法對Shuffle資料進行壓縮,前者針對Spill的中間資料,後者針對最終的shuffle輸出檔案,預設都是True

理論上說,spark.shuffle.compress設定為True通常都是合理的,因為如果使用千兆以下的網絡卡,網路頻寬往往最容易成為瓶頸。此外,目前的Spark任務排程實現中,以Shuffle劃分Stage,下一個Stage的任務是要等待上一個Stage的任務全部完成以後才能開始執行,所以shuffle資料的傳輸和CPU計算任務之間通常不會重疊,這樣Shuffle資料傳輸量的大小和所需的時間就直接影響到了整個任務的完成速度。但是壓縮也是要消耗大量的CPU資源的,所以開啟壓縮選項會增加Map任務的執行時間,因此如果在CPU負載的影響遠大於磁碟和網路頻寬的影響的場合下,也可能將spark.shuffle.compress設定為False才是最佳的方案

對於spark.shuffle.spill.compress而言,情況類似,但是spill資料不會被髮送到網路中,僅僅是臨時寫入本地磁碟,而且在一個任務中同時需要執行壓縮和解壓縮兩個步驟,所以對CPU負載的影響會更大一些,而磁碟頻寬(如果標配12HDD的話)可能往往不會成為Spark應用的主要問題,所以這個引數相對而言,或許更有機會需要設定為False。

總之,Shuffle過程中資料是否應該壓縮,取決於CPU/DISK/NETWORK的實際能力和負載,應該綜合考慮。

原文連結:http://spark-config.readthedocs.io/en/latest/