Spark SQL Source Code Analysis: From Physical Plan to RDD
/** Spark SQL source code analysis series */
Continuing from the previous article, Spark SQL Catalyst Source Code Analysis: Physical Plan, this article covers the details of how a Physical Plan's toRdd is implemented.

We all know that a SQL statement only actually runs a Spark Job when you call collect() on it; the computation ultimately produces an RDD:

```scala
lazy val toRdd: RDD[Row] = executedPlan.execute()
```
A SparkPlan covers roughly four categories of operators: the basic ones under BasicOperator, plus the more complex Join, Aggregate, and Sort.
As shown in the figure:
1. BasicOperator
1.1 Project
Project roughly means: given a sequence of expressions Seq[NamedExpression] and an input Row, evaluate each expression (Expression.eval) to produce a new Row. Project's implementation calls child.execute() and then mapPartitions over each partition; the partition function creates a MutableProjection and applies it to every row in the partition.
```scala
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)

  override def execute() = child.execute().mapPartitions { iter => // map each partition with f
    @transient val reusableProjection = new MutableProjection(projectList)
    iter.map(reusableProjection)
  }
}
```
If the input Rows already have a schema, the given Seq[Expression] is bound to that schema:
```scala
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema))) // bind to the schema

  private[this] val exprArray = expressions.toArray
  private[this] val mutableRow = new GenericMutableRow(exprArray.size) // the new Row
  def currentValue: Row = mutableRow

  def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input) // evaluate each expression against the input Row
      i += 1
    }
    mutableRow // return the new Row
  }
}
```
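The core idea can be sketched without any Spark dependency: a projection is just a function from an input row to a new row, where each output column is the result of evaluating one expression. Below is a hypothetical plain-Scala sketch (Array[Any] stands in for Row, and plain functions stand in for Expression.eval):

```scala
// Minimal sketch (not Spark code): a "projection" is Row => Row, where each
// output column is computed by evaluating one expression against the input row.
object ProjectionSketch {
  type Row = Array[Any]

  // Each "expression" here is simply a function of the input row.
  def makeProjection(expressions: Seq[Row => Any]): Row => Row = {
    val exprArray = expressions.toArray
    (input: Row) => {
      val out = new Array[Any](exprArray.length)
      var i = 0
      while (i < exprArray.length) {
        out(i) = exprArray(i)(input) // analogous to exprArray(i).eval(input)
        i += 1
      }
      out
    }
  }

  def main(args: Array[String]): Unit = {
    // project (name, age) -> (name, age + 1)
    val project = makeProjection(Seq(r => r(0), r => r(1).asInstanceOf[Int] + 1))
    println(project(Array[Any]("alice", 30)).mkString(",")) // alice,31
  }
}
```

The real MutableProjection reuses one mutable output row per partition to avoid allocation; the sketch allocates a fresh array per row for clarity.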
1.2 Filter
Filter evaluates the given condition against each input row; eval returns a Boolean. If the expression evaluates to true, the row in that partition is kept; otherwise it is filtered out.

```scala
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  override def execute() = child.execute().mapPartitions { iter =>
    iter.filter(condition.eval(_).asInstanceOf[Boolean]) // evaluate the expression: eval(input row)
  }
}
```
1.3 Sample
Sample simply calls child.execute() to obtain an RDD and then invokes that RDD's native sample method.

```scala
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
  extends UnaryNode {
  override def output = child.output

  // TODO: How to pick seed?
  override def execute() = child.execute().sample(withReplacement, fraction, seed)
}
```
1.4 Union
Union supports unioning multiple subqueries, so it takes a Seq[SparkPlan]. Its execute() runs execute() on every child, i.e. each subquery's result RDD, and merges them all via SparkContext's union method.

```scala
case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
  // TODO: attributes output by union should be distinct for nullability purposes
  override def output = children.head.output
  override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) // union the subquery results
  override def otherCopyArgs = sqlContext :: Nil
}
```
1.5 Limit
The native RDD API already has a Limit equivalent: take(). Limit's implementation distinguishes two cases. First, when limit is the final operator (select xxx from yyy limit zzz) and is driven by executeCollect, take is called directly on the driver. Second, when limit is not the final operator (more of the query follows it), limit is applied within each partition first, and the results are then repartitioned into a single partition to compute the global limit.

```scala
case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext) extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again
  override def otherCopyArgs = sqlContext :: Nil
  override def output = child.output

  override def executeCollect() = child.execute().map(_.copy()).take(limit) // take directly on the driver

  override def execute() = {
    val rdd = child.execute().mapPartitions { iter =>
      val mutablePair = new MutablePair[Boolean, Row]()
      iter.take(limit).map(row => mutablePair.update(false, row)) // per-partition local limit first
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) // shuffle to repartition
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2)) // finally take limit within the single partition
  }
}
```
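The two-phase idea can be sketched without Spark. In this hypothetical sketch each List stands in for a partition: take `limit` rows locally from each partition, concatenate (the "shuffle into one partition" step), then take `limit` again:

```scala
// Sketch (not Spark code): two-phase limit over "partitions" modeled as Lists.
object LimitSketch {
  def globalLimit[T](partitions: Seq[List[T]], limit: Int): List[T] = {
    val local = partitions.map(_.take(limit)) // phase 1: partition-local limit
    local.flatten.take(limit)                 // phase 2: merge into one "partition" and take again
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(List(1, 2, 3), List(4, 5), List(6, 7, 8, 9))
    println(globalLimit(parts, 4)) // List(1, 2, 3, 4)
  }
}
```

The local take bounds how much data each partition contributes to the shuffle: at most `limit` rows per partition cross the network, no matter how large the partitions are.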
1.6 TakeOrdered
TakeOrdered is a limit N after sorting, typically used for a limit that follows a sort by. It can simply be thought of as a TopN operator.

```scala
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
                      (@transient sqlContext: SQLContext) extends UnaryNode {
  override def otherCopyArgs = sqlContext :: Nil
  override def output = child.output

  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the ordering is implemented via RowOrdering

  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)

  // TODO: Terminal split should be implemented differently from non-terminal split.
  // TODO: Pick num splits based on |limit|.
  override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}
```
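Semantically, takeOrdered(n)(ordering) returns the n smallest elements under the given ordering. A plain-Scala sketch of these semantics (Spark implements it more efficiently with a bounded heap per partition, then merges):

```scala
// Sketch (not Spark code): the semantics of takeOrdered(n) under an Ordering.
object TopNSketch {
  def takeOrdered[T](data: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] =
    data.sorted(ord).take(n) // Spark keeps a size-n heap per partition instead of a full sort

  def main(args: Array[String]): Unit = {
    println(takeOrdered(Seq(5, 1, 4, 2, 3), 2)) // List(1, 2)
  }
}
```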
1.7 Sort
Sort is also implemented with the RowOrdering class: child.execute() is mapped per partition, and each partition's rows are copied into an array and sorted according to the RowOrdering, producing a new ordered collection. The actual sorting uses Scala's sorted method on that array.

```scala
case class Sort(sortOrder: Seq[SortOrder], global: Boolean, child: SparkPlan) extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the sort order

  override def execute() = attachTree(this, "sort") {
    // TODO: Optimize sorting operation?
    child.execute()
      .mapPartitions(
        iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator, // sort each partition with the given ordering
        preservesPartitioning = true)
  }

  override def output = child.output
}
```
1.8 ExistingRdd
ExistingRdd turns an existing RDD of Scala products into a table of Rows: convertToCatalyst maps Scala values to Catalyst values, and productToRowRdd converts an RDD[A <: Product] into an RDD[Row].

```scala
object ExistingRdd {
  def convertToCatalyst(a: Any): Any = a match {
    case o: Option[_] => o.orNull
    case s: Seq[Any] => s.map(convertToCatalyst)
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
    case other => other
  }

  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    data.mapPartitions { iterator =>
      if (iterator.isEmpty) {
        Iterator.empty
      } else {
        val bufferedIterator = iterator.buffered
        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)

        bufferedIterator.map { r =>
          var i = 0
          while (i < mutableRow.length) {
            mutableRow(i) = convertToCatalyst(r.productElement(i))
            i += 1
          }
          mutableRow
        }
      }
    }
  }

  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
}
```
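The conversion itself is plain pattern matching on Scala values. A self-contained sketch mirroring that match (hypothetical names; a plain Array[Any] stands in for GenericRow):

```scala
// Sketch (not Spark code): converting Scala values into flat "row" values,
// mirroring the pattern match in ExistingRdd.convertToCatalyst.
object ConvertSketch {
  def convert(a: Any): Any = a match {
    case o: Option[_] => o.orNull                                // None -> null
    case s: Seq[_]    => s.map(convert)
    case p: Product   => p.productIterator.map(convert).toArray  // case class -> "row" of fields
    case other        => other
  }

  case class Person(name: String, nickname: Option[String])

  def main(args: Array[String]): Unit = {
    val row = convert(Person("alice", None)).asInstanceOf[Array[Any]]
    println(row.mkString("[", ",", "]")) // [alice,null]
  }
}
```

Note how Option fields become nullable columns (None -> null), which is exactly why case classes map so directly onto SQL rows.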
2. Join Related Operators
HashJoin:
Before covering the join operators, it is worth understanding the HashJoin trait in the joins.scala file under the execution package. The join operators BroadcastHashJoin, LeftSemiJoinHash, and ShuffledHashJoin all implement this trait. The main class diagram is as follows:

The main members of HashJoin:

- buildSide: whether the build side is the left or the right child; think of it as the reference side.
- leftKeys: the join-key expressions of the left child; rightKeys: those of the right child.
- left: the left child physical plan; right: the right child physical plan.
- buildSideKeyGenerator: a Projection that evaluates the buildSide expressions against an input Row.
- streamSideKeyGenerator: a MutableProjection that evaluates the streamSide expressions against an input Row.

If buildSide is left, think of the build side as the left table; the right table being joined against it is then the stream side.

The key operation of HashJoin is joinIterators. In short, it joins two tables, each viewed as an Iterator[Row]:

1. Iterate over the build side, computing buildKeys and filling a HashMap of (buildKey, rows), i.e. (buildKeys, Iterator[Row]).
2. Iterate over the streamed side, computing each streamedKey and probing the HashMap for a matching key.
3. For each match, produce a joinRow that concatenates the two rows.

See the commented code:

```scala
trait HashJoin {
  val leftKeys: Seq[Expression]
  val rightKeys: Seq[Expression]
  val buildSide: BuildSide
  val left: SparkPlan
  val right: SparkPlan

  lazy val (buildPlan, streamedPlan) = buildSide match { // pattern match the physical plans into a Tuple2: BuildLeft gives (left, right), otherwise (right, left)
    case BuildLeft => (left, right)
    case BuildRight => (right, left)
  }

  lazy val (buildKeys, streamedKeys) = buildSide match { // likewise wrap the key expressions into a Tuple2
    case BuildLeft => (leftKeys, rightKeys)
    case BuildRight => (rightKeys, leftKeys)
  }

  def output = left.output ++ right.output

  @transient lazy val buildSideKeyGenerator =
    new Projection(buildKeys, buildPlan.output) // evaluates the build-side key expressions against a Row

  @transient lazy val streamSideKeyGenerator = // evaluates the stream-side key expressions against a Row
    () => new MutableProjection(streamedKeys, streamedPlan.output)

  def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = {
    // join the build table's Iterator[Row] with the streamed Iterator[Row], returning the joined Iterator[Row]
    // TODO: Use Spark's HashMap implementation.
    val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() // matching is driven by a HashMap
    var currentRow: Row = null

    // Create a mapping of buildKeys -> rows
    while (buildIter.hasNext) { // iterate only over the build iterator, building (rowKey -> Rows); like wordCount, except the value is a collection of Rows rather than a count
      currentRow = buildIter.next()
      val rowKey = buildSideKeyGenerator(currentRow) // the computed rowKey is the HashMap key
      if (!rowKey.anyNull) {
        val existingMatchList = hashTable.get(rowKey)
        val matchList = if (existingMatchList == null) {
          val newMatchList = new ArrayBuffer[Row]()
          hashTable.put(rowKey, newMatchList) // (rowKey, matchedRowList)
          newMatchList
        } else {
          existingMatchList
        }
        matchList += currentRow.copy() // append to the matchList
      }
    }

    new Iterator[Row] { // finally probe the build-side HashMap with each streamedRow's key
      private[this] var currentStreamedRow: Row = _
      private[this] var currentHashMatches: ArrayBuffer[Row] = _
      private[this] var currentMatchPosition: Int = -1

      // Mutable per row objects.
      private[this] val joinRow = new JoinedRow

      private[this] val joinKeys = streamSideKeyGenerator()

      override final def hasNext: Boolean =
        (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||
          (streamIter.hasNext && fetchNext())

      override final def next() = {
        val ret = buildSide match {
          case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) // build side is right: the streamedRow goes on the left, the matched row on the right
          case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) // build side is left: the reverse
        }
        currentMatchPosition += 1
        ret
      }

      /**
       * Searches the streamed iterator for the next row that has at least one match in hashtable.
       *
       * @return true if the search is successful, and false if the streamed iterator runs out of
       *         tuples.
       */
      private final def fetchNext(): Boolean = {
        currentHashMatches = null
        currentMatchPosition = -1

        while (currentHashMatches == null && streamIter.hasNext) {
          currentStreamedRow = streamIter.next()
          if (!joinKeys(currentStreamedRow).anyNull) {
            currentHashMatches = hashTable.get(joinKeys.currentValue) // probe the build-side hash table with the streamed row's key
          }
        }

        if (currentHashMatches == null) {
          false
        } else {
          currentMatchPosition = 0
          true
        }
      }
    }
  }
}
```
joinRow concatenates two Rows: its copy() effectively creates a new Array merging the two rows' values.

```scala
class JoinedRow extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _
  // .........
  def copy() = {
    val totalSize = row1.size + row2.size
    val copiedValues = new Array[Any](totalSize)
    var i = 0
    while (i < totalSize) {
      copiedValues(i) = apply(i)
      i += 1
    }
    new GenericRow(copiedValues) // return a new, merged Row
  }
}
```
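Stripped of the Row machinery, the build/probe structure of joinIterators can be sketched in plain Scala. In this hypothetical sketch, (key, value) pairs stand in for Rows and their key generators:

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Sketch (not Spark code): the build/probe pattern behind HashJoin.joinIterators.
object HashJoinSketch {
  def hashJoin[K, A, B](build: Iterator[(K, A)], stream: Iterator[(K, B)]): Iterator[(B, A)] = {
    // 1. Build phase: map each key to all build-side rows carrying it
    val hashTable = HashMap.empty[K, ArrayBuffer[A]]
    for ((k, a) <- build)
      hashTable.getOrElseUpdate(k, ArrayBuffer.empty[A]) += a

    // 2 & 3. Probe phase: for each streamed row, emit one joined pair per match
    for {
      (k, b) <- stream
      matches <- hashTable.get(k).iterator
      a <- matches.iterator
    } yield (b, a)
  }

  def main(args: Array[String]): Unit = {
    val buildSide = Iterator(1 -> "a", 2 -> "b", 1 -> "c")
    val streamSide = Iterator(1 -> "x", 3 -> "y")
    println(hashJoin(buildSide, streamSide).toList) // List((x,a), (x,c))
  }
}
```

Only the build side is fully materialized in memory; the stream side stays a lazy iterator, which is exactly why choosing the smaller table as buildSide matters.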
2.1 LeftSemiJoinHash
left semi join needs little introduction: it is what early Hive versions used in place of IN and EXISTS. The right table's join keys are put into a HashSet; the left table is then scanned, keeping only rows whose join key appears in the set.

```scala
case class LeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  val buildSide = BuildRight // the build side is the right table

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  override def output = left.output

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
      // the right child's physical plan produces an RDD; zipPartitions pairs its partitions with the left child's
      val hashSet = new java.util.HashSet[Row]()
      var currentRow: Row = null

      // Create a Hash set of buildKeys
      while (buildIter.hasNext) {
        currentRow = buildIter.next()
        val rowKey = buildSideKeyGenerator(currentRow)
        if (!rowKey.anyNull) {
          val keyExists = hashSet.contains(rowKey)
          if (!keyExists) {
            hashSet.add(rowKey)
          }
        }
      }

      val joinKeys = streamSideKeyGenerator()
      streamIter.filter(current => {
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
      })
    }
  }
}
```
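Semantically this is just set-membership filtering. A plain-Scala sketch (hypothetical; keyed tuples stand in for Rows):

```scala
// Sketch (not Spark code): left semi join = keep left rows whose key exists on the right.
object SemiJoinSketch {
  def leftSemiJoin[K, A](left: Seq[(K, A)], rightKeys: Seq[K]): Seq[(K, A)] = {
    val keySet = rightKeys.toSet                        // build: right-side keys into a set
    left.filter { case (k, _) => keySet.contains(k) }   // probe: filter the left side
  }

  def main(args: Array[String]): Unit = {
    val left = Seq(1 -> "a", 2 -> "b", 3 -> "c")
    println(leftSemiJoin(left, Seq(1, 3))) // List((1,a), (3,c))
  }
}
```

Unlike a full hash join, a semi join never duplicates left rows and never needs the right rows themselves, which is why a HashSet of keys suffices instead of a HashMap of rows.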
2.2 BroadcastHashJoin
As the name says, a broadcast hash join; it is an inner hash join implementation. It uses a future from the concurrent package to asynchronously broadcast the result of executing buildPlan. Once the broadcast table is received, streamedPlan is matched against it, via the RDD's mapPartitions and HashJoin's joinIterators, to produce the join result.

```scala
case class BroadcastHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {

  override def otherCopyArgs = sqlContext :: Nil

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  @transient
  lazy val broadcastFuture = future { // broadcast the build table via SparkContext
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())
  }

  def execute() = {
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)

    streamedPlan.execute().mapPartitions { streamedIter =>
      joinIterators(broadcastRelation.value.iterator, streamedIter) // joinIterators over each partition
    }
  }
}
```
2.3 ShuffledHashJoin
ShuffledHashJoin, as the name implies, needs to shuffle its data. Its outputPartitioning is the left child's Partitioning; the children are shuffled according to that partitioning, and then the RDD's zipPartitions method zips the corresponding partitions together.
The requiredChildDistribution here is ClusteredDistribution, which is matched against HashPartitioning. Partitioning is not covered in detail here; see the partitioning file under org.apache.spark.sql.catalyst.plans.physical.

```scala
case class ShuffledHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
      joinIterators(buildIter, streamIter)
    }
  }
}
```
To be continued :)
Original article; please credit when reposting:
Reposted from OopsOutOfMemory (Sheng Li)'s blog. Author: OopsOutOfMemory
Article link: http://blog.csdn.net/oopsoom/article/details/38274621
Note: this article is licensed under the Attribution-NonCommercial-NoDerivs 2.5 China Mainland (CC BY-NC-ND 2.5 CN) license. Reposting, sharing, and comments are welcome, but please keep the author attribution and the article link. For commercial use or licensing arrangements, please contact me.