1. 程式人生 > >第四篇:Spark SQL Catalyst源碼分析之TreeNode Library

第四篇:Spark SQL Catalyst源碼分析之TreeNode Library

pla where 並且 手冊 input bst node lec esc

/** Spark SQL源碼分析系列文章*/

前幾篇文章介紹了Spark SQL的Catalyst的核心運行流程、SqlParser,和Analyzer,本來打算直接寫Optimizer的,但是發現忘記介紹TreeNode這個Catalyst的核心概念,介紹這個可以更好的理解Optimizer是如何對Analyzed Logical Plan進行優化的生成Optimized Logical Plan,本文就將TreeNode基本架構進行解釋。

一、TreeNode類型

TreeNode Library是Catalyst的核心類庫,語法樹的構建都是由一個個TreeNode組成。TreeNode本身是一個BaseType <: TreeNode[BaseType] 的類型,並且實現了Product這個trait,這樣可以存放異構的元素了。
TreeNode有三種形態:BinaryNode

UnaryNodeLeaf Node.
在Catalyst裏,這些Node都是繼承自Logical Plan,可以說每一個TreeNode節點就是一個Logical Plan(包含Expression)(直接繼承自TreeNode)

主要繼承關系類圖如下:

技術分享

1、BinaryNode

二元節點,即有左右孩子的二叉節點

[java] view plain copy
  1. [[TreeNode]] that has two children, [[left]] and [[right]].
  2. trait BinaryNode[BaseType <: TreeNode[BaseType]] {
  3. def left: BaseType
  4. def right: BaseType
  5. def children = Seq(left, right)
  6. }
  7. abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
  8. self: Product =>
  9. }

節點定義比較簡單,左孩子,右孩子都是BaseType。 children是一個Seq(left, right)

下面列出主要繼承二元節點的類,可以當查詢手冊用 :)

這裏提示下平常常用的二元節點:Join和Union

技術分享

2、UnaryNode

一元節點,即只有一個孩子節點

[java] view plain copy
  1. A [[TreeNode]] with a single [[child]].
  2. trait UnaryNode[BaseType <: TreeNode[BaseType]] {
  3. def child: BaseType
  4. def children = child :: Nil
  5. }
  6. abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
  7. self: Product =>
  8. }

下面列出主要繼承一元節點的類,可以當查詢手冊用 :)

常用的二元節點有,Project,Subquery,Filter,Limit ...等
技術分享

3、Leaf Node

葉子節點,沒有孩子節點的節點。

[java] view plain copy
  1. A [[TreeNode]] with no children.
  2. trait LeafNode[BaseType <: TreeNode[BaseType]] {
  3. def children = Nil
  4. }
  5. abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  6. self: Product =>
  7. // Leaf nodes by definition cannot reference any input attributes.
  8. override def references = Set.empty
  9. }

下面列出主要繼承葉子節點的類,可以當查詢手冊用 :)

提示常用的葉子節點: Command類系列,一些Funtion函數,以及Unresolved Relation...etc.

技術分享

二、TreeNode 核心方法

簡單介紹一個TreeNode這個類的屬性和方法

currentId
一顆樹裏的TreeNode有個唯一的id,類型是java.util.concurrent.atomic.AtomicLong原子類型。

[java] view plain copy
  1. private val currentId = new java.util.concurrent.atomic.AtomicLong
  2. protected def nextId() = currentId.getAndIncrement()

sameInstance
判斷2個實例是否是同一個的時候,只需要判斷TreeNode的id。

[java] view plain copy
  1. def sameInstance(other: TreeNode[_]): Boolean = {
  2. this.id == other.id
  3. }

fastEquals,更常用的一個快捷的判定方法,沒有重寫Object.Equals,這樣防止scala編譯器生成case class equals 方法

[java] view plain copy
  1. def fastEquals(other: TreeNode[_]): Boolean = {
  2. sameInstance(other) || this == other
  3. }

map,flatMap,collect都是遞歸的對子節點進行應用PartialFunction,其它方法還有很多,篇幅有限這裏不一一描述了。

2.1、核心方法 transform 方法

transform該方法接受一個PartialFunction,就是就是前一篇文章Analyzer裏提到的Batch裏面的Rule。
是會將Rule叠代應用到該節點的所有子節點,最後返回這個節點的副本(一個和當前節點不同的節點,後面會介紹,其實就是利用反射來返回一個修改後的節點)。
如果rule沒有對一個節點進行PartialFunction的操作,就返回這個節點本身。

來看一個例子:

