1. 程式人生 > >spark sql 內建配置(V2.2)

spark sql 內建配置(V2.2)

最近整理了一下spark SQL內建配。加粗配置項是對sparkSQL 調優效能影響比較大的項,小夥伴們按需酌情配置。後續會挑出一些通用調優配置,共大家參考。有不正確的地方,歡迎大家在留言區留言討論。 

配置項 預設值 概述
spark.sql.optimizer.maxIterations 100 sql優化器最大迭代次數
spark.sql.optimizer.inSetConversionThreshold 10 插入轉換的集合大小閾值
spark.sql.inMemoryColumnarStorage.compressed TRUE 當設定為true時,SCAPK SQL將根據資料的統計自動為每個列選擇壓縮編解碼器
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制用於列快取的批處理的大小。較大的批處理大小可以提高記憶體利用率和壓縮率,但快取資料時會出現OOM風險
spark.sql.inMemoryColumnarStorage.partitionPruning  TRUE 啟用記憶體中的列表分割槽剪枝
spark.sql.join.preferSortMergeJoin TRUE When true, 使用sort merge join 代替 shuffle hash join
spark.sql.sort.enableRadixSort
TRUE 使用基數排序,基數排序效能非常快,但是會額外使用over heap.當排序比較小的Row時,overheap 需要提高50%
spark.sql.autoBroadcastJoinThreshold 10L * 1024 * 1024 當執行join時,被廣播到worker節點上表最大位元組。當被設定為-1,則禁用廣播。當前僅僅支援 Hive Metastore tables,表大小的統計直接基於hive表的原始檔大小
spark.sql.limit.scaleUpFactor 4 在執行查詢時,兩次嘗試之間讀取partation數目的增量。較高的值會導致讀取過多分割槽,較少的值會導致執行時間過長,因為浙江執行更多的作業
spark.sql.statistics.fallBackToHdfs FALSE 當不能從table metadata中獲取表的統計資訊,返回到hdfs。這否有用取決與表是否足夠小到能夠使用auto broadcast joins
spark.sql.defaultSizeInBytes Long.MaxValue 在查詢計劃中表預設大小,預設被設定成Long.MaxValue 大於spark.sql.autoBroadcastJoinThreshold的值,也就意味著預設情況下不會廣播一個表,除非他足夠小
spark.sql.shuffle.partitions 200 當為join/aggregation  shuffle資料時,預設partition的數量
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 64 * 1024 * 1024byte The target post-shuffle input size in bytes of a task.
spark.sql.adaptive.enabled FALSE 是否開啟adaptive query execution(自適應查詢執行)
spark.sql.adaptive.minNumPostShufflePartitions -1 測試用
spark.sql.subexpressionElimination.enabled TRUE When true, common subexpressions will be eliminated
當為真時,將刪除公共子表示式
spark.sql.caseSensitive FALSE 查詢分析器是否區分大小寫,預設情況下不區分。強烈建議不區分大小寫
spark.sql.constraintPropagation.enabled 是否開啟優化,在查詢優化器期間推斷和傳播查詢計劃中的資料約束。對於某種型別的查詢計劃(例如有大量謂語和別名的查詢),約束傳播是昂貴的,會對整個執行時間產生負面影響。
spark.sql.parser.escapedStringLiterals FALSE 2.0之前預設值為true,知否預設是否。正常文字能否包含在正則表示式中。
spark.sql.parquet.mergeSchema FALSE 若為true,在讀取parquet資料來源時,schema從所有檔案中合併出來。否則如果沒有可用的摘要檔案,則從概要檔案或隨機檔案中選擇模式
spark.sql.parquet.respectSummaryFiles FALSE 若為ture,假設parquet的所有部分檔案和概要檔案一致,在合併模式時會忽略他們。否則將會合並所有的部分檔案
spark.sql.parquet.binaryAsString FALSE 是否向下相容其他parquet生產系統(eg impala or older version spark sql ),不區分位元組資料和string資料寫到parquet schema,這個配置促使spark sql將二進位制資料作為string達到相容
spark.sql.parquet.int96AsTimestamp TRUE 是否使用Int96作為timestamp的儲存格式,可以避免精度損失丟失納秒部分,為其他parquet系統提供相容(impala)
spark.sql.parquet.int64AsTimestampMillis FALSE 當為true,timestamp值將以Int64作為mlibs的儲存擴充套件型別,這種模式微秒將被丟棄
spark.sql.parquet.cacheMetadata TRUE 是否快取parquet的schema資料元,可以提升靜態資料的查詢效能
spark.sql.parquet.compression.codec snappy 支援型別:uncompressed", "snappy", "gzip", "lzo"。 指定parquet寫檔案的壓縮編碼方式
spark.sql.parquet.filterPushdown TRUE 是否開啟parquet過濾條件下推
spark.sql.parquet.writeLegacyFormat FALSE spark sql在拼接schema時是否遵循parquet的schema的規範
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter parquet輸出提交器類,同城必須是org.apache.hadoop.mapreduce.OutputCommitter的子類,如果不是將不會建立資料來源摘要,即使配置開啟了parquet.enable.summary-metadata
spark.sql.parquet.enableVectorizedReader TRUE 開啟parquet向量解碼
spark.sql.orc.filterPushdown FALSE 是否開啟條件下推到orc檔案寫
spark.sql.hive.verifyPartitionPath FALSE 當為true時,在讀取HDFS中儲存的資料時,檢查表根目錄下的所有分割槽路徑
spark.sql.hive.metastorePartitionPruning TRUE 當為true,spark sql的謂語將被下推到hive metastore中,更早的消除不匹配的分割槽,會影響到違背轉換成檔案源關係的hive表
spark.sql.hive.manageFilesourcePartitions TRUE 是否使用hive metastore管理spark sql的 dataSource表分割槽,若為true,dataSource表會在執行計劃期間使用分割槽剪枝 
spark.sql.hive.filesourcePartitionFileCacheSize 250 * 1024 * 1024 當非0時,開啟將分割槽檔案資料元快取到記憶體中,所有表共享一個快取,當開啟 hive filesource partition management(spark.sql.hive.manageFilesourcePartitions)時才會生效
spark.sql.hive.caseSensitiveInferenceMode INFER_AND_SAVE 設定無法從hive表屬性讀取分割槽大小寫模式時所採取的操作,雖然Spice SQL本身不區分大小寫,但hive相容的檔案格式如parquet。Spark sql必須使用一個保持情況的模式,當查詢由包含區分大小寫欄位名或查詢的檔案支援的任何表可能無法返回準確的結果時。有效選項包括INFER_AND_SAVE(預設模式——從基礎資料檔案推斷出區分大小寫的模式,並將其寫入表屬性),INFER_ONLY(推斷schema但不嘗試將其寫入表屬性)和NEVER_INFER(回退到使用區分大小寫間接轉移模式代替推斷)
spark.sql.optimizer.metadataOnly TRUE 當為true時,啟用僅使用表的元資料的元資料查詢優化來生成分割槽列,而不是表掃描。當掃描的所有列都是分割槽列,並且查詢具有滿足不同語義的聚合運算子時,它適用。
spark.sql.columnNameOfCorruptRecord _corrupt_record 當json/csv資料內部列解析失敗時,失敗列的名稱
spark.sql.broadcastTimeout" 5*60 在broadCast join時 ,廣播等待的超時時間
spark.sql.thriftserver.scheduler.pool 為JDBC客戶端會話設定公平排程程式池
spark.sql.thriftServer.incrementalCollect FALSE 當TRUE時,啟用增量集合以在thrift server中執行
spark.sql.thriftserver.ui.retainedStatements 200 JDBC/ODBC Web使用者介面歷史記錄中SQL語句的數量
spark.sql.thriftserver.ui.retainedSessions 200 JDBC/ODBC Web UI歷史中儲存的SQL客戶端會話數
spark.sql.sources.default parquet 輸入輸出預設資料元
spark.sql.hive.convertCTAS FALSE 如果時true,將使用spark.sql.sources.default.設定資料來源,不指定任何儲存屬性到hive ctas語句
spark.sql.hive.gatherFastStats TRUE 在修復表分割槽時,將快速收集STATS(檔案數量和所有檔案的總大小),以避免HIVE轉移子中的順序列表。
spark.sql.sources.partitionColumnTypeInference.enabled TRUE 是否自動推斷分割槽列的資料型別
spark.sql.sources.bucketing.enabled TRUE 當false時,分桶表當作普通表處理
spark.sql.crossJoin.enabled FALSE 當false時,如果查詢中語法笛卡兒積 卻語法中沒有顯示join,將會丟擲異常
spark.sql.orderByOrdinal TRUE 當為true時,排序欄位放置到seleect List,否則被忽略
spark.sql.groupByOrdinal TRUE 當為true時,按組子句的序號被視為選擇列表中的位置。當為false時,序數被忽略。
spark.sql.groupByAliases TRUE group by後的別名是否能夠被用到 select list中,若為否將丟擲分析異常
spark.sql.sources.parallelPartitionDiscovery.threshold 32 允許在driver端列出檔案的最大路徑數。如果在分割槽發現期間檢測到的路徑的數量超過該值,則嘗試用另一個SCAPLE分散式作業來列出檔案。這適用於parquet、ORC、CSV、JSON和LIbSVM資料來源。
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 遞迴地列出路徑集合的並行數,設定阻止檔案列表生成太多工的序號
spark.sql.selfJoinAutoResolveAmbiguity TRUE 自動解決子連結中的連線條件歧義,修復bug SPARK-6231
spark.sql.retainGroupColumns TRUE 是否保留分組列
spark.sql.pivotMaxValues 10000
spark.sql.runSQLOnFiles TRUE 當為true,在sql查詢時,能夠使用dataSource.path作為表(eg:"select a,b from hdfs://xx/xx/*")
spark.sql.codegen.wholeStage TRUE 當為true,多個運算元的整個stage將被便宜到一個java方法中
spark.sql.codegen.maxFields 100 在啟用整個stage codegen之前支援的最大欄位(包括巢狀欄位)
spark.sql.codegen.fallback TRUE 當為true,在整個stage的codegen,對於編譯generated code 失敗的query 部分,將會暫時關閉
spark.sql.codegen.maxCaseBranches 20 支援最大的codegen
spark.sql.files.maxPartitionBytes 128 * 1024 * 1024 在讀取檔案時,一個分割槽最大被讀取的數量,預設值=parquet.block.size
spark.sql.files.openCostInBytes 4 * 1024 * 1024 為了測定開啟一個檔案的耗時,通過同時掃描配置的位元組數來測定,最好是過度估計,那麼小檔案的分割槽將比具有較大檔案的分割槽更快(首先排程
spark.sql.files.ignoreCorruptFiles FALSE 是否自動跳過不正確的檔案
spark.sql.files.maxRecordsPerFile 0 寫入單個檔案的最大條數,如果時0或者負數,則無限制
spark.sql.exchange.reuse TRUE planer是否嘗試找出重複的 exchanges並複用
spark.sql.streaming.stateStore.minDeltasForSnapshot 10 在合併成快照之前需要生成的狀態儲存增量檔案的最小數目
spark.sql.streaming.checkpointLocation 檢查點資料流的查詢的預設儲存位置
spark.sql.streaming.minBatchesToRetain 100 流式計算最小批次長度
spark.sql.streaming.unsupportedOperationCheck TRUE streaming query的logical plan 檢查不支援的操作
spark.sql.variable.substitute TRUE
spark.sql.codegen.aggregate.map.twolevel.enable 啟用兩級聚合雜湊對映。當啟用時,記錄將首先“插入/查詢第一級、小、快的對映,然後在第一級滿或無法找到鍵時回落到第二級、更大、較慢的對映。當禁用時,記錄直接進入第二級。預設為真
spark.sql.view.maxNestedViewDepth 100 巢狀檢視中檢視引用的最大深度。巢狀檢視可以引用其他巢狀檢視,依賴關係被組織在有向無環圖(DAG)中。然而,DAG深度可能變得太大,導致意外的行為。此配置限制了這一點:當分析期間檢視深度超過該值時,我們終止解析度以避免潛在錯誤。
spark.sql.objectHashAggregate.sortBased.fallbackThreshold 128 在ObjectHashAggregateExec的情況下,當記憶體中雜湊對映的大小增長過大時,我們將回落到基於排序的聚合。此選項為雜湊對映的大小設定行計數閾值。
spark.sql.execution.useObjectHashAggregateExec TRUE 是否使用 ObjectHashAggregateExec
spark.sql.streaming.fileSink.log.deletion TRUE 是否刪除檔案流接收器中的過期日誌檔案
spark.sql.streaming.fileSink.log.compactInterval 10 日誌檔案合併閾值,然後將所有以前的檔案壓縮到下一個日誌檔案中
spark.sql.streaming.fileSink.log.cleanupDelay 10min 保證一個日誌檔案被所有使用者可見的時長
spark.sql.streaming.fileSource.log.deletion TRUE 是否刪除檔案流源中過期的日誌檔案
spark.sql.streaming.fileSource.log.compactInterval 10 日誌檔案合併閾值,然後將所有以前的檔案壓縮到下一個日誌檔案中
spark.sql.streaming.fileSource.log.cleanupDelay 10min 保證一個日誌檔案被所有使用者可見的時長
spark.sql.streaming.schemaInference FALSE 基於檔案的流,是否推斷它的模式
spark.sql.streaming.pollingDelay 10L(MILLISECONDS) 在沒有資料可用時延遲查詢新資料多長時間
spark.sql.streaming.noDataProgressEventInterval 10000L(MILLISECONDS) 在沒有資料的情況下,在兩個進度事件之間等待時間
spark.sql.streaming.metricsEnabled FALSE 是否為活動流查詢報告DoopWalth/CODAHALE度量
spark.sql.streaming.numRecentProgressUpdates 100 streaming query 保留的進度更新數量
spark.sql.statistics.ndv.maxError 0.05 生成列級統計量時超對數G+++演算法允許的最大估計誤差
spark.sql.cbo.enabled FALSE 在設定true時啟用CBO來估計計劃統計資訊
spark.sql.cbo.joinReorder.enabled FALSE Enables join reorder in CBO.
spark.sql.cbo.joinReorder.dp.threshold 12 The maximum number of joined nodes allowed in the dynamic programming algorithm
spark.sql.cbo.joinReorder.card.weight 0.07 The weight of cardinality (number of rows) for plan cost comparison in join reorder:  rows * weight + size * (1 - weight)
spark.sql.cbo.joinReorder.dp.star.filter FALSE Applies star-join filter heuristics to cost based join enumeration
spark.sql.cbo.starSchemaDetection FALSE When true, it enables join reordering based on star schema detection
spark.sql.cbo.starJoinFTRatio 0.9 Specifies the upper limit of the ratio between the largest fact tables  for a star join to be considered
spark.sql.session.timeZone TimeZone.getDefault.getID  時間時區
spark.sql.windowExec.buffer.in.memory.threshold 4096 視窗操作符保證儲存在記憶體中的行數的閾值
spark.sql.windowExec.buffer.spill.threshold spark.sql.windowExec.buffer.in.memory.threshold 視窗操作符溢位的行數的閾值
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold Int.MaxValue 由sortMergeJoin運算子保證儲存在記憶體中的行數的閾值
spark.sql.sortMergeJoinExec.buffer.spill.threshold spark.sql.sortMergeJoinExec.buffer.in.memory.threshold 由排序合併連線運算子溢位的行數的閾值
spark.sql.cartesianProductExec.buffer.in.memory.threshold 4096 笛卡爾乘積運算元保證儲存在記憶體中的行數的閾值
spark.sql.cartesianProductExec.buffer.spill.threshold spark.sql.cartesianProductExec.buffer.in.memory.threshold 笛卡爾乘積運算元溢位的行數閾值
spark.sql.redaction.options.regex "(?i)url".r

即便join的hive表沒有10M,也沒有觸發 mapjoin[解決方案]

spark在join的時候,用來判斷一個表的大小是否達到了10M這個限制,是不會去計算這個表在hdfs上的具體的檔案大小的,而是使用hive metadata中的資訊,具體如下圖:

explain出來spark的執行計劃如下:

== Physical Plan ==

*Project [device#57, pkg#58]

+- *BroadcastHashJoin [pkg#58], [apppkg#62], Inner, BuildRight

:- *Filter isnotnull(pkg#58)

:  +- HiveTableScan [device#57, pkg#58], MetastoreRelation dm_sdk_mapping, device_applist, [isnotnull(day#56), (cast(day#56 as double) = 2.0180501E7)]

+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))

+- *Filter isnotnull(apppkg#62)

+- HiveTableScan [apppkg#62], MetastoreRelation dm_sdk_mapping, app_category_mapping

當有些hive沒有totalSize這個資訊的時候,spark就會用sortMergeJoin來做join了,可以使用下面的命令重新生成metadata資訊:

ANALYZE TABLE dm_sdk_mapping.app_category_mapping COMPUTE STATISTICS