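Part 4: Spark SQL Catalyst Source Code Analysis: The TreeNode Library
/** Spark SQL source code analysis series */
The previous articles covered Catalyst's core execution flow, the SqlParser, and the Analyzer. I had planned to move straight on to the Optimizer, but realized I had not yet introduced TreeNode, a core concept of Catalyst. Understanding TreeNode makes it much easier to see how the Optimizer turns an Analyzed Logical Plan into an Optimized Logical Plan, so this article explains TreeNode's basic architecture.
I. TreeNode Types
The TreeNode library is Catalyst's core class library: every syntax tree is built out of TreeNodes. TreeNode is parameterized as TreeNode[BaseType <: TreeNode[BaseType]], and its nodes also implement the Product trait, which lets a node hold heterogeneous elements.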
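To make the type signature concrete, here is a minimal sketch of a self-referencing (F-bounded) node type whose concrete nodes are Products. The names Node, Expr, Lit, and Named are hypothetical stand-ins invented for illustration, not Catalyst classes.

```scala
object TreeNodeTypeSketch {
  // Hypothetical stand-in for TreeNode: BaseType is F-bounded, and the self-type
  // demands that every concrete node is also a Product (in practice, a case class).
  abstract class Node[BaseType <: Node[BaseType]] { self: BaseType with Product =>
    def children: Seq[BaseType]
    // Product lets generic code read the constructor arguments, whatever their types.
    def args: List[Any] = productIterator.toList
  }

  // Toy expression nodes; the names are illustrative only.
  sealed abstract class Expr extends Node[Expr] with Product
  case class Lit(value: Int) extends Expr {
    def children: Seq[Expr] = Nil
  }
  case class Named(name: String, child: Expr, nullable: Boolean) extends Expr {
    def children: Seq[Expr] = Seq(child)
  }
}
```

In this sketch, a Named node stores a String, a child node, and a Boolean side by side, and args exposes all three through productIterator. That is the sense in which a Product-based node can hold heterogeneous elements.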
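TreeNode comes in three forms: BinaryNode, UnaryNode, and LeafNode.
In Catalyst, these three node types all extend LogicalPlan, so every node in a plan tree is itself a LogicalPlan. A plan also contains Expressions, and Expression extends TreeNode directly.
The main inheritance relationships are shown in the class diagram:
[Figure: class diagram of the main TreeNode inheritance relationships]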
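1. BinaryNode
A binary node, that is, a node with a left child and a right child.
    /** A [[TreeNode]] that has two children, [[left]] and [[right]]. */
    trait BinaryNode[BaseType <: TreeNode[BaseType]] {
      def left: BaseType
      def right: BaseType
      def children = Seq(left, right)
    }

    // The logical-plan version mixes in the generic trait above (trees.BinaryNode).
    abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
      self: Product =>
    }
The definition is simple: the left child and the right child are both of type BaseType, and children is Seq(left, right).
The main classes that extend the binary node are listed in the figure and can serve as a quick reference :)
[Figure: main classes that extend BinaryNode]
Tip: the binary nodes you will meet most often are Join and Union.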
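2. UnaryNode
A unary node, that is, a node with exactly one child.
    /** A [[TreeNode]] with a single [[child]]. */
    trait UnaryNode[BaseType <: TreeNode[BaseType]] {
      def child: BaseType
      def children = child :: Nil
    }

    // The logical-plan version mixes in the generic trait above (trees.UnaryNode).
    abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
      self: Product =>
    }
The main classes that extend the unary node are listed in the figure and can serve as a quick reference :)
[Figure: main classes that extend UnaryNode]
Commonly used unary nodes include Project, Subquery, Filter, Limit, and so on.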
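3. LeafNode
A leaf node, that is, a node with no children.
    /** A [[TreeNode]] with no children. */
    trait LeafNode[BaseType <: TreeNode[BaseType]] {
      def children = Nil
    }

    // The logical-plan version mixes in the generic trait above (trees.LeafNode).
    abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
      self: Product =>
      // Leaf nodes by definition cannot reference any input attributes.
      override def references = Set.empty
    }
The main classes that extend the leaf node are listed in the figure and can serve as a quick reference :)
[Figure: main classes that extend LeafNode]
Tip: commonly used leaf nodes include the Command family, some Function nodes, UnresolvedRelation, and so on.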
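The three node shapes can be tried out in isolation with a small sketch. Plan, Binary, Unary, Leaf, and the three case classes below are simplified, hypothetical types written for this illustration; Catalyst's real LogicalPlan carries far more (output attributes, references, and so on).

```scala
object NodeShapesSketch {
  // Simplified, hypothetical plan type.
  sealed trait Plan { def children: Seq[Plan] }

  trait Binary extends Plan {
    def left: Plan
    def right: Plan
    def children: Seq[Plan] = Seq(left, right)
  }
  trait Unary extends Plan {
    def child: Plan
    def children: Seq[Plan] = child :: Nil
  }
  trait Leaf extends Plan {
    def children: Seq[Plan] = Nil
  }

  // One toy operator per shape.
  case class Relation(name: String) extends Leaf
  case class Filter(condition: String, child: Plan) extends Unary
  case class Join(left: Plan, right: Plan) extends Binary

  // Generic helpers only need `children`, whatever the node's shape.
  def countNodes(p: Plan): Int = 1 + p.children.map(countNodes).sum
  def depth(p: Plan): Int = 1 + p.children.map(depth).foldLeft(0)(_ max _)
}
```

Because every shape exposes the same children member, code such as countNodes and depth never needs to know whether it is looking at a join, a filter, or a table.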
II. Core TreeNode Methods
A brief look at the fields and methods of the TreeNode class.
currentId
Every TreeNode in a tree has a unique id. The ids are drawn from a counter of type java.util.concurrent.atomic.AtomicLong.
    private val currentId = new java.util.concurrent.atomic.AtomicLong
    protected def nextId() = currentId.getAndIncrement()
sameInstance
To decide whether two instances are the same node, it is enough to compare their ids.
    def sameInstance(other: TreeNode[_]): Boolean = {
      this.id == other.id
    }
fastEquals
A more commonly used shortcut for equality that short-circuits when both sides are the same instance. TreeNode deliberately does not override Object.equals, because doing so would stop the Scala compiler from generating the case class equals method.
    def fastEquals(other: TreeNode[_]): Boolean = {
      sameInstance(other) || this == other
    }
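The interplay between ids, sameInstance, and fastEquals can be sketched as follows. Node and Attr are hypothetical classes made up for this example; only the AtomicLong counter and the two method bodies mirror the listings above.

```scala
import java.util.concurrent.atomic.AtomicLong

object IdentitySketch {
  private val currentId = new AtomicLong
  private def nextId(): Long = currentId.getAndIncrement()

  // Hypothetical base class: each instance draws a fresh id when it is constructed.
  abstract class Node {
    val id: Long = nextId()
    def sameInstance(other: Node): Boolean = this.id == other.id
    // Cheap id comparison first; fall back to the compiler-generated structural equals.
    def fastEquals(other: Node): Boolean = sameInstance(other) || this == other
  }

  // equals is NOT overridden in Node, so the case class still gets its generated equals.
  case class Attr(name: String) extends Node
}
```

Two separately constructed Attr("key") values get different ids, so sameInstance is false for them, yet fastEquals is still true because the generated case class equals compares the constructor arguments.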
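map, flatMap, and collect each apply a function recursively to a node and all of its children (collect takes a PartialFunction). There are many other methods as well; for reasons of space they are not described one by one here.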
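As a rough illustration of these recursive helpers, the sketch below implements foreach, map, and collect on a toy expression tree. Expr, Lit, and Add are hypothetical types, and the pre-order visiting order is a choice made for this sketch.

```scala
object TraversalSketch {
  sealed trait Expr {
    def children: Seq[Expr]

    // Pre-order: visit this node first, then each child subtree from left to right.
    def foreach(f: Expr => Unit): Unit = {
      f(this)
      children.foreach(_.foreach(f))
    }

    // Apply f to every node and gather the results in visiting order.
    def map[A](f: Expr => A): Seq[A] = {
      val out = Seq.newBuilder[A]
      foreach(n => out += f(n))
      out.result()
    }

    // Like map, but keep only the nodes on which the partial function is defined.
    def collect[B](pf: PartialFunction[Expr, B]): Seq[B] = {
      val out = Seq.newBuilder[B]
      val lifted = pf.lift
      foreach(n => lifted(n).foreach(out += _))
      out.result()
    }
  }

  case class Lit(value: Int) extends Expr {
    def children: Seq[Expr] = Nil
  }
  case class Add(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }
}
```

For example, collect { case Lit(v) => v } on Add(Lit(1), Add(Lit(2), Lit(3))) gathers the literal values of the whole tree without any hand-written recursion at the call site.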
2.1 The Core Method: transform
transform takes a PartialFunction, which is exactly the Rule inside a Batch described in the previous article on the Analyzer.
It applies the rule recursively to this node and all of its children and finally returns a copy of the node (a node distinct from the current one; as shown later, the modified node is created through reflection).
If the rule's PartialFunction does not apply to a node, that node itself is returned.
Here is an example:
    object GlobalAggregates extends Rule[LogicalPlan] {
      // apply calls transform on the logical plan (a TreeNode) to apply a PartialFunction.
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Project(projectList, child) if containsAggregates(projectList) =>
          Aggregate(Nil, projectList, child)
      }

      def containsAggregates(exprs: Seq[Expression]): Boolean = {
        exprs.foreach(_.foreach {
          case agg: AggregateExpression => return true
          case _ =>
        })
        false
      }
    }
transform actually delegates to transformDown, which applies the rule to the nodes in pre-order and recurses into the children through transformChildrenDown.
If the rule changes the current node, the rule is then applied to the children of the modified node, afterRule; otherwise it is applied to the children of the original node.
The transformDown method:
    /**
     * Returns a copy of this node where `rule` has been recursively applied to it and all of its
     * children (pre-order). When `rule` does not apply to a given node it is left unchanged.
     * @param rule the function used to transform this nodes children
     */
    def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
      val afterRule = rule.applyOrElse(this, identity[BaseType])
      // Check if unchanged and then possibly return old copy to avoid gc churn.
      if (this fastEquals afterRule) {
        transformChildrenDown(rule) // unchanged: recurse from the original node
      } else {
        afterRule.transformChildrenDown(rule) // changed: recurse from the modified node
      }
    }
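The pre-order behaviour is easy to check on a toy tree. The sketch below is an assumption-laden miniature: Expr, Lit, Add, mapChildren, and the dropZero rule are all invented for illustration, and mapChildren is written by hand per node instead of the generic productIterator plus makeCopy machinery that Catalyst uses.

```scala
object TransformDownSketch {
  sealed trait Expr {
    // Rebuild this node with f applied to each child; return `this` if nothing changed.
    def mapChildren(f: Expr => Expr): Expr

    // Pre-order: try the rule on this node first, then recurse into the
    // children of whatever node the rule produced.
    def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
      val afterRule = rule.applyOrElse(this, identity[Expr])
      afterRule.mapChildren(_.transformDown(rule))
    }
  }

  case class Lit(value: Int) extends Expr {
    def mapChildren(f: Expr => Expr): Expr = this
  }
  case class Add(left: Expr, right: Expr) extends Expr {
    def mapChildren(f: Expr => Expr): Expr = {
      val (l, r) = (f(left), f(right))
      if ((l eq left) && (r eq right)) this else Add(l, r)
    }
  }

  // Example rule: x + 0 => x
  val dropZero: PartialFunction[Expr, Expr] = { case Add(x, Lit(0)) => x }
}
```

Two details are worth noticing in this sketch. A tree that the rule never matches comes back as the very same instance, which avoids needless allocation. And the rule is tried only once per position: after the root of Add(Add(Lit(1), Lit(0)), Lit(0)) is rewritten to Add(Lit(1), Lit(0)), the traversal moves on to its children without retrying the rule on the new root, so a single pass may leave further matches behind.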
The most important method is transformChildrenDown.
It recursively applies the PartialFunction to the child nodes and uses the resulting newArgs to build a new node by calling makeCopy().
The transformChildrenDown method:
    /**
     * Returns a copy of this node where `rule` has been recursively applied to all the children of
     * this node. When `rule` does not apply to a given node it is left unchanged.
     * @param rule the function used to transform this nodes children
     */
    def transformChildrenDown(rule: PartialFunction[BaseType, BaseType]): this.type = {
      var changed = false
      val newArgs = productIterator.map {
        case arg: TreeNode[_] if children contains arg =>
          val newChild = arg.asInstanceOf[BaseType].transformDown(rule) // apply the rule recursively to the child
          if (!(newChild fastEquals arg)) {
            changed = true
            newChild
          } else {
            arg
          }
        case Some(arg: TreeNode[_]) if children contains arg =>
          val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
          if (!(newChild fastEquals arg)) {
            changed = true
            Some(newChild)
          } else {
            Some(arg)
          }
        case m: Map[_,_] => m
        case args: Traversable[_] => args.map {
          case arg: TreeNode[_] if children contains arg =>
            val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
            if (!(newChild fastEquals arg)) {
              changed = true
              newChild
            } else {
              arg
            }
          case other => other
        }
        case nonChild: AnyRef => nonChild
        case null => null
      }.toArray
      // If any child changed, build a new copy of the node from newArgs via reflection.
      if (changed) makeCopy(newArgs) else this
    }
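The core idea, walking productIterator, rewriting only the arguments that are children, and copying only when something changed, can be sketched without reflection. In this illustration Plan, Relation, Filter, and transformChildren are hypothetical, and each node supplies a hand-written makeCopy in place of Catalyst's reflective one.

```scala
object ChildrenRewriteSketch {
  sealed trait Plan extends Product {
    def children: Seq[Plan]

    // Hand-written stand-in for the reflective makeCopy: rebuild from new arguments.
    def makeCopy(newArgs: Array[Any]): Plan

    def transformChildren(f: Plan => Plan): Plan = {
      var changed = false
      val newArgs = productIterator.map {
        case arg: Plan if children.contains(arg) =>
          val newChild = f(arg)
          if (newChild != arg) { changed = true; newChild } else arg
        case nonChild => nonChild // names, conditions, etc. pass through untouched
      }.toArray
      // Copy only when at least one child was replaced.
      if (changed) makeCopy(newArgs) else this
    }
  }

  case class Relation(name: String) extends Plan {
    def children: Seq[Plan] = Nil
    def makeCopy(newArgs: Array[Any]): Plan =
      Relation(newArgs(0).asInstanceOf[String])
  }
  case class Filter(condition: String, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def makeCopy(newArgs: Array[Any]): Plan =
      Filter(newArgs(0).asInstanceOf[String], newArgs(1).asInstanceOf[Plan])
  }
}
```

The casts inside makeCopy are the price of treating constructor arguments as an untyped array. The reflective version pays the same price at run time, which is why a mismatch in the arguments surfaces there as an IllegalArgumentException.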
The makeCopy method creates the copy of the node through reflection:
    /**
     * Creates a copy of this type of tree node after a transformation.
     * Must be overridden by child classes that have constructor arguments
     * that are not present in the productIterator.
     * @param newArgs the new product arguments.
     */
    def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") {
      try {
        val defaultCtor = getClass.getConstructors.head // take the first constructor found by reflection
        if (otherCopyArgs.isEmpty) {
          // instantiate a node of the current node's type via reflection
          defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type]
        } else {
          // if there are other constructor arguments, append them with ++
          defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type]
        }
      } catch {
        case e: java.lang.IllegalArgumentException =>
          throw new TreeNodeException(
            this, s"Failed to copy node. Is otherCopyArgs specified correctly for $nodeName? "
              + s"Exception message: ${e.getMessage}.")
      }
    }
III. A TreeNode Example
Starting from a SQL statement, let us trace the transformations of the whole Spark SQL tree.
    SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key
First, run it in the console and look at the generated plans:
    sbt/sbt hive/console
    Using /usr/java/default as default JAVA_HOME.
    Note, this will be overridden by -java-home if it is set.
    [info] Loading project definition from /app/hadoop/shengli/spark/project/project
    [info] Loading project definition from /app/hadoop/shengli/spark/project
    [info] Set current project to root (in build file:/app/hadoop/shengli/spark/)
    [info] Starting scala interpreter...
    [info]
    import org.apache.spark.sql.catalyst.analysis._
    import org.apache.spark.sql.catalyst.dsl._
    import org.apache.spark.sql.catalyst.errors._
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.plans.logical._
    import org.apache.spark.sql.catalyst.rules._
    import org.apache.spark.sql.catalyst.types._
    import org.apache.spark.sql.catalyst.util._
    import org.apache.spark.sql.execution
    import org.apache.spark.sql.hive._
    import org.apache.spark.sql.hive.test.TestHive._
    import org.apache.spark.sql.parquet.ParquetTestData
    scala> val query = sql("SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key")
3.1 Unresolved Logical Plan
The first step produces the Unresolved Logical Plan:
    scala> query.queryExecution.logical
    res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [*]
     Join Inner, Some(('a.key = 'b.key))
      Subquery a
       Project [*]
        UnresolvedRelation None, src, None
      Subquery b
       Project [*]
        UnresolvedRelation None, src, None
3.2 Analyzed Logical Plan
The Analyzer applies the rules in its Batches to the Unresolved Logical Plan tree. Here EliminateAnalysisOperators removes the Subquery nodes, and the "Resolution" batch resolves the Attributes and Relations. The Analyzed Logical Plan tree is shown in the figure:
[Figure: Analyzed Logical Plan tree]
3.3 Optimized Plan
I jokingly call Catalyst's Optimizer the "optimization master" of Spark SQL, because all of Spark SQL's optimization happens there; a later article will cover the Optimizer. In this example the optimization is not very visible, because the SQL itself is simple.
    scala> query.queryExecution.optimizedPlan
    res3: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
    Project [key#0,value#1,key#2,value#3]
     Join Inner, Some((key#0 = key#2))
      MetastoreRelation default, src, None
      MetastoreRelation default, src, None
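The Subquery nodes of the unresolved plan no longer appear in the later plans. The effect of removing them can be sketched as a tree rewrite on toy plan nodes. Everything in this block is hypothetical: the case classes only borrow operator names from the printed plans, their fields are simplified, and eliminateSubqueries is a plain recursive function, not Catalyst's EliminateAnalysisOperators rule.

```scala
object SubqueryEliminationSketch {
  // Toy plan nodes named after the operators in the printed plans.
  sealed trait Plan
  case class Relation(table: String) extends Plan
  case class Subquery(alias: String, child: Plan) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan
  case class Join(left: Plan, right: Plan, condition: String) extends Plan

  // Rebuild the tree, replacing every Subquery node with its (rewritten) child.
  def eliminateSubqueries(plan: Plan): Plan = plan match {
    case Subquery(_, child)   => eliminateSubqueries(child)
    case Project(cols, child) => Project(cols, eliminateSubqueries(child))
    case Join(l, r, cond)     => Join(eliminateSubqueries(l), eliminateSubqueries(r), cond)
    case leaf: Relation       => leaf
  }
}
```

Applied to a toy version of the unresolved plan, the rewrite leaves the Project and Join nodes in place and simply splices each Subquery's child into its parent.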
3.4 executedPlan
The last step is the final physical execution plan. It involves Hive's TableScan, a HashJoin, and Exchange operators; Exchange in turn involves shuffle and partitioning.
    scala> query.queryExecution.executedPlan
    res4: org.apache.spark.sql.execution.SparkPlan =
    Project [key#0:0,value#1:1,key#2:2,value#3:3]
     HashJoin [key#0], [key#2], BuildRight
      Exchange (HashPartitioning [key#0:0], 150)
       HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None
      Exchange (HashPartitioning [key#2:0], 150)
       HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None
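IV. Summary
This article introduced the TreeNode library at the core of Spark SQL's Catalyst framework, drew the class diagram of the TreeNode inheritance relationships, and looked at the role TreeNode plays in Catalyst. Every Logical Plan in the syntax tree derives from TreeNode, and Logical Plan in turn gives rise to the three TreeNode forms: BinaryNode, UnaryNode, and LeafNode. It is exactly these node types that make up Catalyst's syntax tree in Spark SQL.
The transform method is TreeNode's core method. It takes a rule, applies it recursively to the current node's children, and returns a copy of the TreeNode. This operation, a transformation, runs through the core phases of Spark SQL execution, such as Analyze and Optimize.
Finally, a concrete example showed how Spark SQL generates its execution tree.
This is my current understanding; if any part of the analysis falls short, corrections are welcome.
EOF
Original article. When reposting, please credit:
Source: OopsOutOfMemory盛利的Blog, author: OopsOutOfMemory
Article link: http://blog.csdn.net/oopsoom/article/details/38084079
Note: This article is licensed under Attribution-NonCommercial-NoDerivs 2.5 China Mainland (CC BY-NC-ND 2.5 CN). Reposting, sharing, and comments are welcome, but please keep the author attribution and the article link. For commercial use or licensing questions, please contact me.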