SparkSQL Catalyst解析
Catalyst Optimizer是SparkSQL的核心元件(查詢優化器),它負責將SQL語句轉換成物理執行計劃,Catalyst的優劣決定了SQL執行的效能。
查詢優化器是一個SQL引擎的核心,開源常用的有Apache Calcite(很多開源元件都通過引入Calcite來實現查詢優化,如Hive/Phoenix/Drill等),另外一個是orca(HAWQ/GreenPlum/">GreenPlum中使用)。
關係代數是查詢優化器的理論基礎。常見的查詢優化技術:查詢重用(ReuseSubquery/ReuseExchange等)/RBO/CBO等。
SparkSQL執行流程
SparkSQL中對一條SQL語句的處理過程如上圖所示:
1) SqlParser將SQL語句解析成一個邏輯執行計劃(未解析)
2) Analyzer利用HiveMeta中表/列等資訊,對邏輯執行計劃進行解析(如表/列是否存在等)
3) SparkOptimizer利用Rule Based(基於經驗規則RBO)/Cost Based(基於代價CBO)的優化方法,對邏輯執行計劃進行優化(如謂詞下推/JoinReorder)
4) SparkPlanner將邏輯執行計劃轉換成物理執行計劃(如Filter -> FilterExec),
同時從某些邏輯運算元的多種物理運算元實現中根據RBO/CBO選擇其中一個合適的物理運算元(如Join的多個實現BroadcastJoin/SortMergeJoin/HashJoin中選擇一個實現)
5) PrepareForExecution是執行物理執行計劃之前做的一些事情,比如ReuseExchange/WholeStageCodegen的處理等等
6) 最終在SparkCore中執行該物理執行計劃。
接下來介紹Catalyst中的核心模組 SparkOptimizer/SparkPlanner
.
SparkOptimizer
使用已有的規則對邏輯執行計劃進行優化,該過程是基於經驗/啟發式的優化方法,得到優化過的邏輯執行計劃。
如上圖所示,Optimizer中有很多Batch,每個Batch中包含1個或多個Rule,Batch的另外一個屬性是迭代次數(Once/FixPoint預設100次),每個Batch內部Rule有前後執行順序,Batch之間也是按照順序來執行的。目前Optimizer中有60多個Rule。
備註:從Rule看JoinReorder在這個過程就已經處理了。
SparkPlanner
參考: ofollow,noindex" target="_blank">https://issues.apache.org/jira/browse/SPARK-1251
SparkPlanner將邏輯執行計劃轉換成物理執行計劃,即將邏輯執行計劃樹中的邏輯節點轉換成物理節點,如Join轉換成HashJoinExec/SortMergeJoinExec...,Filter轉成FilterExec等
Spark的 Stragety
有8個:
- DataSourceV2Strategy
- FileSourceStrategy
- DataSourceStrategy
- SpecialLimits
- Aggregation
- JoinSelection
- InMemoryScans
- BasicOperators
上述很多Stragety都是基於規則的策略。
JoinSelection用到了相關的統計資訊來選擇將Join轉換為BroadcastHashJoinExec還是ShuffledHashJoinExec還是SortMergeJoinExec,屬於CBO基於代價的策略。
PrepareForExecution
在執行之前,對物理執行計劃做一些處理,這些處理都是基於規則的,包括
- PlanSubqueries
- EnsureRequirements
- CollapseCodegenStages
- ReuseExchange
- ReuseSubquery
經過上述步驟之後生成的最終物理執行計劃提交到Spark執行。
CBO(基於代價)實現
CBO的實現有三個步驟如下,可以大致瞭解一下:
1. 統計資訊採集
Optimizer/Planner中CBO(基於代價)的優化需要採集統計資訊,包括表維度和列維度。
//包含表/列 case class Statistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil), hints: HintInfo = HintInfo()) //列 case class ColumnStat( distinctCount: BigInt, min: Option[Any], max: Option[Any], nullCount: BigInt, avgLen: Long, maxLen: Long, histogram: Option[Histogram] = None)
上面結構體用來儲存統計資訊,可以看出:
表維度:
大小/條數
列維度:
NDV/min/max/Null/平均長度/最大長度/直方圖
上述資訊需要提前使用Analyze命令進行採集
// 採集表維度的統計資訊,NOSCAN表示不掃描表(即只有表大小資訊,不採集表條數資訊) ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)] COMPUTE STATISTICS [NOSCAN]; // 採集列資訊 // 若spark.sql.statistics.histogram.enabled設定為true,則會採集直方圖資訊 // 採集直方圖資訊需要額外一次的表掃描 // 使用的是等高直方圖 // 只支援IntegralType/DoubleType/DecimalType/FloatType/DateType/TimestampType的列採集直方圖 ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;
2.估算運算元統計資訊
邏輯執行計劃樹中只有葉子節點(表)有實際的統計資訊(通過Analyze獲取), 邏輯執行計劃樹中非葉子節點會根據子節點資訊以及估算方法獲取本節點的統計資訊。
/** * Returns the estimated statistics for the current logical plan node. Under the hood, this * method caches the return value, which is computed based on the configuration passed in the * first time. If the configuration changes, the cache can be invalidated by calling * [[invalidateStatsCache()]]. */ def stats: Statistics = statsCache.getOrElse { if (conf.cboEnabled) { statsCache = Option(BasicStatsPlanVisitor.visit(self)) } else { statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self)) } statsCache.get } def visit(p: LogicalPlan): T = p match { case p: Aggregate => visitAggregate(p) case p: Distinct => visitDistinct(p) case p: Except => visitExcept(p) case p: Expand => visitExpand(p) case p: Filter => visitFilter(p) case p: Generate => visitGenerate(p) case p: GlobalLimit => visitGlobalLimit(p) case p: Intersect => visitIntersect(p) case p: Join => visitJoin(p) case p: LocalLimit => visitLocalLimit(p) case p: Pivot => visitPivot(p) case p: Project => visitProject(p) case p: Repartition => visitRepartition(p) case p: RepartitionByExpression => visitRepartitionByExpr(p) case p: ResolvedHint => visitHint(p) case p: Sample => visitSample(p) case p: ScriptTransformation => visitScriptTransform(p) case p: Union => visitUnion(p) case p: Window => visitWindow(p) case p: LogicalPlan => default(p) }
每個運算元都有自己的預估方法
CBO開啟/關閉,有些運算元的預估方法不一樣,如AggregateEstimation/FilterEstimation/JoinEstimation/ProjectEstimation,其它運算元CBO開啟/關閉使用一套預估方法。
3.基於統計資訊的優化
統計資訊越準確,基於統計資訊的優化更準確,從目前程式碼看只有下面三種場景使用到了統計資訊。
JoinReorder
動態規劃
//代價函式 //weight可以通過引數控制spark.sql.cbo.joinReorder.card.weight,預設0.7 //根據行數/大小來計算代價 cost = rows * weight + size * (1 - weight) // 比較兩種Join的代價大小 def betterThan(other: JoinPlan, conf: SQLConf): Boolean = { if (other.planCost.card == 0 || other.planCost.size == 0) { false } else { val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card) val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size) relativeRows * conf.joinReorderCardWeight + relativeSize * (1 - conf.joinReorderCardWeight) < 1 } }
JoinSelection
根據Join兩個子節點的統計資訊,判斷使用BroadcastHashJoinExec還是ShuffledHashJoinExec還是SortMergeJoinExec,比如其中一個表(size)很小則可以使用BroadcastHashJoinExec。
StarSchemaDetection
探測星型模型,判斷一個列是否是表的主鍵(因為SparkSQL不支援主鍵設定)
/** * Determines if a column referenced by a base table access is a primary key. * A column is a PK if it is not nullable and has unique values. * To determine if a column has unique values in the absence of informational * RI constraints, the number of distinct values is compared to the total * number of rows in the table. If their relative difference * is within the expected limits (i.e. 2 * spark.sql.statistics.ndv.maxError based * on TPC-DS data results), the column is assumed to have unique values. */ private def isUnique( column: Attribute, plan: LogicalPlan): Boolean = plan match { case PhysicalOperation(_, _, t: LeafNode) => val leafCol = findLeafNodeCol(column, plan) leafCol match { case Some(col) if t.outputSet.contains(col) => val stats = t.stats stats.rowCount match { case Some(rowCount) if rowCount >= 0 => if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) { val colStats = stats.attributeStats.get(col) if (colStats.get.nullCount > 0) { false } else { val distinctCount = colStats.get.distinctCount val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d) // ndvMaxErr adjusted based on TPCDS 1TB data results relDiff <= conf.ndvMaxError * 2 } } else { false } case None => false } case None => false } case _ => false }