SparkSQL執行時引數優化
近期接手了不少大資料表任務排程補資料的工作,補數時發現資源消耗異常的大且執行速度卻不怎麼給力.
發現根本原因在於sparkSQL配置有諸多問題,解決後總結出來就當拋磚引玉了.
-
具體現象
- 記憶體CPU比例失調 一個Spark任務消耗 120(executor)*4G = 480G記憶體僅僅使用120個 core.幾個SprakSQL任務就將整個系統資源吃光.
- 設定超過40個executor,但未指定分割槽數,導致多數executor空閒.
-
原因分析
-
SparkSQL配置時Core與記憶體比例不恰當
-
沒有指定executor核心數
-
未進行其他配置引數優化
-
解決辦法
-
在配置SparkSQL任務時指定executor核心數 建議為4
(同一executor [程序]內記憶體共享,當資料傾斜時,使用相同核心數與記憶體量的兩個任務,executor總量少 的任務不容易OOM,因為單核心最大可用記憶體大.但是並非越大越好,因為單個exector最大core受伺服器剩餘core數量限制,過大的core 數量可能導致資源分配不足) -
設定spark.default.parallelism=600 每個stage的預設task數量
(計算公式為num-executors * executor-cores 系統預設值分割槽為40,這是導致executor並行度上不去的罪魁禍首,之所以這樣計算是為了儘量避免計算最慢的task 決定整個stage的時間,將其設定為總核心的2-3倍,讓執行快的task可以繼續領取任務計算直至全部任務計算完畢) -
開啟spark.sql.auto.repartition=true 自動重新分割槽
(每個stage [階段]執行時分割槽並不盡相同,使用此配置可優化計算後分區數,避免分割槽數過大導致單個分割槽資料量過少,每個task運算分割槽資料時時間過短,從而導致task頻繁排程消耗過多時間) -
設定spark.sql.shuffle.partitions=400 提高shuffle並行度
(shuffle read task的並行度) -
設定spark.shuffle.service.enabled=true 提升shuffle效率 --!並未測試
(Executor 程序除了執行task 也要進行寫shuffle 資料,當Executor程序任務過重時,導致GC不能為其他Executor提供shuffle資料時將會影響效率.此服務開啟時代替Executor來抓取shuffle資料)
-
在配置SparkSQL任務時指定executor核心數 建議為4
前後資源配置對比
型別 | 記憶體數量 | cpu核心數量 | executor數量 | executor記憶體 | 單核心記憶體 |
---|---|---|---|---|---|
系統資源總量 | 7168G | 3500 | - | - | 2G |
目前一個任務 | 480G | 120 | 120 | 4G | 4G |
優化後 | 480G | 240 | 60 | 8G | 2G |
以下為SparkSQL調優相關設定
以下列表中動態資源分配相關不建議使用
//1.下列Hive引數對Spark同樣起作用。
set hive.exec.dynamic.partition=true; // 是否允許動態生成分割槽
set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分割槽全部動態生成
set hive.exec.max.dynamic.partitions = 100; // 動態生成的最多分割槽數
//2.執行行為
set spark.sql.autoBroadcastJoinThreshold; // 大表 JOIN 小表,小表做廣播的閾值
set spark.dynamicAllocation.enabled; // 開啟動態資源分配
set spark.dynamicAllocation.maxExecutors; //開啟動態資源分配後,最多可分配的Executor數
set spark.dynamicAllocation.minExecutors; //開啟動態資源分配後,最少可分配的Executor數
set spark.sql.shuffle.partitions; // 需要shuffle是mapper端寫出的partition個數
set spark.sql.adaptive.enabled; // 是否開啟調整partition功能,如果開啟,spark.sql.shuffle.partitions設定的partition可能會被合併到一個reducer裡執行
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize; //開啟spark.sql.adaptive.enabled後,兩個partition的和低於該閾值會合併到一個reducer
set spark.sql.adaptive.minNumPostShufflePartitions; // 開啟spark.sql.adaptive.enabled後,最小的分割槽數
set spark.Hadoop.mapreduce.input.fileinputformat.split.maxsize; //當幾個stripe的大小大於該值時,會合併到一個task中處理
//3.executor能力
set spark.executor.memory; // executor用於快取資料、程式碼執行的堆記憶體以及JVM執行時需要的記憶體
set spark.yarn.executor.memoryOverhead; //Spark執行還需要一些堆外記憶體,直接向系統申請,如資料傳輸時的netty等。
set spark.sql.windowExec.buffer.spill.threshold; //當用戶的SQL中包含視窗函式時,並不會把一個視窗中的所有資料全部讀進記憶體,而是維護一個快取池,當池中的資料條數大於該引數表示的閾值時,spark將資料寫到磁碟
set spark.executor.cores; //單個executor上可以同時執行的task數
Linux公社的RSS地址 :https://www.linuxidc.com/rssFeed.aspx
本文永久更新連結地址:https://www.linuxidc.com/Linux/2019-03/157881.htm