[java] view plain copy
  1. object GlobalAggregates extends Rule[LogicalPlan] {
  2. def apply(plan: LogicalPlan): LogicalPlan = plan transform { //apply方法這裏調用了logical plan(TreeNode) 的transform方法來應用一個PartialFunction。
  3. case Project(projectList, child) if containsAggregates(projectList) =>
  4. Aggregate(Nil, projectList, child)
  5. }
  6. def containsAggregates(exprs: Seq[Expression]): Boolean = {
  7. exprs.foreach(_.foreach {
  8. case agg: AggregateExpression => return true
  9. case _ =>
  10. })
  11. false
  12. }
  13. }

這個方法真正的調用是transformChildrenDown,這裏提到了用先序遍歷來對子節點進行遞歸的Rule應用。
如果在對當前節點應用rule成功,修改後的節點afterRule,來對其children節點進行rule的應用。

transformDown方法:

[java] view plain copy
  1. /**
  2. * Returns a copy of this node where `rule` has been recursively applied to it and all of its
  3. * children (pre-order). When `rule` does not apply to a given node it is left unchanged.
  4. * @param rule the function used to transform this nodes children
  5. */
  6. ef transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  7. val afterRule = rule.applyOrElse(this, identity[BaseType])
  8. // Check if unchanged and then possibly return old copy to avoid gc churn.
  9. if (this fastEquals afterRule) {
  10. transformChildrenDown(rule) //修改前節點this.transformChildrenDown(rule)
  11. } else {
  12. afterRule.transformChildrenDown(rule) //修改後節點進行transformChildrenDown
  13. }

最重要的方法transformChildrenDown:
對children節點進行遞歸的調用PartialFunction,利用最終返回的newArgs來生成一個新的節點,這裏調用了makeCopy()來生成節點。

transformChildrenDown方法:

[java] view plain copy
  1. /**
  2. * Returns a copy of this node where `rule` has been recursively applied to all the children of
  3. * this node. When `rule` does not apply to a given node it is left unchanged.
  4. * @param rule the function used to transform this nodes children
  5. */
  6. def transformChildrenDown(rule: PartialFunction[BaseType, BaseType]): this.type = {
  7. var changed = false
  8. val newArgs = productIterator.map {
  9. case arg: TreeNode[_] if children contains arg =>
  10. val newChild = arg.asInstanceOf[BaseType].transformDown(rule) //遞歸子節點應用rule
  11. if (!(newChild fastEquals arg)) {
  12. changed = true
  13. newChild
  14. } else {
  15. arg
  16. }
  17. case Some(arg: TreeNode[_]) if children contains arg =>
  18. val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
  19. if (!(newChild fastEquals arg)) {
  20. changed = true
  21. Some(newChild)
  22. } else {
  23. Some(arg)
  24. }
  25. case m: Map[_,_] => m
  26. case args: Traversable[_] => args.map {
  27. case arg: TreeNode[_] if children contains arg =>
  28. val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
  29. if (!(newChild fastEquals arg)) {
  30. changed = true
  31. newChild
  32. } else {
  33. arg
  34. }
  35. case other => other
  36. }
  37. case nonChild: AnyRef => nonChild
  38. case null => null
  39. }.toArray
  40. if (changed) makeCopy(newArgs) else this //根據作用結果返回的newArgs數組,反射生成新的節點副本。
  41. }

makeCopy方法,反射生成節點副本

[java] view plain copy
  1. /**
  2. * Creates a copy of this type of tree node after a transformation.
  3. * Must be overridden by child classes that have constructor arguments
  4. * that are not present in the productIterator.
  5. * @param newArgs the new product arguments.
  6. */
  7. def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") {
  8. try {
  9. val defaultCtor = getClass.getConstructors.head //反射獲取默認構造函數的第一個
  10. if (otherCopyArgs.isEmpty) {
  11. defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type] //反射生成當前節點類型的節點
  12. } else {
  13. defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type] //如果還有其它參數,++
  14. }
  15. } catch {
  16. case e: java.lang.IllegalArgumentException =>
  17. throw new TreeNodeException(
  18. this, s"Failed to copy node. Is otherCopyArgs specified correctly for $nodeName? "
  19. + s"Exception message: ${e.getMessage}.")
  20. }
  21. }

三、TreeNode實例

