1. 程式人生 > >【Spark調優】Shuffle原理學習與參數調優

【Spark調優】Shuffle原理學習與參數調優

flight 避免 manager 生成文件 tput 順序輸出 磁盤文件 所有 提高

  • 【生產實踐經驗】

  生產實踐中的切身體會是:影響Spark性能的大BOSS就是shuffle,抓住並解決shuffle這個主要原因,事半功倍。

  • 【Shuffle原理學習筆記】

  1.未經優化的HashShuffleManager

註:這是spark1.2版本之前,最早使用的shuffle方法,這種shuffle方法不要使用,只是用來對比改進後的shuffle方法。

技術分享圖片

如上圖,上遊每個task 都輸出下遊task個數的結果文件,下遊每個task去上遊task輸出的結果文件中獲取對應自己的。

問題:

生成文件個數過多,會生成 上遊

task數量 * 下遊task數量 個文件。

對應目前spark的參數:

spark.shuffle.manager=hash

spark.shuffle.consolidateFiles=false

  2.經過優化以後的HashShufferManager

註:不排序的shuffle推薦使用。

技術分享圖片

如上圖,上遊1個Executor所有task順序輸出下遊task個數的結果文件,下遊每個task去上遊task輸出的結果文件中獲取對應自己的。

減少了中間文件輸出,只生成 executor_num * 下遊task數量 個文件。

對應目前spark的參數:

spark.shuffle.manager=hash

spark.shuffle.consolidateFiles=true

  3.1 shuffle且有排序需要的SortShuffleManager---普通運行機制

註:排序的shuffle推薦使用。

技術分享圖片

如上圖,內存結構達到一定閾值時落盤,落盤前在內存中先排序,分批落盤,每批1萬個,最後合並匯總生成成 1個文件+索引文件。

減少了中間文件輸出,只生成 上遊task個數 個文件。

對應目前spark的參數:

spark.shuffle.manager=sort

spark.shuffle.sort.bypassMergeThreshold,默認值200。如果該值配置較小,<= Shuffle read task數量,spark使用該模式

  3.2 shuffle但沒有排序需要的SortShuffleManager---byPass機制

技術分享圖片

如上圖,內存結構達到一定閾值時分批落盤,每批1萬個,最後合並匯總生成成 1個文件+索引文件。

減少了中間文件輸出,只生成 上遊task個數 個文件。

ByPass機制與普通運行模式對比,差別是不排序了,減少了性能損耗。

對應目前spark的參數:

spark.shuffle.manager=sort

spark.shuffle.sort.bypassMergeThreshold,默認值200。如果該值配置較大,> Shuffle read task數量,spark使用該模式

  【shuffle相關參數調優】

spark.shuffle.file.buffer

· 默認值:32KB

· 參數說明:該參數用於設置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數據寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之後,才會溢寫到磁盤。

· 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如64KB),從而減少shuffle write過程中溢寫磁盤文件的次數,也就可以減少磁盤IO次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。

spark.reducer.maxSizeInFlight

· 默認值:48MB

· 參數說明:該參數用於設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了下遊task每次能夠從上遊task生成的結果文件拉取多少數據。

· 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如96MB),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。

spark.shuffle.io.maxRetries

· 默認值:3

· 參數說明:shuffle read task從shuffle write task所在節點拉取屬於自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。

· 調優建議:對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如50次),以避免由於JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

spark.shuffle.io.retryWait

· 默認值:5秒

· 參數說明:具體解釋同上,該參數代表了每次重試拉取數據的等待間隔,默認是5s。

· 調優建議:建議加大間隔時長(比如60秒),以增加shuffle操作的穩定性。

如下這個參數配置,推薦看我的另外一篇博客學習來理解和參數調優 Spark內存模型

spark.memory.useLegacyMode=true(使用spark1.6版本之前配置模式) 、spark.shuffle.memoryFraction

推薦及默認使用spark.memory.useLegacyMode=false(使用spark1.6版本及之後配置模式) 、spark.memory.fraction + spark.memory.storageFraction

·· 參數說明:該參數代表了Executor內存中,分配給shuffle read task進行聚合操作的內存比例,默認是20%。

· 調優建議:如果內存充足,而且很少使用持久化操作,建議調高這個比例,給shuffle read的聚合操作更多內存,以避免由於內存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發現,合理調節該參數可以將性能提升10%左右。

如下幾個配置參數,在【Shuffle原理學習筆記】中已說明過。

spark.shuffle.manager

· 默認值:sort

· 參數說明:該參數用於設置ShuffleManager的類型。Spark 1.5以後,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默認選項,但是Spark 1.2以及之後的版本默認都是SortShuffleManager了。tungsten-sort與sort類似,但是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。

· 調優建議:由於SortShuffleManager默認會對數據進行排序,因此如果業務邏輯中需要該排序機制的話,則使用默認的SortShuffleManager就可以;而如果業務邏輯不需要對數據進行排序,那麽建議參考後面的幾個參數調優,通過bypass機制或優化的HashShuffleManager來避免排序操作,同時提供較好的磁盤讀寫性能。tungsten-sort可能有坑,未使用過。

spark.shuffle.sort.bypassMergeThreshold

· 默認值:200

· 參數說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最後會將每個task產生的所有臨時磁盤文件都合並成一個文件,並會創建單獨的索引文件。

· 調優建議:當使用SortShuffleManager時,如果的確不需要排序操作,那麽建議將這個參數調大一些,大於shuffle read task的數量。那麽此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

· 默認值:false

· 參數說明:如果使用HashShuffleManager,該參數有效。如果設置為true,那麽就會開啟consolidate機制,會大幅度合並shuffle write的輸出文件,對於shuffle read task數量特別多的情況下,這種方法可以極大地減少磁盤IO開銷,提升性能。

· 調優建議:如果的確不需要SortShuffleManager的排序機制,那麽除了使用bypass機制,還可以嘗試將spark.shffle.manager參數手動指定為hash,使用HashShuffleManager,同時開啟consolidate機制。在實踐中嘗試過,發現其性能比開啟了bypass機制的SortShuffleManager要高出10%~30%。

【Spark調優】Shuffle原理學習與參數調優