1. 程式人生 > >深入研究Spark SQL的Catalyst優化器(原創翻譯)

深入研究Spark SQL的Catalyst優化器(原創翻譯)

超越 href 語法 英文 更多 com edi 此外 並行化

Spark SQL是Spark最新和技術最為復雜的組件之一。它支持SQL查詢和新的DataFrame API。Spark SQL的核心是Catalyst優化器,它以一種新穎的方式利用高級編程語言特性(例如Scala的模式匹配和quasiquotes)來構建可擴展查詢優化器。 我們最近發布了一篇關於Spark SQL的論文,該論文將出現在SIGMOD 2015(由Davies Liu,Joseph K. Bradley,Xiangrui Meng,Tomer Kaftan,Michael J. Franklin和Ali Ghodsi合著)中。在這篇博客文章中,我們重述該論文的部分內容,解釋Catalyst優化器的內部功能以實現更廣泛的應用。 為了實現Spark SQL,我們設計了一個新的可擴展優化器Catalyst,它基於Scala中的函數式編程結構。 Catalyst的可擴展設計有兩個目的。首先,我們希望能夠輕松地為Spark SQL添加新的優化技術和功能,尤其是為了解決我們在使用大數據時遇到的各種問題(例如,半結構化數據和高級分析)。其次,我們希望使外部開發人員能夠擴展優化器 - 例如,通過添加數據源特定規則,可以將過濾或聚合的數據推送到外部存儲系統,或者支持新的數據類型。 Catalyst支持基於規則和基於成本的優化。 Catalyst的核心是使用一個通用庫生成樹並使用規則操作這些樹。在該框架的基礎上,構建了用於關系查詢處理庫(例如表達式,邏輯查詢計劃)和處理執行查詢不同階段的幾組規則:分析、邏輯優化、物理計劃和代碼生成,代碼生成將部分查詢編譯為Java字節碼。對於後者,使用了Scala特性quasiquotes,它可以很容易地在運行時由組合表達式生成代碼。最後,Catalyst提供了幾個公共擴展點,包括外部數據源和用戶定義的類型。

Catalyst中的主要數據類型是由節點對象組成的樹。 每個節點都有一個節點類型和零個或多個子節點。 新的節點類型在Scala中定義為TreeNode類的子類。 這些對象是不可變的,並可以使用函數轉換來操作,如下一小節所討論的。 一個簡單的例子,使用非常簡單的表達式語言描述三個節點類:
  • Literal(值:Int):常數值
  • Attribute(名稱:String):輸入行的屬性,例如“x”
  • Add(左:TreeNode,右:TreeNode):兩個表達式的總和。
這些類可以用來構建樹; 例如,表達式x +(1 + 2)的樹將在Scala代碼中表示如下:
1
Add(Attribute(x), Add(Literal(1), Literal(2)))

技術分享圖片技術分享圖片

規則

可以使用規則來操作樹,這些規則是從一棵樹到另一棵樹的函數。雖然規則可以在其輸入樹上運行任意代碼(因為該樹只是一個Scala對象),但最常見的方法是使用一組模式匹配函數來查找和替換具有特定結構的子樹。 模式匹配是許多函數式語言的一個特性,它允許從代數數據類型的潛在嵌套結構中提取值。在Catalyst中,樹提供了一種轉換方法,該方法遞歸地在樹的所有節點上應用模式匹配函數,將每個模式匹配轉換為結果。例如,我們可以實現一個在常量之間相加???Add操作的規則,如下所示:
1
tree.transform { 2 case Add(Literal(c1), Literal(c2)) => Literal(c1+c2) 3 }

將此應用於x +(1 + 2)的樹會產生新的樹x + 3。這裏關鍵是使用了Scala的標準模式匹配語法,它可用於匹配對象的類型和為提取的值(這裏為c1和c2)提供名稱。

傳遞給變換的模式匹配表達式是一個部分函數,??這意味著它只需要匹配所有輸入樹的子集。 Catalyst將測試規則適用樹的哪些部分,自動跳過並下降到不匹配的子樹。這種能力意味著規則只需對給定適用優化的樹進行推理,而對那些不適用的數不進行推理。因此,當新的操作符新增到系統中時,這些規則不需要修改。 規則(和一般的Scala模式匹配)可以在同一個變換調用中匹配多個模式,這使得一次實現多個轉換來得非常簡潔。
1 tree.transform {
2   case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
3   case Add(left, Literal(0)) => left
4   case Add(Literal(0), right) => right
5 }

