1. 程式人生 > >Spark SQL原始碼解析(四)Optimization和Physical Planning階段解析

Spark SQL原始碼解析(四)Optimization和Physical Planning階段解析

Spark SQL原理解析前言: [Spark SQL原始碼剖析(一)SQL解析框架Catalyst流程概述](https://www.cnblogs.com/listenfwind/p/12724381.html) [Spark SQL原始碼解析(二)Antlr4解析Sql並生成樹](https://www.cnblogs.com/listenfwind/p/12735833.html) [Spark SQL原始碼解析(三)Analysis階段分析](https://www.cnblogs.com/listenfwind/p/12795934.html) 前面已經介紹了SQL parse,將一條SQL語句使用antlr4解析成語法樹並使用訪問者模式生成Unresolved LogicalPlan,然後是Analysis階段將Unresolved LogicalPlan轉換成Resolved LogicalPlan。這一篇我們介紹Optimization階段,和生成Physical Planning階段。 經過這兩個階段後,就差不多要到最後轉換成Spark的RDD任務了。 # Spark SQL Optimization階段概述 先來看看Logical Optimization階段。 上一篇我們討論了Analysis階段如何生成一個真正的Logical Plan樹。這一階段聽名字就知道是優化階段,Spark SQL中有兩個部分的優化,第一部分就是這裡,是rule-base階段的優化,就是根據各種關係代數的優化規則,對生成的Logical Plan適配,匹配到就進行相應的優化邏輯。這些規則大概有:投影消除,constant folding,替換null值,布林表示式簡化等等。當然大部分規則細節我也不是很清楚,僅僅能從名字推斷一二。這 同時還可以新增自己的優化rule,也比較容易實現,論文中就給出了一段自定義優化rule的程式碼: ``` object DecimalAggregates extends Rule[LogicalPlan] { /** Maximum number of decimal digits in a Long */ val MAX_LONG_DIGITS = 18 def apply(plan: LogicalPlan): LogicalPlan = { plan transformAllExpressions { case Sum(e @ DecimalType.Expression(prec , scale)) if prec + 10 <= MAX_LONG_DIGITS => MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) } } ``` 這段程式碼的大意是自定義了一個rule,如果匹配到SUM的表示式,那就執行相應的邏輯,論文裡描述這裡是找到對應的小數並將其轉換為未縮放的64位LONG。具體邏輯看不是很明白不過不重要,重要的是編寫自己的優化rule很方便就是。 順便點一下另一種優化,名字叫做cost-base優化(CBO),是發生在Physical Planning階段的,這裡就先賣個關子,後面說到的時候再討論吧。 然後看到原始碼的時候,會發現Optimizer這個類也是繼承自RuleExecutor,繼承這個類之後的流程基本都是一樣的。前面分析Analysis階段的時候已經有詳細介紹過這個流程,這裡就不展開說了。 其實這優化器的重點應該是各種優化規則,這裡我覺得更多的是設計到關係代數表示式優化理論方面的知識,這部分我也不甚精通,所以也就不說了。對這塊感興趣的童鞋可以看看網上別人的文章,這裡順便列幾個可能有幫助的部落格, - [sparksql 中外連線查詢中的謂詞下推處理](https://cloud.tencent.com/developer/article/1005925) - [資料庫查詢優化入門: 代數與物理優化基礎](https://www.jianshu.com/p/edf503a2a1e7) - [「 資料庫原理 」查詢優化(關係代數表示式優化)](http://www.ptbird.cn/optimization-of-relational-algebraic-expression.html) 下面還是來看看最開始的例子進行Optimization階段後會變成什麼樣吧,先看看之前的示例程式碼: ``` val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") val queryCaseWhen = sql("select key from src ") ``` 然後在Optimization優化階段後,變成了: ``` Project [_1#2 AS key#5] +- LocalRelation [_1#2, _2#3] ``` 好吧,看起來沒什麼變化,與Analysis階段相比,也就少了個SubqueryAlias ,符合預期。不過也對,就一條SELECT語句能優化到哪去啊。 # Physical Planning生成階段概述 相比較於Logical Plan,Physical plan算是Spark可以去執行的東西了,當然本質上它也是一棵樹。 前面說到,Spark有一種cost-based的優化。主要就在這一階段,在這一階段,會生成一個或多個Physical Plan,然後使用cost model預估各個Physical Plan的處理效能,最後選擇一個最優的Physical Plan。這裡最主要優化的是join操作,當觸發join操作的時候,會根據左右兩邊的資料集判斷,然後決定使用Broadcast join,還是傳統的Hash join,抑或是MergeSort join,有關這幾種join的區別這裡就不詳細解釋了,有興趣童鞋可以百度看看。 除了cost-based優化,這一階段也依舊會有rule-based優化,所以說RuleExecutor這個類是很重要的,前面提到的Analysis階段也好,Optimization階段也好,包括這裡的Physical Plan階段,只要是涉及到rule-based優化,都會跟RuleExecutor這個類扯上關係。當然這樣無疑是極大使用了面向物件的特性,不同的階段編寫不同的rule就行,一次編寫,到處複用。 # Physical Planning原始碼分析 首先是在QueryExecution中排程, ``` class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ......其他程式碼 lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, // but we will implement to choose the best plan. planner.plan(ReturnAnswer(optimizedPlan)).next() } ......其他程式碼 } ``` 這裡的planner是org.apache.spark.sql.execution.SparkPlanner這個類,而這個類繼承自org.apache.spark.sql.catalyst.planning.QueryPlanner,plan()方法也是在父類QueryPlanner中實現的。和RuleExecution類似,QueryPlanner中有一個返回Seq[GenericStrategy[PhysicalPlan]]的方法:**def strategies: Seq[GenericStrategy[PhysicalPlan]]**,這個方法會在子類(也就是SparkPlanner)重寫,然後被QueryPlanner的plan()方法呼叫。 我們來看看SparkPlanner中strategies方法的重寫,再來看QueryPlanner的plan()方法吧。 ``` class SparkPlanner( val sparkContext: SparkContext, val conf: SQLConf, val experimentalMethods: ExperimentalMethods) extends SparkStrategies { ......其他程式碼 override def strategies: Seq[Strategy] = experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ ( PythonEvals :: DataSourceV2Strategy :: FileSourceStrategy :: DataSourceStrategy(conf) :: SpecialLimits :: Aggregation :: Window :: JoinSelection :: InMemoryScans :: BasicOperators :: Nil) ......其他程式碼 ``` strategies()返回策略列表,是生成策略GenericStrategy,這是個具體的抽象類,位於org.apache.spark.sql.catalyst.planning包。所謂生成策略,就是決定如果根據Logical Plan生成Physical Plan的策略。比如上面介紹的join操作可以生成Broadcast join,Hash join,抑或是MergeSort join,就是一種生成策略,具體的類就是上面程式碼中的JoinSelection。每個生成策略GenericStrategy都是object,其apply()方法返回的是Seq[SparkPlan],**這裡的SparkPlan就是PhysicalPlan**(注意:下文會將SparkPlan和PhysicalPlan混著用)。 明白了生成策略後,就可以來看看QueryPlanner的plan()方法了。 ``` abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { ......其他程式碼 def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... // Collect physical plan candidates. val candidates = strategies.iterator.flatMap(_(plan)) //迭代呼叫並平鋪,變成Iterator[SparkPlan] // The candidates may contain placeholders marked as [[planLater]], // so try to replace them by their child plans. val plans = candidates.flatMap { candidate =>
val placeholders = collectPlaceholders(candidate) if (placeholders.isEmpty) { // Take the candidate as is because it does not contain placeholders. Iterator(candidate) } else { // Plan the logical plan marked as [[planLater]] and replace the placeholders. placeholders.iterator.foldLeft(Iterator(candidate)) { case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
// Plan the logical plan for the placeholder. val childPlans = this.plan(logicalPlan) candidatesWithPlaceholders.flatMap { candidateWithPlaceholders => childPlans.map { childPlan => // Replace the placeholder by the child plan candidateWithPlaceholders.transformUp { case p if p.eq(placeholder) =>
childPlan } } } } } } val pruned = prunePlans(plans) assert(pruned.hasNext, s"No plan for $plan") pruned } ......其他程式碼 } ``` 這裡的流程其實不難,主要工作其實就是呼叫各個生成策略GenericStrategy的apply()方法,生成Iterator[SparkPlan]。後面很大部分程式碼是處理佔位符,按我的理解,在生成Logical Plan的時候,可能有些無意義的佔位符,這種需要使用子節點替換調它。倒數第三行prunePlans()方法按註釋說是用來去掉bad plan的,但看實際程式碼只是原封不動返回。 這樣最終就得到一個Iterator[SparkPlan],每個SparkPlan就是可執行的物理操作了。 大致流程就是如此,當然具體到一些生成策略沒有細說,包括輸入源策略,聚合策略等等,每一個都蠻複雜的,這裡就不細說,有興趣可以自行查閱。 對了,最後還要看看示例程式碼到這一步變成什麼樣了,先上示例程式碼: ``` //生成DataFrame val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") //呼叫spark.sql val queryCaseWhen = sql("select key from src ") ``` 經過Physical Planning階段後,變成如下: ``` Project [_1#2 AS key#5] +- LocalTableScan [_1#2, _2#3] ``` 對比上面的optimized階段,直觀看就是LocalRelation變成LocalTableScan。變得更加具體了,但實際上,Project也變了,雖然列印名字相同,但一個的型別是Project,本質上是LogicalPlan。而一個是ProjectExec,本質上是SparkPlan(也就是PhysicalPlan)。這一點通過斷點看的更清楚。 ![](https://img2020.cnblogs.com/blog/1011838/202004/1011838-20200422180713613-1374963727.png) 到這一步已經很解決終點了,後面再經過一個Preparations階段就能生成RDD了,剩下的部分留待下篇介紹吧。