1. 程式人生 > >Apache Spark 2.2中基於成本的優化器(CBO)(轉載)

Apache Spark 2.2中基於成本的優化器(CBO)(轉載)

ons roc art 3.4 post tinc ner sort 重排序

Apache Spark 2.2最近引入了高級的基於成本的優化器框架用於收集並均衡不同的列數據的統計工作 (例如., 基(cardinality)、唯一值的數量、空值、最大最小值、平均/最大長度,等等)來改進查詢類作業的執行計劃。均衡這些作業幫助Spark在選取最優查詢計劃時做出更好決定。這些優化的例子包括在做hash-join時選擇正確的一方建hash,選擇正確的join類型(廣播hash join和全洗牌hash-join)或調整多路join的順序,等等) 在該博客中,我們將深入講解Spark的基於成本的優化器(CBO)並討論Spark是如何收集並存儲這些數據、優化查詢,並在壓力測試查詢中展示所帶來的性能影響。

一個啟發性的例子

在Spark2.2核心,Catalyst優化器是一個統一的庫,用於將查詢計劃表示成多顆樹並依次使用多個優化規則來變換他們。大部門優化規則都基於啟發式,例如,他們只負責查詢的結構且不關心要處理數據的屬性,這樣嚴重限制了他們的可用性。讓我們用一個簡單的例子來演示。考慮以下的查詢,該查詢過濾大小為500GB的t1表並與另一張大小為20GB的t2表做join操作。Spark使用hash join,即選擇小的join關系作為構建hash表的一方並選擇大的join關系作為探測方。由於t2表比t1表小, Apache Spark 2.1 將會選擇右方作為構建hash表的一方而不是對其進行過濾操作(在這個案例中就是會過濾出t1表的大部分數據)。選擇錯誤那方做構建hash表經常會導致系統由於內存限制的原因去放棄快速hash join而使用排序-歸並 join(sort-merge join)。
技術分享圖片 而Apache Spark 2.2卻不這麽做,它會收集每個操作的統計信息 並發現左方在過濾後大小只有100MB (1 百萬條紀錄) ,而過濾右方會有20GB (1億條紀錄)。有了兩側正確的表大小/基的信息,Spark 2.2會選擇左方為構建方,這種選擇會極大加快查詢速度。 為了改進查詢執行計劃的質量,我們使用詳細的統計信息加強了Spark SQL優化器。從詳細的統計信息中,我們傳播統計信息到別的操作子(因為我們從下往上遍歷查詢樹)。傳播結束,我們可以估計每個數據庫操作子的輸出記錄數和輸出紀錄的大小,這樣就可以得到一個高效的查詢計劃。

統計信息收集框架

ANALYZE TABLE 命令

CBO依賴細節化的統計信息來優化查詢計劃。要收集這些統計信息,用戶可以使用以下這些新的SQL命令:
1 ANALYZE TABLE table_name COMPUTE STATISTICS

上面的 SQL 語句可以收集表級的統計信息,例如記錄數、表大小(單位是byte)。這裏需要註意的是ANALYZE, COMPUTE, and STATISTICS都是保留的關鍵字,他們已特定的列名為入參,在metastore中保存表級的統計信息。

1 ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column-name1, column-name2, ….

需要註意的是在ANALYZE 語句中沒必要指定表的每個列-只要指定那些在過濾/join條件或group by等中涉及的列

統計信息類型

下表列出了所收集的統計信息的類型,包括數字類型、日期、時間戳和字符串、二進制數據類型 技術分享圖片 由於CBO是以後續方式遍歷Spark的邏輯計劃樹,我們可以自底向上地把這些統計信息傳播到其他操作子。雖然我們要評估的統計信息及對應開銷的操作子有很多,我們將講解兩個最復雜且有趣的操作子FILTER 和JOIN的評估統計信息的流程。

過濾選擇

過濾條件是配置在SQL SELECT語句中的WHERE 子句的謂語表達式。謂語可以是包含了邏輯操作子AND、OR、NOT且包含了多個條件的復雜的邏輯表達式 。單個條件通常包含比較操作子,例如=, <, <=, >, >= or <=>。因此,根據全部過濾表達式來估計選擇是非常復雜的。 我們來演示對包含多個條件邏輯表達式的復雜邏輯表達式做過濾選擇 的一些計算。
  • 對於邏輯表達式AND,他的過濾選擇是左條件的選擇乘以右條件選擇,例如fs(a AND b) = fs(a) * fs (b)。
  • 對於邏輯表達式OR,他的過濾選擇是左條件的選擇加上右條件選擇並減去左條件中邏輯表達式AND的選擇,例如 fs (a OR b) = fs (a) + fs (b) - fs (a AND b) = fs (a) + fs (b) – (fs (a) * fs (b))
  • 對於邏輯表達式NOT,他的過濾因子是1.0 減去原表達式的選擇,例如 fs (NOT a) = 1.0 - fs (a)
