The Way of Spark Mastery (Advanced): Spark from Beginner to Expert, Section 9: Spark SQL Execution Flow Explained

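1. Overall execution flow

The code below is used to walk through the Spark SQL flow, so that readers can see the states a LogicalPlan passes through and understand how Spark SQL executes a query end to end.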

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

(1) View the schema of teenagers

scala> teenagers.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

(2) View the execution flow

scala> teenagers.queryExecution
res3: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true

QueryExecution represents the overall Spark SQL execution flow. As the output above shows, a SQL statement passes through the following stages before it runs:

== (1)Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== (2)Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (3)Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (4)Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

// Dynamic bytecode generation (code generation, CG) is enabled to improve query efficiency
Code Generation: true

2. Full-table query execution flow

Statement:

val all= sqlContext.sql("SELECT * FROM people")

Execution flow:

scala> all.queryExecution
res9: org.apache.spark.sql.SQLContext#QueryExecution =
// Note that * is parsed as unresolvedalias(*)
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
// unresolvedalias(*) is analyzed into all the fields of the schema
// UnresolvedRelation [people] is analyzed into Subquery people
name: string, age: int
Project [name#0,age#1]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Scan PhysicalRDD[name#0,age#1]

Code Generation: true
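
The two analysis steps noted in the comments above (expanding * into the schema's fields and resolving the relation name) can be mimicked with a toy model. This is only a sketch in plain Scala: the classes below are hypothetical stand-ins, not Spark's Catalyst classes, and the catalog is a hard-coded map assumed for illustration.

```scala
// Toy model of the analysis step. NOT Spark's Catalyst API; all names are hypothetical.
object ToyAnalyzer {
  sealed trait Plan
  case class UnresolvedRelation(table: String) extends Plan
  case class Relation(table: String, columns: Seq[String]) extends Plan
  // A column named "*" stands for the unresolved star.
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // Assumed catalog: table name -> column names.
  val catalog: Map[String, Seq[String]] = Map("people" -> Seq("name", "age"))

  // Columns produced by a plan node.
  def output(plan: Plan): Seq[String] = plan match {
    case Relation(_, cols)     => cols
    case Project(cols, _)      => cols
    case UnresolvedRelation(_) => Seq.empty
  }

  // Resolve relation names through the catalog, then expand "*" against the child.
  def analyze(plan: Plan): Plan = plan match {
    case UnresolvedRelation(t) =>
      Relation(t, catalog.getOrElse(t, sys.error(s"Table not found: $t")))
    case Project(cols, child) =>
      val resolvedChild = analyze(child)
      val expanded = cols.flatMap {
        case "*" => output(resolvedChild)
        case c   => Seq(c)
      }
      Project(expanded, resolvedChild)
    case resolved: Relation => resolved
  }

  def main(args: Array[String]): Unit = {
    val parsed = Project(Seq("*"), UnresolvedRelation("people"))
    println(analyze(parsed))
  }
}
```

Resolving the child before expanding the star mirrors the order visible in the plans above: the field list of the Project can only be filled in once the relation's schema is known.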

3. Filter query execution flow

Statement:

scala> val filterQuery= sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

Execution flow:

scala> filterQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 // An extra Filter node appears here; the same applies in the plans below
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true

4. Join query execution flow

Statement:

val joinQuery= sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")

View the overall execution flow:

scala> joinQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
// Note the Filter
// and the Join Inner
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age = 'b.age)
  'Join Inner, None
   'UnresolvedRelation [people], Some(a)
   'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#2,age#3]
 Filter (age#1 = age#3)
  Join Inner, None
   Subquery a
    Subquery people
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
   Subquery b
    Subquery people
     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Project [name#0,age#1,name#2,age#3]
 Join Inner, Some((age#1 = age#3))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...

// View its physical plan
scala> joinQuery.queryExecution.sparkPlan
res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#2,age#3]
 SortMergeJoin [age#1], [age#3]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#2,age#3]

The previous query is equivalent to the following one; only the way it is processed differs slightly. Statement:

scala> val innerQuery= sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")
innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]

View the overall execution flow:

scala> innerQuery.queryExecution
res2: org.apache.spark.sql.SQLContext#QueryExecution =
// Note the Join Inner
// Also, there is no Filter here
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Join Inner, Some(('a.age = 'b.age))
  'UnresolvedRelation [people], Some(a)
  'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  Subquery a
   Subquery people
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
  Subquery b
   Subquery people
    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

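// Note that going from the Analyzed Logical Plan to the Optimized Logical Plan
// involves no special optimization here (only the Subquery nodes are dropped); this is pointed out
// for comparison with the subquery example below, where Analyzed and Optimized differ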
== Optimized Logical Plan ==
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...

// View its physical plan
scala> innerQuery.queryExecution.sparkPlan
res14: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#6,age#7]
 SortMergeJoin [age#1], [age#7]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#6,age#7]
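
In the first join query the predicate starts out as a Filter above a Join Inner with no condition, yet its optimized plan carries the predicate as the join condition, just like the explicit inner join. A minimal sketch of such a rewrite, assuming a toy plan model in plain Scala (hypothetical classes, not Spark's optimizer rules, and only meaningful for inner joins):

```scala
// Toy model of moving a WHERE predicate into an inner join condition.
// NOT Spark's Catalyst API; all names are hypothetical.
object ToyJoinPushdown {
  sealed trait Plan
  case class Relation(alias: String) extends Plan
  case class Join(left: Plan, right: Plan, condition: Option[String]) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan

  // A Filter directly above an inner Join is folded into the join condition.
  def pushIntoJoin(plan: Plan): Plan = plan match {
    case Filter(cond, Join(l, r, None)) =>
      Join(l, r, Some(cond))
    case Filter(cond, Join(l, r, Some(existing))) =>
      Join(l, r, Some(s"($existing) && ($cond)"))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val whereStyle = Filter("a.age = b.age", Join(Relation("a"), Relation("b"), None))
    println(pushIntoJoin(whereStyle))
  }
}
```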

5. Subquery execution flow

Statement:

scala> val subQuery=sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19")
subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

View the overall execution flow:


scala> subQuery.queryExecution
res4: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

// Note the difference between Optimized and Analyzed here:
// the two Filters have been optimized into a single one
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
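
The step from the analyzed plan to the optimized plan above can be imitated with three small rewrite rules applied until nothing changes: drop Subquery nodes, drop a Project that keeps every column of its child, and merge two stacked Filters. The sketch below is a toy model in plain Scala with hypothetical class names; it is not Spark's optimizer, and conditions are kept as plain strings for brevity.

```scala
// Toy model of logical optimization. NOT Spark's Catalyst API; all names are hypothetical.
object ToyOptimizer {
  sealed trait Plan
  case class Relation(name: String, columns: Seq[String]) extends Plan
  case class Subquery(alias: String, child: Plan) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan

  // Columns produced by a plan node.
  def output(plan: Plan): Seq[String] = plan match {
    case Relation(_, cols)  => cols
    case Subquery(_, child) => output(child)
    case Project(cols, _)   => cols
    case Filter(_, child)   => output(child)
  }

  // One bottom-up pass: rewrite the children first, then the node itself.
  def rewrite(plan: Plan): Plan = {
    val withNewChildren = plan match {
      case Subquery(a, c)   => Subquery(a, rewrite(c))
      case Project(cols, c) => Project(cols, rewrite(c))
      case Filter(cond, c)  => Filter(cond, rewrite(c))
      case r: Relation      => r
    }
    withNewChildren match {
      case Subquery(_, child)                          => child // alias no longer needed
      case Project(cols, child) if cols == output(child) => child // no-op projection
      case Filter(outer, Filter(inner, child))         => Filter(s"($inner) && ($outer)", child)
      case other                                       => other
    }
  }

  // Repeat passes until the plan stops changing.
  @scala.annotation.tailrec
  def optimize(plan: Plan): Plan = {
    val next = rewrite(plan)
    if (next == plan) plan else optimize(next)
  }

  def main(args: Array[String]): Unit = {
    val people = Relation("people", Seq("name", "age"))
    val analyzed =
      Project(Seq("name", "age"),
        Filter("age <= 19",
          Subquery("a",
            Project(Seq("name", "age"),
              Filter("age >= 13", Subquery("people", people))))))
    println(optimize(analyzed))
  }
}
```

Running rules to a fixed point matters because one rewrite can expose another: the two Filters only become adjacent after the Subquery and the inner Project between them are removed.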

6. Aggregate SQL execution flow

Statement:

scala> val aggregateQuery=sqlContext.sql("SELECT a.name,sum(a.age) FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19 group by a.name")
aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]

View the execution flow:


scala> aggregateQuery.queryExecution
res6: org.apache.spark.sql.SQLContext#QueryExecution =
// Note 'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
// i.e. group by a.name is parsed into the grouping list ['a.name], and the select list into unresolvedalias(...) entries
== Parsed Logical Plan ==
'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, _c1: bigint
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitions...

// View its physical plan
scala> aggregateQuery.queryExecution.sparkPlan
res10: org.apache.spark.sql.execution.SparkPlan =
TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
 TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
  Filter ((age#1 >= 13) && (age#1 <= 19))
   Scan PhysicalRDD[name#0,age#1]
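
The physical plan above contains two aggregate nodes, one with mode=Partial and one with mode=Final. The idea of summing in two phases can be sketched with plain Scala collections; this is an illustration only, not Spark code, and the Person rows and the partition layout are made-up sample data.

```scala
// Toy model of two-phase (partial, then final) aggregation.
// NOT Spark code; names and data are hypothetical.
object ToyTwoPhaseSum {
  case class Person(name: String, age: Int)

  // Partial phase: each partition sums ages per name on its own.
  def partial(partition: Seq[Person]): Map[String, Long] =
    partition.groupBy(_.name).map { case (name, rows) => name -> rows.map(_.age.toLong).sum }

  // Final phase: partial sums for the same name are merged.
  def finalMerge(partials: Seq[Map[String, Long]]): Map[String, Long] =
    partials.flatten.groupBy(_._1).map { case (name, sums) => name -> sums.map(_._2).sum }

  // Filter, then partial aggregation per partition, then the final merge.
  def run(partitions: Seq[Seq[Person]]): Map[String, Long] =
    finalMerge(partitions.map(p => partial(p.filter(r => r.age >= 13 && r.age <= 19))))

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(Person("Justin", 19), Person("Andy", 30)),
      Seq(Person("Justin", 15), Person("Michael", 29))
    )
    println(run(partitions))
  }
}
```

Summing inside each partition first means that only one partial sum per name and partition has to be combined in the final phase, rather than every row.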

For other SQL statements, you can inspect the execution flow in the same way to grasp the basic ideas behind the Spark SQL implementation.