1. 程式人生 > >第一篇:Spark SQL源碼分析之核心流程

第一篇:Spark SQL源碼分析之核心流程

example 協議 bst copyto name 分詞 oop 不同 spl

/** Spark SQL源碼分析系列文章*/

自從去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的貢獻者從幾人到了幾十人,而且發展速度異常迅猛,究其原因,個人認為有以下2點:

1、整合:將SQL類型的查詢語言整合到 Spark 的核心RDD概念裏。這樣可以應用於多種任務,流處理,批處理,包括機器學習裏都可以引入Sql。
2、效率:因為Shark受到hive的編程模型限制,無法再繼續優化來適應Spark模型裏。

前一段時間測試過Shark,並且對Spark SQL也進行了一些測試,但是還是忍不住對Spark SQL一探究竟,就從源代碼的角度來看一下Spark SQL的核心執行流程吧。

一、引子

先來看一段簡單的Spark SQL程序:

[java] view plain copy
  1. 1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  2. 2. import sqlContext._
  3. 3.case class Person(name: String, age: Int)
  4. 4.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
  5. 5.people.registerAsTable("people")
  6. 6.val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  7. 7.teenagers.map(t => "Name: " + t(0)).collect().foreach(println)


程序前兩句1和2生成SQLContext,導入sqlContext下面的all,也就是運行SparkSQL的上下文環境。
程序3,4兩句是加載數據源註冊table
第6句是真正的入口,是sql函數,傳入一句sql,先會返回一個SchemaRDD。這一步是lazy的,直到第七句的collect這個action執行時,sql才會執行。

二、SQLCOntext

SQLContext是執行SQL的上下文對象,首先來看一下它Hold的有哪些成員:

Catalog

一個存儲<tableName,logicalPlan>的map結構,查找關系的目錄,註冊表,註銷表,查詢表和邏輯計劃關系的類。

技術分享

SqlParser

Parse 傳入的sql來對語法分詞,構建語法樹,返回一個logical plan

技術分享

Analyzer

logical plan的語法分析器

技術分享

Optimizer

logical Plan的優化器

技術分享

LogicalPlan

邏輯計劃,由catalyst的TreeNode組成,可以看到有3種語法樹

技術分享

SparkPlanner

包含不同策略的優化策略來優化物理執行計劃

技術分享

QueryExecution

sql執行的環境上下文

技術分享
就是這些對象組成了Spark SQL的運行時,看起來很酷,有靜態的metadata存儲,有分析器、優化器、邏輯計劃、物理計劃、執行運行時。
那這些對象是怎麽相互協作來執行sql語句的呢?

三、Spark SQL執行流程

話不多說,先上圖,這個圖我用一個在線作圖工具process on話的,畫的不好,圖能達意就行:

技術分享

核心組件都是綠色的方框,每一步流程的結果都是藍色的框框,調用的方法是橙色的框框。

先概括一下,大致的執行流程是:
Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD

更具體的執行流程:

sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函數調用) 執行sql生成RDD

3.1、Parse SQL

回到開始的程序,我們調用sql函數,其實是SQLContext裏的sql函數它的實現是new一個SchemaRDD,在生成的時候就調用parseSql方法了。

[java] view plain copy
  1. /**
  2. * Executes a SQL query using Spark, returning the result as a SchemaRDD.
  3. *
  4. * @group userf
  5. */
  6. def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))

結果是會生成一個邏輯計劃

[java] view plain copy
  1. @transient
  2. protected[sql] val parser = new catalyst.SqlParser
  3. protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)

3.2、Analyze to Execution

當我們調用SchemaRDD裏面的collect方法時,則會初始化QueryExecution,開始啟動執行。

[java] view plain copy
  1. override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()

我們可以很清晰的看到執行步驟:

[java] view plain copy
  1. protected abstract class QueryExecution {
  2. def logical: LogicalPlan
  3. lazy val analyzed = analyzer(logical) //首先分析器會分析邏輯計劃
  4. lazy val optimizedPlan = optimizer(analyzed) //隨後優化器去優化分析後的邏輯計劃
  5. // TODO: Don‘t just pick the first one...
  6. lazy val sparkPlan = planner(optimizedPlan).next() //根據策略生成plan物理計劃
  7. // executedPlan should not be used to initialize any SparkPlan. It should be
  8. // only used for execution.
  9. lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //最後生成已經準備好的Spark Plan
  10. /** Internal version of the RDD. Avoids copies and has no schema */
  11. lazy val toRdd: RDD[Row] = executedPlan.execute() //最後調用toRDD方法執行任務將結果轉換為RDD
  12. protected def stringOrError[A](f: => A): String =
  13. try f.toString catch { case e: Throwable => e.toString }
  14. def simpleString: String = stringOrError(executedPlan)
  15. override def toString: String =
  16. s"""== Logical Plan ==
  17. |${stringOrError(analyzed)}
  18. |== Optimized Logical Plan ==
  19. |${stringOrError(optimizedPlan)}
  20. |== Physical Plan ==
  21. |${stringOrError(executedPlan)}
  22. """.stripMargin.trim
  23. }


至此整個流程結束。

四、總結:

通過分析SQLContext我們知道了Spark SQL都包含了哪些組件,SqlParser,Parser,Analyzer,Optimizer,LogicalPlan,SparkPlanner(包含Physical Plan),QueryExecution.
通過調試代碼,知道了Spark SQL的執行流程:
sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函數調用) 執行sql生成RDD

隨後還會對裏面的每個組件對象進行研究,看看catalyst究竟做了哪些優化。

——EOF——

原創文章,轉載請註明:

轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文鏈接地址:http://blog.csdn.net/oopsoom/article/details/37658021

註:本文基於署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協議,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章鏈接。如若需要用於商業目的或者與授權方面的協商,請聯系我。

技術分享

轉自:http://blog.csdn.net/oopsoom/article/details/37658021

第一篇:Spark SQL源碼分析之核心流程