現在我們看下可能有多個操作子的單個邏輯條件例如 =, <, <=, >, >= or <=>。對於單個操作符作為列,另一個操作符為字符串的情況,我們先計算等於 (=) 和小於 (<) 算子的過濾選擇。其他的比較操作符也是類似。
  • 等於操作符 (=) :我們檢查條件中的字符串常量值是否落在列的當前最小值和最大值的區間內 。這步是必要的,因為如果先使用之前的條件可能會導致區間改變。如果常量值落在區間外,那麽過濾選擇就是 0.0。否則,就是去重後值的反轉(註意:不包含額外的柱狀圖信息,我們僅僅估計列值的統一分布)。後面發布的版本將會均衡柱狀圖來優化估計的準確性。
  • 小於操作符 (<) :檢查條件中的字符串常量值落在哪個區間。如果比當前列值的最小值還小,那麽過濾選擇就是 0.0(如果大於最大值,選擇即為1.0)。否則,我們基於可用的信息計算過濾因子。如果沒有柱狀圖,就傳播並把過濾選擇設置為: (常量值– 最小值) / (最大值 – 最小值)。另外,如果有柱狀圖,在計算過濾選擇時就會加上在當前列最小值和常量值之間的柱狀圖桶密度 。同時,註意在條件右邊的常量值此時變成了該列的最大值。

Join基數

我們已經討論了過濾選擇, 現在討論join的輸出基。在計算二路join的輸出基之前,我們需要先有雙方孩子節點的輸出基 。每個join端的基都不會超過原表記錄數的基。更準確的說,是在執行join操作子之前,執行所有操作後得到的有效紀錄數。在此,我們偏好計算下內連接(inner join)操作的基因為它經常用於演化出其他join類型的基。我們計算下在 A.k = B.k 條件下A join B 的記錄數 ,即 num(A IJ B) = num(A)*num(B)/max(distinct(A.k),distinct(B.k)) num(A) 是join操作上一步操作執行後表A的有效記錄數, distinct是join列 k唯一值的數量。 如下所示,通過計算內連接基,我們可以大概演化出其他join類型的基:
  • 左外連接(Left-Outer Join): num(A LOJ B) = max(num(A IJ B),num(A)) 是指內連接輸出基和左外連接端A的基之間較大的值。這是因為我們需要把外端的每條紀錄計入,雖然他們沒有出現在join輸出紀錄內。
  • Right-Outer Join: num(A ROJ B) = max(num(A IJ B),num(B))
  • Full-Outer Join: num(A FOJ B) = num(A LOJ B) + num(A ROJ B) - num(A IJ B)

最優計劃選擇

現在我們已經有了數據統計的中間結果,讓我們討論下如何使用這個信息來選擇最佳的查詢計劃。早先我們解釋了在hash join操作中根據精確的基和統計信息選擇構建方。 同樣,根據確定的基和join操作的前置所有操作的大小估計,我們可以更好的估計join測的大小來決定該測是否符合廣播的條件。 這些統計信息同時也有助於我們均衡基於成本的join的重排序優化。我們適配了動態編程算法[Selinger 1979]3 來選取最佳的多路join順序。更確切的說,在構建多路join時候,我們僅考慮同個集合(包含m個元素)的最佳方案(成本最低)。舉個例子,對於3路join,我們根據元素 {A, B, C} 可能的排列順序,即 (A join B) join C, (A join C) join B 和(B join C) join A來考慮最佳join方案。我們適配 的算法考慮了所有的組合,包括左線性樹(left-deep trees),濃密樹(bushy trees)和右線性樹(right-deep-trees)。我們還修剪笛卡兒積(cartesian product )用於在構建新的計劃時如果左右子樹都沒有join條件包含的引用需要情況。這個修剪策略顯著減少了搜索範圍。 大部分數據庫優化器將CPU和I/O計入考慮因素,分開考慮成本來估計總共的操作開銷。在Spark中,我們用簡單的公式估計join操作的成本: cost = weight * cardinality + (1.0 - weight) * size 4 公式的第一部分對應CPU成本粗略值,第二部分對應IO。一顆join樹的成本是所有中間join成本的總和。

查詢的性能測試和分析

我們使用非侵入式方法把這些基於成本的優化加入到Spark,通過加入全局配置spark.sql.cbo.enabled來開關這個特性。在Spark 2.2, 這個參數默認是false 。短期內有意設置該特性默認為關閉,因為的Spark被上千家公司用於生產環境,默認開啟該特性可能會導致生產環境壓力變大從而導致不良後果。

配置及方法學

在四個節點 (單臺配置:Huawei FusionServer RH2288 , 40 核和384 GB 內存) 的集群用TPC-DS來測試Apache Spark 2.2查詢性能。在四個節點的集群運行測試查詢性能的語句並設比例因子為1000(大概1TB數據)。收集全部24張表(總共245列)的統計信息大概要14分鐘。 在校驗端到端的結果前,我們先看一條查詢語句TPC-DS(Q25; 如下所示)來更好了解基於成本的join排序帶來的威力。這句查詢語句包括三張事實表: store_sales (29 億行紀錄), store_returns (2.88 億行紀錄) 和catalog_sales (14.4 億行紀錄). 同時也包括三張維度表: date_dim(7.3萬行紀錄), store (1K 行紀錄) 和 item (300K 行紀錄).
 1 SELECT
 2  i_item_id,
 3  i_item_desc,
 4  s_store_id,
 5  s_store_name,
 6  sum(ss_net_profit) AS store_sales_profit,
 7  sum(sr_net_loss) AS store_returns_loss,
 8  sum(cs_net_profit) AS catalog_sales_profit
 9 FROM