實際上,規則可能需要多次執行才能完全轉換樹。Catalyst將規則形成批處理,並執行每個批處理至固定點,該固定點是樹應用其規則後不發生改變。雖然規則運行到固定點意味著每個規則是簡單且自包含,但這些規則仍會對樹上產生較大的全局效果。在上面的例子中,重復的應用規則會持續折疊較大的樹,比如(x + 0)+(3 + 3)。另一個例子,第一個批處理可以分析所有屬性指定類型的表達式,而第二批處理可使用這些類型來進行常量折疊。在每批處理完畢後,開發人員還可以對新樹進行規範性檢查(例如,查看所有屬性為指定類型),這些檢查一般使用遞歸匹配來編寫。

最後,規則條件及其本身可以包含任意的Scala代碼。這使得Catalyst比領域特定語言在優化器上更強大,同時保持簡潔特性。 根據經驗,對不可變樹的函數轉換使得整個優化器非常易於推理和調試。規則也支持在優化器中並行化,盡管該特性還沒有利用這個。

在Spark SQL中使用Catalyst

Catalyst的通用樹轉換框架分為四個階段,如下所示:(1)分析解決引用的邏輯計劃,(2)邏輯計劃優化,(3)物理計劃,(4)代碼生成用於編譯部分查詢生成Java字節碼。 在物理規劃階段,Catalyst可能會生成多個計劃並根據成本進行比較。 所有其他階段完全是基於規則的。 每個階段使用不同類型的樹節點; Catalyst包括用於表達式、數據類型以及邏輯和物理運算符的節點庫。 這些階段如下所示: 技術分享圖片 技術分享圖片

解析

sales”中列的類型,甚至列名是否有效,在查詢表sale元數據之前這些都是未知的。如果不知道它的類型或沒有將它匹配到輸入表(或別名)時,那麽該屬性稱為未解析。Spark SQL使用Catalyst規則和記錄所有表元數據的Catalog對象來解析這些屬性的。構建具有未綁定屬性和數據類型的“未解析的邏輯計劃”樹後,然後執行以下規則: 1、從Catalog中查找名稱關系 2、將命名屬性(如col)映射到操作符的子項 3、將那些屬性引用相同的值給它們一個唯一的ID(隨後遇到如col=col時可以進行優化) 4、通過表達式傳遞和強制類型:例如,我們無法知道1+col的返回類型,直到解析出col並將其子表達式轉換為兼容類型。 經過統計,解析器的規則大約有1000行代碼。

邏輯計劃優化

在邏輯優化階段,邏輯計劃應用了標準的基於規則的優化。(基於成本的優化通過規則生成多個計劃,然後計算其成本來執行。)這些優化包括常量折疊、謂詞下推、項目裁剪、空值傳播、布爾表達式簡化以及其他規則。總的來說,為各種情況添加規則非常簡單。例如,當我們將固定精度的DECIMAL類型添加到Spark SQL時,我們想要以較低精度的方式優化DECIMAL的聚合(例如求和和平均值);只需要12行代碼編寫一個規則便可在SUM和AVG表達式中找到該數,然後將它們轉換為未縮放的64位LONG,然後進行聚合,最後將結果轉換回來。這個規則的簡化版本,只能優化SUM表達式如下所示:
1 object DecimalAggregates extends Rule[LogicalPlan] {
2   /** Maximum number of decimal digits in a Long */
3   val MAX_LONG_DIGITS = 18
4   def apply(plan: LogicalPlan): LogicalPlan = {
5     plan transformAllExpressions {
6       case Sum(e @ DecimalType.Expression(prec, scale))
7           if prec + 10 <= MAX_LONG_DIGITS =>
8         MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) }
9 }

再舉一個例子,一個12行代碼的規則通過簡單的正則表達式將LIKE表達式優化為String.startsWith或String.contains調用。在規則中使用任意Scala代碼使得這些優化易於表達,而這些規則超越了子樹結構的模式匹配。

經過統計,邏輯優化規則有800行代碼。

物理計劃