現在準備從一段sql來出發,畫一下這個spark sql的整體樹的transformation。 SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key 首先,我們先執行一下,在控制臺裏看一下生成的計劃: [java] view plain copy
  1. <span style="font-size:12px;">sbt/sbt hive/console
  2. Using /usr/java/default as default JAVA_HOME.
  3. Note, this will be overridden by -java-home if it is set.
  4. [info] Loading project definition from /app/hadoop/shengli/spark/project/project
  5. [info] Loading project definition from /app/hadoop/shengli/spark/project
  6. [info] Set current project to root (in build file:/app/hadoop/shengli/spark/)
  7. [info] Starting scala interpreter...
  8. [info]
  9. import org.apache.spark.sql.catalyst.analysis._
  10. import org.apache.spark.sql.catalyst.dsl._
  11. import org.apache.spark.sql.catalyst.errors._
  12. import org.apache.spark.sql.catalyst.expressions._
  13. import org.apache.spark.sql.catalyst.plans.logical._
  14. import org.apache.spark.sql.catalyst.rules._
  15. import org.apache.spark.sql.catalyst.types._
  16. import org.apache.spark.sql.catalyst.util._
  17. import org.apache.spark.sql.execution
  18. import org.apache.spark.sql.hive._
  19. import org.apache.spark.sql.hive.test.TestHive._
  20. import org.apache.spark.sql.parquet.ParquetTestData
  21. scala> val query = sql("SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key")</span>

3.1、UnResolve Logical Plan

第一步生成UnResolve Logical Plan 如下: [java] view plain copy
  1. scala> query.queryExecution.logical
  2. res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
  3. Project [*]
  4. Join Inner, Some((‘a.key = ‘b.key))
  5. Subquery a
  6. Project [*]
  7. UnresolvedRelation None, src, None
  8. Subquery b
  9. Project [*]
  10. UnresolvedRelation None, src, None
如果畫成樹是這樣的,僅個人理解: 我將一開始介紹的三種Node分別用綠色UnaryNode,紅色Binary Node 和 藍色 LeafNode 來表示。 技術分享

3.2、Analyzed Logical Plan

Analyzer會將允用Batch的Rules來對Unresolved Logical Plan Tree 進行rule應用,這裏用來EliminateAnalysisOperators將Subquery給消除掉,Batch("Resolution將Atrribute和Relation給Resolve了,Analyzed Logical Plan Tree如下圖: 技術分享

3.3、Optimized Plan

我把Catalyst裏的Optimizer戲稱為Spark SQL的優化大師,因為整個Spark SQL的優化都是在這裏進行的,後面會有文章來講解Optimizer。 在這裏,優化的不明顯,因為SQL本身不復雜 [java] view plain copy
  1. scala> query.queryExecution.optimizedPlan
  2. res3: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
  3. Project [key#0,value#1,key#2,value#3]
  4. Join Inner, Some((key#0 = key#2))
  5. MetastoreRelation default, src, None
  6. MetastoreRelation default, src, None
生成的樹如下圖: 技術分享

3.4、executedPlan

最後一步是最終生成的物理執行計劃,裏面涉及到了Hive的TableScan,涉及到了HashJoin操作,還涉及到了Exchange,Exchange涉及到了Shuffle和Partition操作。 [java] view plain copy
  1. scala> query.queryExecution.executedPlan
  2. res4: org.apache.spark.sql.execution.SparkPlan =
  3. Project [key#0:0,value#1:1,key#2:2,value#3:3]
  4. HashJoin [key#0], [key#2], BuildRight
  5. Exchange (HashPartitioning [key#0:0], 150)
  6. HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None
  7. Exchange (HashPartitioning [key#2:0], 150)
  8. HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None
生成的物理執行樹如圖: 技術分享

四、總結:

本文介紹了Spark SQL的Catalyst框架核心TreeNode類庫,繪制了TreeNode繼承關系的類圖,了解了TreeNode這個類在Catalyst所起到的作用。語法樹中的Logical Plan均派生自TreeNode,並且Logical Plan派生出TreeNode的三種形態,即Binary Node, Unary Node, Leaft Node。 正式這幾種節點,組成了Spark SQl的Catalyst的語法樹。
TreeNode的transform方法是核心的方法,它接受一個rule,會對當前節點的孩子節點進行遞歸的調用rule,最後會返回一個TreeNode的copy,這種操作就是transformation,貫穿了Spark SQL執行的幾個核心階段,如Analyze,Optimize階段。
最後用一個實際的例子,展示出來Spark SQL的執行樹生成流程。

我目前的理解就是這些,如果分析不到位的地方,請大家多多指正。 ——EOF——

原創文章,轉載請註明:

轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文鏈接地址:http://blog.csdn.net/oopsoom/article/details/38084079

註:本文基於署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協議,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章鏈接。如若需要用於商業目的或者與授權方面的協商,請聯系我。

技術分享

轉自:http://blog.csdn.net/oopsoom/article/details/38084079

第四篇:Spark SQL Catalyst源碼分析之TreeNode Library