Spark SQL Catalyst Source Code Analysis: Physical Plan
阿新 • Published: 2019-01-06
The previous articles covered the execution flow of the spark sql package, as well as the SqlParser, Analyzer, and Optimizer inside the Catalyst package. This article introduces the last plan in Catalyst: the Physical Plan. The physical plan is the prerequisite for Spark SQL to run a Spark job, and it is the final plan in the pipeline.
As shown in the figure:
1. SparkPlanner
Picking up where we left off: after the Optimizer has processed the analyzed logical plan, SparkPlanner transforms the resulting Optimized Logical Plan into physical plans. SparkPlanner's apply method returns an Iterator[PhysicalPlan]:
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next()
SparkPlanner extends SparkStrategies, and SparkStrategies extends QueryPlanner.
SparkStrategies contains a set of concrete strategies. Each extends the Strategy class defined in QueryPlanner: it takes a logical plan and produces a sequence of physical plans.
QueryPlanner is the base class of SparkPlanner and defines the key pieces: Strategy, planLater, and apply.
@transient
protected[sql] val planner = new SparkPlanner

protected[sql] class SparkPlanner extends SparkStrategies {
  val sparkContext: SparkContext = self.sparkContext
  val sqlContext: SQLContext = self
  def numPartitions = self.numShufflePartitions // the number of partitions
  val strategies: Seq[Strategy] = // the collection of strategies
    CommandStrategy(self) ::
    TakeOrdered ::
    PartialAggregation ::
    LeftSemiJoin ::
    HashJoin ::
    InMemoryScans ::
    ParquetOperations ::
    BasicOperators ::
    CartesianProduct ::
    BroadcastNestedLoopJoin :: Nil
  // etc......
}
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  /** A list of execution strategies that can be used by the planner */
  def strategies: Seq[Strategy]

  /**
   * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
   * be used for execution. If this strategy does not apply to the given logical operation then an
   * empty list should be returned.
   */
  abstract protected class Strategy extends Logging {
    def apply(plan: LogicalPlan): Seq[PhysicalPlan] // takes a logical plan, returns Seq[PhysicalPlan]
  }

  /**
   * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
   * filled in automatically by the QueryPlanner using the other execution strategies that are
   * available.
   */
  protected def planLater(plan: LogicalPlan) = apply(plan).next() // the placeholder is filled in by the QueryPlanner via the other strategies

  def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...
    val iter = strategies.view.flatMap(_(plan)).toIterator // apply every Strategy to the plan and concatenate all resulting physical plans into one iterator
    assert(iter.hasNext, s"No plan for $plan")
    iter // return all physical plans
  }
}
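The strategy-combination pattern above can be illustrated with a minimal, self-contained sketch. The Logical/Physical types below are hypothetical stand-ins, not Spark's actual classes; the point is the shape of apply: each strategy maps a logical node to zero or more physical candidates, and the planner lazily concatenates them into one iterator.

```scala
sealed trait Logical
case class Scan(table: String) extends Logical
case class Limit(n: Int, child: Logical) extends Logical

sealed trait Physical
case class TableScanExec(table: String) extends Physical
case class LimitExec(n: Int) extends Physical

// Each strategy either matches the node (returning candidates) or returns Nil.
val scanStrategy: Logical => Seq[Physical] = {
  case Scan(t) => Seq(TableScanExec(t))
  case _       => Nil
}
val limitStrategy: Logical => Seq[Physical] = {
  case Limit(n, _) => Seq(LimitExec(n))
  case _           => Nil
}

// Same shape as QueryPlanner.apply: view + flatMap concatenates lazily.
def plan(strategies: Seq[Logical => Seq[Physical]], p: Logical): Iterator[Physical] = {
  val iter = strategies.view.flatMap(_(p)).toIterator
  assert(iter.hasNext, s"No plan for $p")
  iter
}

// Like `planner(optimizedPlan).next()`, the caller just takes the first candidate:
val chosen = plan(Seq(scanStrategy, limitStrategy), Scan("t")).next()
// chosen == TableScanExec("t")
```

This also shows why the `// TODO: Don't just pick the first one...` comment matters: all candidates are produced, but only the head of the iterator is used.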
The inheritance hierarchy:
2. SparkPlan
SparkPlan is the abstract class of the final physical execution plan in Catalyst, produced after all the strategies have been applied; it exists solely to run Spark jobs.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
prepareForExecution is in fact a RuleExecutor[SparkPlan], where the rules operate on SparkPlan:
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
  val batches =
    Batch("Add exchange", Once, AddExchange(self)) :: // add shuffle (Exchange) operators where necessary
    Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil // bind references
}
SparkPlan extends QueryPlan[SparkPlan] and defines outputPartitioning, requiredChildDistribution, and the execute method that Spark SQL calls to start execution:
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
  self: Product =>

  // TODO: Move to `DistributedPlan`
  /** Specifies how data is partitioned across different nodes in the cluster. */
  def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!

  /** Specifies any partition requirements on the input data for this operator. */
  def requiredChildDistribution: Seq[Distribution] =
    Seq.fill(children.size)(UnspecifiedDistribution)

  /**
   * Runs this query returning the result as an RDD.
   */
  def execute(): RDD[Row] // the method that actually runs the query, returning an RDD

  /**
   * Runs this query returning the result as an array.
   */
  def executeCollect(): Array[Row] = execute().map(_.copy()).collect() // execute & collect

  protected def buildRow(values: Seq[Any]): Row = // builds a Row from the current values; essentially a wrapper around an Array
    new GenericRow(values.toArray)
}
The SparkPlan inheritance hierarchy is shown in the figure:
3. Strategies
Note that Strategy here lives in the execution package. SparkPlanner currently defines the following strategies: LeftSemiJoin, HashJoin, PartialAggregation, BroadcastNestedLoopJoin, CartesianProduct, TakeOrdered, ParquetOperations, InMemoryScans, BasicOperators, and CommandStrategy.
3.1 LeftSemiJoin
There are several join types:
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
If a Join in the logical plan has joinType LeftSemi, this strategy fires. ExtractEquiJoinKeys is a pattern defined in patterns.scala, used for pattern matching: any equi-join is matched and destructured by ExtractEquiJoinKeys, which returns the join in the form (joinType, leftKeys, rightKeys, condition, leftChild, rightChild).
The strategy finally returns an execution.LeftSemiJoinHash SparkPlan; see the SparkPlan class hierarchy diagram.
object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// Find left semi joins where at least some predicates can be evaluated by matching join keys
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
val semiJoin = execution.LeftSemiJoinHash( // build and return an execution.LeftSemiJoinHash SparkPlan from the extracted join
leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
// no predicate can be evaluated by matching hash keys
case logical.Join(left, right, LeftSemi, condition) => // no join keys, i.e. a non-equi left semi join: return the LeftSemiJoinBNL SparkPlan
execution.LeftSemiJoinBNL(
planLater(left), planLater(right), condition)(sqlContext) :: Nil
case _ => Nil
}
}
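The semantics of a hash-based left semi join can be sketched on plain Scala collections (the Row shape and data here are hypothetical, not Spark's API): build a hash set from the right-side keys, then keep each left row that finds a match, returning only the left side's columns.

```scala
case class Row(key: Int, value: String)

def leftSemiJoinHash(left: Seq[Row], right: Seq[Row]): Seq[Row] = {
  // Build phase: collect the distinct join keys from the right side.
  val rightKeys = right.map(_.key).toSet
  // Probe phase: keep only left rows whose key appears on the right.
  left.filter(r => rightKeys.contains(r.key))
}

val left  = Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
val right = Seq(Row(2, "x"), Row(2, "y"), Row(4, "z"))
leftSemiJoinHash(left, right)
// keeps only Row(2, "b"); duplicate matches on the right do not duplicate output rows
```

Note the contrast with an inner join: a semi join never multiplies rows, it only filters the left side.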
3.2 HashJoin
HashJoin is the most common case and applies to inner joins. It provides two SparkPlans: BroadcastHashJoin and ShuffledHashJoin.
BroadcastHashJoin uses broadcast variables: for tables listed in the spark.sql.join.broadcastTables parameter (table names separated by commas), the table is first materialized and then broadcast to every machine via Spark's broadcast variables, much like a map join in Hive.
ShuffledHashJoin is the traditional default join: rows are shuffled by the join key and then hash-joined.
object HashJoin extends Strategy with PredicateHelper {
private[this] def broadcastHashJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
left: LogicalPlan,
right: LogicalPlan,
condition: Option[Expression],
side: BuildSide) = {
val broadcastHashJoin = execution.BroadcastHashJoin(
leftKeys, rightKeys, side, planLater(left), planLater(right))(sqlContext)
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
}
def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer // the tables to broadcast
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
condition,
left,
right @ PhysicalOperation(_, _, b: BaseRelation))
if broadcastTables.contains(b.tableName) => // if the right child is a broadcast table, buildSide is BuildRight
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
condition,
left @ PhysicalOperation(_, _, b: BaseRelation),
right)
if broadcastTables.contains(b.tableName) => // if the left child is a broadcast table, buildSide is BuildLeft
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
execution.ShuffledHashJoin( // the default: hash join after shuffling on the join keys
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
case _ => Nil
}
}
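Both BroadcastHashJoin and ShuffledHashJoin share the same core: build a hash table on one side (the build side), then probe it with the other (the streamed side). A sketch on plain Scala collections, with a hypothetical key/value row shape rather than Spark's API:

```scala
def hashJoin[K, A, B](build: Seq[(K, A)], streamed: Seq[(K, B)]): Seq[(K, A, B)] = {
  // Build phase: group the build side by key into a hash map.
  val table: Map[K, Seq[A]] = build.groupBy(_._1).mapValues(_.map(_._2)).toMap
  // Probe phase: for each streamed row, emit one output row per match.
  for {
    (k, b) <- streamed
    a <- table.getOrElse(k, Nil)
  } yield (k, a, b)
}

hashJoin(Seq(1 -> "L1", 2 -> "L2"), Seq(2 -> "R2", 3 -> "R3"))
// only key 2 matches, yielding (2, "L2", "R2")
```

The two plans differ only in where the build table lives: BroadcastHashJoin ships it whole to every node, while ShuffledHashJoin co-partitions both sides first so each task builds a table for its own key range.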
3.3 PartialAggregation
PartialAggregation is the partial-aggregation strategy: aggregations that can be computed on local data are computed locally first, avoiding an unnecessary shuffle of all the columns.
object PartialAggregation extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Aggregate(groupingExpressions, aggregateExpressions, child) =>
// Collect all aggregate expressions.
val allAggregates =
aggregateExpressions.flatMap(_ collect { case a: AggregateExpression => a })
// Collect all aggregate expressions that can be computed partially.
val partialAggregates =
aggregateExpressions.flatMap(_ collect { case p: PartialAggregate => p })
// Only do partial aggregation if supported by all aggregate expressions.
if (allAggregates.size == partialAggregates.size) {
// Create a map of expressions to their partial evaluations for all aggregate expressions.
val partialEvaluations: Map[Long, SplitEvaluation] =
partialAggregates.map(a => (a.id, a.asPartial)).toMap
// We need to pass all grouping expressions though so the grouping can happen a second
// time. However some of them might be unnamed so we alias them allowing them to be
// referenced in the second aggregation.
val namedGroupingExpressions: Map[Expression, NamedExpression] = groupingExpressions.map {
case n: NamedExpression => (n, n)
case other => (other, Alias(other, "PartialGroup")())
}.toMap
// Replace aggregations with a new expression that computes the result from the already
// computed partial evaluations and grouping values.
val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp {
case e: Expression if partialEvaluations.contains(e.id) =>
partialEvaluations(e.id).finalEvaluation
case e: Expression if namedGroupingExpressions.contains(e) =>
namedGroupingExpressions(e).toAttribute
}).asInstanceOf[Seq[NamedExpression]]
val partialComputation =
(namedGroupingExpressions.values ++
partialEvaluations.values.flatMap(_.partialEvaluations)).toSeq
// Construct two phased aggregation.
execution.Aggregate( // return the execution.Aggregate SparkPlan
partial = false,
namedGroupingExpressions.values.map(_.toAttribute).toSeq,
rewrittenAggregateExpressions,
execution.Aggregate(
partial = true,
groupingExpressions,
partialComputation,
planLater(child))(sqlContext))(sqlContext) :: Nil
} else {
Nil
}
case _ => Nil
}
}
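The two-phase (partial, then final) aggregation built above can be illustrated on plain Scala collections: each partition pre-aggregates locally, and only the small per-partition results need to cross the shuffle boundary. The partitioned data below is hypothetical, not Spark's API.

```scala
val partitions: Seq[Seq[(String, Int)]] = Seq(
  Seq("a" -> 1, "b" -> 2, "a" -> 3), // partition 0
  Seq("b" -> 4, "a" -> 5)            // partition 1
)

// Partial phase: sum within each partition; no shuffle needed yet.
val partial: Seq[Map[String, Int]] =
  partitions.map(_.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum })

// Final phase: merge the per-partition sums; only these small maps are shuffled.
val total: Map[String, Int] =
  partial.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
// total == Map("a" -> 9, "b" -> 6)
```

This is also why the strategy bails out (returns Nil) unless every aggregate expression is a PartialAggregate: an aggregate like a naive median cannot be split into a partial and a final step.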
3.4 BroadcastNestedLoopJoin
BroadcastNestedLoopJoin handles the LeftOuter, RightOuter, and FullOuter join types, whereas the HashJoin strategy above only covers inner joins; keep the two cases apart.
object BroadcastNestedLoopJoin extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Join(left, right, joinType, condition) =>
execution.BroadcastNestedLoopJoin(
planLater(left), planLater(right), joinType, condition)(sqlContext) :: Nil
case _ => Nil
}
}
A fragment of its implementation:
if (!matched && (joinType == LeftOuter || joinType == FullOuter)) { // LeftOuter or FullOuter
matchedRows += buildRow(streamedRow ++ Array.fill(right.output.size)(null))
}
}
Iterator((matchedRows, includedBroadcastTuples))
}
val includedBroadcastTuples = streamedPlusMatches.map(_._2)
val allIncludedBroadcastTuples =
if (includedBroadcastTuples.count == 0) {
new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
} else {
streamedPlusMatches.map(_._2).reduce(_ ++ _)
}
val rightOuterMatches: Seq[Row] =
if (joinType == RightOuter || joinType == FullOuter) { //RightOuter or FullOuter
broadcastedRelation.value.zipWithIndex.filter {
case (row, i) => !allIncludedBroadcastTuples.contains(i)
}.map {
// TODO: Use projection.
case (row, _) => buildRow(Vector.fill(left.output.size)(null) ++ row)
}
} else {
Vector()
}
3.5 CartesianProduct
The Cartesian-product join, with a filter applied on top when the join carries a condition.
It is implemented mainly with RDD's cartesian method.
object CartesianProduct extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Join(left, right, _, None) =>
execution.CartesianProduct(planLater(left), planLater(right)) :: Nil
case logical.Join(left, right, Inner, Some(condition)) =>
execution.Filter(condition,
execution.CartesianProduct(planLater(left), planLater(right))) :: Nil
case _ => Nil
}
}
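RDD's cartesian pairs every element of one dataset with every element of the other; as the strategy shows, a conditioned inner join then becomes a Filter over that product. A sketch on plain Scala collections (hypothetical data, not Spark's API):

```scala
// Every element of `left` paired with every element of `right`.
def cartesian[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
  for (a <- left; b <- right) yield (a, b)

val product = cartesian(Seq(1, 2), Seq("x", "y"))
// 2 x 2 = 4 pairs: (1,"x"), (1,"y"), (2,"x"), (2,"y")

// logical.Join(left, right, Inner, Some(condition)) is planned as
// Filter(condition, CartesianProduct(...)); here the condition is a predicate:
val joined = product.filter { case (n, s) => n == 2 && s == "x" }
```

The output size is |left| x |right|, which is why this strategy is the fallback rather than the preferred plan.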
3.6 TakeOrdered
TakeOrdered handles Limit: when a Limit sits on top of a Sort, a TakeOrdered SparkPlan is returned.
It relies on RDD's takeOrdered method to take the top N rows after ordering.
object TakeOrdered extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>
execution.TakeOrdered(limit, order, planLater(child))(sqlContext) :: Nil
case _ => Nil
}
}
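RDD.takeOrdered(n) returns the n smallest elements under an ordering without materializing a full global sort. On plain Scala collections the equivalent result is sorted.take(n); the data below is hypothetical:

```scala
// Equivalent result to RDD.takeOrdered(n): the n smallest elements in order.
def takeOrdered[A](data: Seq[A], n: Int)(implicit ord: Ordering[A]): Seq[A] =
  data.sorted(ord).take(n)

takeOrdered(Seq(5, 1, 4, 2, 3), 3) // the 3 smallest: 1, 2, 3
```

This is why fusing Limit + Sort into one operator pays off: each partition only needs to keep its local top N, and only those candidates are merged, instead of sorting and shuffling the whole dataset.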
3.7 ParquetOperations
Supports Parquet reads and writes, inserting into Parquet tables, and so on.
object ParquetOperations extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// TODO: need to support writing to other types of files. Unify the below code paths.
case logical.WriteToFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
// Note: overwrite=false because otherwise the metadata we just created will be deleted
InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sqlContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sqlContext) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
val prunePushedDownFilters =
if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
(filters: Seq[Expression]) => {
filters.filter { filter =>
// Note: filters cannot be pushed down to Parquet if they contain more complex
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove
// all filters that have been pushed down. Note that a predicate such as
// "(A AND B) OR C" can result in "A OR C" being pushed down.
val recordFilter = ParquetFilters.createFilter(filter)
if (!recordFilter.isDefined) {
// First case: the pushdown did not result in any record filter.
true
} else {
// Second case: a record filter was created; here we are conservative in
// the sense that even if "A" was pushed and we check for "A AND B" we
// still want to keep "A AND B" in the higher-level filter, not just "B".
!ParquetFilters.findExpression(recordFilter.get, filter).isDefined
}
}
}
} else {
identity[Seq[Expression]] _
}
pruneFilterProject(
projectList,
filters,
prunePushedDownFilters,
ParquetTableScan(_, relation, filters)(sqlContext)) :: Nil
case _ => Nil
}
}
3.8 InMemoryScans
InMemoryScans operates on the InMemoryRelation logical plan. What it actually calls is SparkPlanner's pruneFilterProject method.
object InMemoryScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
pruneFilterProject(
projectList,
filters,
identity[Seq[Expression]], // No filters are pushed down.
InMemoryColumnarTableScan(_, mem)) :: Nil
case _ => Nil
}
}
3.9 BasicOperators
All the basic SparkPlans defined in basicOperators.scala under the org.apache.spark.sql.execution package: Project, Filter, Sample, Union, Limit, TakeOrdered, Sort, ExistingRdd.
These are the basic building blocks; their implementations are relatively simple, mostly thin wrappers over RDD methods.
object BasicOperators extends Strategy {
def numPartitions = self.numPartitions
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Distinct(child) =>
execution.Aggregate(
partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
case logical.Sort(sortExprs, child) =>
// This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
execution.Sort(sortExprs, global = true, planLater(child)):: Nil
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.Filter(condition, planLater(child)) :: Nil
case logical.Aggregate(group, agg, child) =>
execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil
case logical.Sample(fraction, withReplacement, seed, child) =>
execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
val dataAsRdd =
sparkContext.parallelize(data.map(r =>
new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row))
execution.ExistingRdd(output, dataAsRdd) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.Limit(limit, planLater(child))(sqlContext) :: Nil
case Unions(unionChildren) =>
execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
case logical.Generate(generator, join, outer, _, child) =>
execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
case logical.NoRelation =>
execution.ExistingRdd(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil
case _ => Nil
}
}
3.10 CommandStrategy
CommandStrategy is dedicated to command-type logical plans, i.e. operations such as set key = value, explain sql, and cache table xxx:
SetCommand is implemented mainly through SparkContext parameters.
ExplainCommand is implemented by printing the tree string of the executed plan.
CacheCommand is implemented mainly through the context's cache table and uncache table operations.
case class CommandStrategy(context: SQLContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.SetCommand(key, value) =>
Seq(execution.SetCommand(key, value, plan.output)(context))
case logical.ExplainCommand(logicalPlan) =>
Seq(execution.ExplainCommand(logicalPlan, plan.output)(context))
case logical.CacheCommand(tableName, cache) =>
Seq(execution.CacheCommand(tableName, cache)(context))
case _ => Nil
}
}
4. Execution
Every SparkPlan is executed by calling its execute() method to produce an RDD. Apart from simple cases such as the basic operators above, the implementations are fairly involved; the broad strokes were covered above, and this article will not go into further detail.
5. Summary
This article introduced the Physical Plan part of Spark SQL's Catalyst framework and the process of converting an Optimized Logical Plan into a SparkPlan. The process applies many physical-planning strategies, and the chosen SparkPlan is then run through a RuleExecutor to produce the final executed SparkPlan. The SparkPlan is the last plan before execution: once the executed SparkPlan is generated, calling collect() launches a Spark job and actually runs the Spark SQL query.
——EOF——