在物理計劃階段,Spark SQL使用邏輯計劃生成一個或多個物理計劃,這個過程采用了匹配Spark執行引擎的物理運算符。然後使用成本模型選擇計劃。目前,基於成本的優化僅用於選擇連接算法:對於已知很小的關系,Spark SQL使用Spark中的點對點廣播工具進行廣播連接。不過,該框架支持更深入地使用基於成本的優化,因為可以使用規則對整棵樹進行遞歸估計。因此,我們打算在未來實施更加豐富的基於成本的優化。 物理計劃還執行基於規則的物理優化,例如將管道項目或過濾器合並到一個Spark映射操作中。另外,它可以將操作從邏輯計劃推送到支持謂詞或項目下推的數據源。我們將在後面的章節中描述這些數據源的API。 總的來說,物理計劃規則大約有500行代碼。

代碼生成

查詢優化的最後階段涉及生成Java字節碼用於在每臺機器上運行。由於Spark SQL經常在內存數據集上運行,其中處理受CPU限制,我們希望Spark SQL支持代碼生成以加快執行速度。盡管如此,代碼生成引擎的構建通常很復雜,特別是編譯器。Catalyst依靠Scala語言的特殊功能quasiquotes來簡化代碼生成。 Quasiquotes允許在Scala語言中對抽象語法樹(AST)進行編程式構建,然後在運行時將其提供給Scala編譯器以生成字節碼。使用Catalyst將表示SQL表達式的樹轉換為Scala代碼的AST用於描述表達式,然後編譯並運行生成的代碼。 作為一個簡單的例子,參考第4.2節介紹的Add、Attribute和Literal樹節點可以寫成(x + y)+1表達式。如果沒有使用代碼生成,這些表達式必須遍歷Add、Attribute和Literal節點樹行走才能解釋每行數據。這會引入大量的分支和虛函數調用,從而減慢執行速度。如果使用了代碼生成,可以編寫一個函數將特定的表達式樹轉換為Scala AST,如下所示:
1 def compile(node: Node): AST = node match {
2   case Literal(value) => q"$value"
3   case Attribute(name) => q"row.get($name)"
4   case Add(left, right) => q"${compile(left)} + ${compile(right)}"
5 }

以q開頭的字符串是quasiquotes,雖然它們看起來像字符串,但它們在編譯時由Scala編譯器解析,並代表其代碼的AST。 Quasiquotes用$符號表示法將變量或其他AST拼接到它們中。例如,文字(1)將成為1的Scala表達式的AST,而屬性(“x”)變為row.get(“x”)。最後,類似Add(Literal(1),Attribute(“x”))的樹成為像1 + row.get(“x”)這樣的Scala表達式的AST。

Quasiquotes在編譯時進行類型檢查,以確保只替換合適的AST或文字,使得它們比字符串連接更有用,並且直接生成Scala AST,而非在運行時運行Scala語法分析器。此外,它們是高度可組合的,因為每個節點的代碼生成規則不需要知道其子節點返回的樹是如何構建的。最後,如果Catalyst缺少表達式級別的優化,則由Scala編譯器對結果代碼進行進一步優化。下圖顯示quasiquotes生成代碼其性能類似於手動優化的程序。 技術分享圖片 技術分享圖片 我們發現quasiquotes非常接近於代碼生成,並且發現即使是Spark SQL的新貢獻者也可以快速為新類型的表達式添加規則。 Quasiquotes也適用於在本地Java對象上運行的目標:當從這些對象訪問字段時,可以直接訪問所需字段,而不必將對象復制成Spark SQL 行,並使用行訪問器方法。最後,將代碼生成的評估與對尚未生成代碼的表達式的解釋評估結合起來很簡單,因為編譯的Scala代碼可以直接使用到表達式解釋器中。 Catalyst生成器總共有大約700行代碼。 這篇博客文章介紹了Spark SQL的Catalyst優化器內部原理。 通過這種新穎、簡單的設計使Spark社區能夠快速建立原型、實現和擴展引擎。 你可以在這裏閱讀其余的論文。 您還可以從以下內容中找到有關Spark SQL的更多信息:
  • Spark SQL and DataFrame Programming Guide from Apache Spark
  • Data Source API in Spark presentation by Yin Huai
  • Introducing DataFrames in Spark for Large Scale Data Science by Reynold Xin
  • Beyond SQL: Speeding up Spark with DataFrames by Michael Armbrust
英文博客參見 https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html 原始論文參見 http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf 轉載請註明自 http://www.cnblogs.com/shishanyuan/p/8455786.html

深入研究Spark SQL的Catalyst優化器(原創翻譯)