10  store_sales, store_returns, catalog_sales, date_dim d1, date_dim d2, date_dim d3,
11  store, item
12 WHERE
13  d1.d_moy = 4
14    AND d1.d_year = 2001
15    AND d1.d_date_sk = ss_sold_date_sk
16    AND i_item_sk = ss_item_sk
17    AND s_store_sk = ss_store_sk
18    AND ss_customer_sk = sr_customer_sk
19    AND ss_item_sk = sr_item_sk
20    AND ss_ticket_number = sr_ticket_number
21    AND sr_returned_date_sk = d2.d_date_sk
22    AND d2.d_moy BETWEEN 4 AND 10
23    AND d2.d_year = 2001
24    AND sr_customer_sk = cs_bill_customer_sk
25    AND sr_item_sk = cs_item_sk
26    AND cs_sold_date_sk = d3.d_date_sk
27    AND d3.d_moy BETWEEN 4 AND 10
28    AND d3.d_year = 2001
29 GROUP BY
30  i_item_id, i_item_desc, s_store_id, s_store_name
31 ORDER BY
32  i_item_id, i_item_desc, s_store_id, s_store_name
33 LIMIT 100

沒使用CBO的Q25

我們先看下沒使用基於成本優化的Q25的join樹(如下)。一般這種樹也叫做左線性樹。這裏, join #1 和 #2 是大的事實表與事實表join,join了3張事實表store_sales, store_returns, 和catalog_sales,並產生大的中間結果表。這兩個join都以shuffle join的方式執行並會產生大的輸出,其中join #1輸出了1.99億行紀錄。總之,關閉CBO,查詢花費了241秒。 技術分享圖片

使用了CBO的Q25

另一方面,用了CBO,Spark創建了優化方案可以減小中間結果(如下)。在該案例中,Spark創建了濃密樹而不是左-深度樹。在CBO規則下,Spark 先join 的是事實表對應的維度表 (在嘗試直接join事實表前)。避免大表join意味著避免了大開銷的shuffle。在這次查詢中,中間結果大小縮小到原來的1/6(相比之前)。最後,Q25只花了71秒,性能提升了3.4倍。 技術分享圖片

TPC-DS 查詢性能

現在我們對性能提升的原因有了直觀感受,我們再看下端到端的TPC-DS查詢結果。下表展示了使用CBO或沒使用CBO下所有TPC-DS查詢花費的: 技術分享圖片 首先,要註意的是一半TPC-DS性能查詢沒有性能的改變。這是因為使用或沒使用CBO的查詢計劃沒有不同 (例如,即使沒有CBO, Spark’s Catalyst 優化器的柱狀圖也可以優化這些查詢。剩下的查詢性能都有提升,最有意思的其中16個查詢,CBO對查詢計劃進行巨大改變並帶來了超過30%的性能提升(如下)總的來說,我們觀察的圖標說明16個查詢大概加速了2.2倍,其中Q72 加速最大,達到了8倍。 技術分享圖片

結論

回顧前文,該博客展示了Apache Spark 2.2新的CBO不同的高光層面的。我們討論了統計信息收集框架的細節、過濾和join時的基傳播、CBO開啟(選擇構建方和多路重排序)以及TPC-DS查詢性能的提升。 去年,我們針對CBO umbrella JIRA SPARK-16026總共處理了32個子任務,涉及到50多個補丁和7000多行代碼。也就是說,在分布式數據庫 均衡CBO是非常困難的而這也是向這個方向邁出的一小步。在以後的版本中,我們計劃繼續往這個方向做下去,繼續加入更復雜的統計信息(直方圖、總記錄數-最小粗略估計、統計信息分區程度,等等)並改進我們的公式。 我們對已經取得的進展感到十分興奮並希望你們喜歡這些改進。我們希望你們能在Apache Spark 2.2中嘗試新的CBO!

延伸閱讀

可以查看Spark 2017(峰會) 演講: Cost Based Optimizer in Spark 2.2
  1. 原理就是較小的關系更容易放到內存
  2. <=> 表示‘安全的空值相等’ ,如果兩邊的結果都是null就返回true,如果只有一邊是null就返回false
  3. P. Griffiths Selinger, M. M. Astrahan, D. D. Chamberlin, R. A. Lorie, T. G. Price, “Access Path Selection in a Relational Database Management System”, Proceedings of ACM SIGMOD conference, 1979
  4. weight(權值)是調優參數,可以通過配置 spark.sql.cbo.joinReorder.card.weight (默認是0.7)

Apache Spark 2.2中基於成本的優化器(CBO)(轉載)