1. 程式人生 > >Spark原始碼系列(九)Spark SQL初體驗之解析過程詳解

Spark原始碼系列(九)Spark SQL初體驗之解析過程詳解

首先宣告一下這個版本的程式碼是1.1的,之前講的都是1.0的。

Spark支援兩種模式,一種是在spark裡面直接寫sql,可以通過sql來查詢物件,類似.net的LINQ一樣,另外一種支援hive的HQL。不管是哪種方式,下面提到的步驟都會有,不同的是具體的執行過程。下面就說一下這個過程。

Sql解析成LogicPlan

使用Idea的快捷鍵Ctrl + Shift + N開啟SQLQuerySuite檔案,進行除錯吧。

複製程式碼
  def sql(sqlText: String): SchemaRDD = {
    if (dialect == "sql") {
      new SchemaRDD
(this, parseSql(sqlText)) } else { sys.error(s"Unsupported SQL dialect: $dialect") } }
複製程式碼

從這裡可以看出來,第一步是解析sql,最後把它轉換成一個SchemaRDD。點選進入parseSql函式,發現解析Sql的過程在SqlParser這個類裡面。
在SqlParser的apply方法裡面,我們可以看到else語句裡面的這段程式碼。

      //對input進行解析,符合query的模式的就返回Success
      phrase(query)(new lexical.Scanner(input)) match {
        
case Success(r, x) => r case x => sys.error(x.toString) }

這裡我們主要關注query就可以。

複製程式碼
  protected lazy val query: Parser[LogicalPlan] = (
    select * (
        UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
        INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
        EXCEPT 
^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert | cache )
複製程式碼

這裡面有很多看不懂的操作符,請到下面這個網址裡面去學習。這裡可以看出來它目前支援的sql語句只是select和insert。

我們繼續檢視select。

View Code

可以看得出來它對sql的解析是和我們常用的sql寫法是一致的,這裡面再深入下去還有遞迴,並不是看起來那麼好理解。這裡就不繼續講下去了,在解析hive的時候我會重點講一下,我認為目前大家使用得更多是仍然是來源於hive的資料集,畢竟hive那麼穩定。

到這裡我們可以知道第一步是通過Parser把sql解析成一個LogicPlan。

LogicPlan到RDD的轉換過程

好,下面我們回到剛才的程式碼,接著我們應該看SchemaRDD。

複製程式碼
  override def compute(split: Partition, context: TaskContext): Iterator[Row] =
    firstParent[Row].compute(split, context).map(_.copy())

  override def getPartitions: Array[Partition] = firstParent[Row].partitions

  override protected def getDependencies: Seq[Dependency[_]] =
    List(new OneToOneDependency(queryExecution.toRdd))
複製程式碼

SchemaRDD是一個RDD的話,那麼它最重要的3個屬性:compute函式,分割槽,依賴全在這裡面,其它的函式我們就不看了。

挺奇怪的是,我們new出來的RDD,怎麼會有依賴呢,這個queryExecution是啥,點選進去看看吧,程式碼跳轉到SchemaRDD繼承的SchemaRDDLike裡面。

lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)

protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    new this.QueryExecution { val logical = plan }

把這兩段很短的程式碼都放一起了,executePlan方法就是new了一個QueryExecution出來,那我們繼續看看QueryExecution這個類吧。

複製程式碼
    lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
    lazy val optimizedPlan = optimizer(analyzed)
    lazy val sparkPlan = {
      SparkPlan.currentContext.set(self)
      planner(optimizedPlan).next()
    }
    // 在需要的時候加入Shuffle操作
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
    lazy val toRdd: RDD[Row] = executedPlan.execute()
複製程式碼

從這裡可以看出來LogicPlan是經過了5個步驟的轉換,要被analyzer和optimizer的處理,然後轉換成SparkPlan,在執行之前還要被prepareForExecution處理一下,最後呼叫execute方法轉成RDD.

下面我們分步講這些個東東到底是幹啥了。

首先我們看看Anayzer,它是繼承自RuleExecutor的,這裡插句題外話,Spark sql的作者Michael Armbrust在2013年的Spark Submit上介紹Catalyst的時候,就說到要從整體地去優化一個sql的執行是很困難的,所有設計成這種基於一個一個小規則的這種優化方式,既簡單又方便維護。

好,我們接下來看看RuleExecutor的apply方法。

View Code

看完了RuleExecutor,我們繼續看Analyzer,下面我只貼出來batches這塊的程式碼,剩下的要自己去看了哦。

View Code

可以看得出來Analyzer是把Unresolved的LogicPlan解析成resolved的,解析裡面的表名、欄位、函式、別名什麼的。

我們接著看Optimizer, 從單詞上看它是用來做優化的,但是從程式碼上來看它更多的是為了過濾我們寫的一些垃圾語句,並沒有做什麼實際的優化。

View Code

真是用心良苦啊,看來我們寫sql的時候還是要注意一點的,你看人家花多大的功夫來優化我們的爛sql。。。要是我肯定不優化。。。寫得爛就慢去吧!

接下來,就改看這一句了planner(optimizedPlan).next() 我們先看看SparkPlanner吧。

View Code

這一步是把邏輯計劃轉換成物理計劃,或者說是執行計劃了,裡面有很多概念是我以前沒聽過的,網上查了一下才知道,原來資料庫的執行計劃還有那麼多的說法,這一塊需要是專門研究資料庫的人比較瞭解了。剩下的兩步就是prepareForExecution和execute操作。

prepareForExecution操作是檢查物理計劃當中的Distribution是否滿足Partitioning的要求,如果不滿足的話,需要重新弄做分割槽,新增shuffle操作,這塊暫時沒咋看懂,以後還需要仔細研究。最後呼叫SparkPlan的execute方法,這裡面稍微講講這塊的樹型結構。

sql解析出來就是一個二叉樹的結構,不管是邏輯計劃還是物理計劃,都是這種結構,所以在程式碼裡面可以看到LogicPlan和SparkPlan的具體實現類都是有繼承上面圖中的三種類型的節點的。

非LeafNode的SparkPlan的execute方法都會有這麼一句child.execute(),因為它需要先執行子節點的execute來返回資料,執行的過程是一個先序遍歷。

最後把這個過程也用一個圖來表示吧,方便記憶。

(1)通過一個Parser來把sql語句轉換成Unresolved LogicPlan,目前有兩種Parser,SqlParser和HiveQl。

(2)通過Analyzer把LogicPlan當中的Unresolved的內容給解析成resolved的,這裡麵包括表名、函式、欄位、別名等。

(3)通過Optimizer過濾掉一些垃圾的sql語句。

(4)通過Strategies把邏輯計劃轉換成可以具體執行的物理計劃,具體的類有SparkStrategies和HiveStrategies。

(5)在執行前用prepareForExecution方法先檢查一下。

(6)先序遍歷,呼叫執行計劃樹的execute方法。