1. 程式人生 > >Spark SQL 原始碼分析之Physical Plan 到 RDD的具體實現

Spark SQL 原始碼分析之Physical Plan 到 RDD的具體實現

  我們都知道一段sql,真正的執行是當你呼叫它的collect()方法才會執行Spark Job,最後計算得到RDD。
  lazy val toRdd: RDD[Row] = executedPlan.execute()

  Spark Plan基本包含4種操作型別,即BasicOperator基本型別,還有就是Join、Aggregate和Sort這種稍複雜的。

  如圖:

  

一、BasicOperator

1.1、Project

  Project 的大致含義是:傳入一系列表示式Seq[NamedExpression],給定輸入的Row,經過Convert(Expression的計算eval)操作,生成一個新的Row。  Project的實現是呼叫其child.execute()方法,然後呼叫mapPartitions對每一個Partition進行操作。
  這個f函式其實是new了一個MutableProjection,然後迴圈的對每個partition進行Convert。
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)
  override def execute() = child.execute().mapPartitions { iter => //對每個分割槽進行f對映
    @transient val reusableProjection = new MutableProjection(projectList) 
    iter.map(reusableProjection)
  }
}
  通過觀察MutableProjection的定義,可以發現,就是bind references to a schema 和 eval的過程:  將一個Row轉換為另一個已經定義好schema column的Row。
  如果輸入的Row已經有Schema了,則傳入的Seq[Expression]也會bound到當前的Schema。
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema))) //bound schema

  private[this] val exprArray = expressions.toArray
  private[this] val mutableRow = new GenericMutableRow(exprArray.size) //新的Row
  def currentValue: Row = mutableRow
  def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input)  //根據輸入的input,即一個Row,計算生成的Row
      i += 1
    }
    mutableRow //返回新的Row
  }
}

1.2、Filter

 Filter的具體實現是傳入的condition進行對input row的eval計算,最後返回的是一個Boolean型別, 如果表示式計算成功,返回true,則這個分割槽的這條資料就會儲存下來,否則會過濾掉。
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  override def execute() = child.execute().mapPartitions { iter =>
    iter.filter(condition.eval(_).asInstanceOf[Boolean]) //計算表示式 eval(input row)
  }
}

1.3、Sample

  Sample取樣操作其實是呼叫了child.execute()的結果後,返回的是一個RDD,對這個RDD呼叫其sample函式,原生方法。
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
  extends UnaryNode
{
  override def output = child.output

  // TODO: How to pick seed?
  override def execute() = child.execute().sample(withReplacement, fraction, seed)
}

1.4、Union

  Union操作支援多個子查詢的Union,所以傳入的child是一個Seq[SparkPlan]  execute()方法的實現是對其所有的children,每一個進行execute(),即select查詢的結果集合RDD。  通過呼叫SparkContext的union方法,將所有子查詢的結果合併起來。
case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
  // TODO: attributes output by union should be distinct for nullability purposes
  override def output = children.head.output
  override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) //子查詢的結果進行union

  override def otherCopyArgs = sqlContext :: Nil
}

1.5、Limit

  Limit操作在RDD的原生API裡也有,即take().  但是Limit的實現分2種情況:  第一種是 limit作為結尾的操作符,即select xxx from yyy limit zzz。 並且是被executeCollect呼叫,則直接在driver裡使用take方法。  第二種是 limit不是作為結尾的操作符,即limit後面還有查詢,那麼就在每個分割槽呼叫limit,最後repartition到一個分割槽來計算global limit.
case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext)
  extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again

  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  override def executeCollect() = child.execute().map(_.copy()).take(limit) //直接在driver呼叫take

  override def execute() = {
    val rdd = child.execute().mapPartitions { iter =>
      val mutablePair = new MutablePair[Boolean, Row]()
      iter.take(limit).map(row => mutablePair.update(false, row)) //每個分割槽先計算limit
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) //需要shuffle,來repartition
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2)) //最後單獨一個partition來take limit
  }
}

1.6、TakeOrdered

  TakeOrdered是經過排序後的limit N,一般是用在sort by 操作符後的limit。  可以簡單理解為TopN操作符。
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
                      (@transient sqlContext: SQLContext) extends UnaryNode {
  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  @transient
  lazy val ordering = new RowOrdering(sortOrder) //這裡是通過RowOrdering來實現排序的

  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)

  // TODO: Terminal split should be implemented differently from non-terminal split.
  // TODO: Pick num splits based on |limit|.
  override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}

1.7、Sort

  Sort也是通過RowOrdering這個類來實現排序的,child.execute()對每個分割槽進行map,每個分割槽根據RowOrdering的order來進行排序,生成一個新的有序集合。  也是通過呼叫Spark RDD的sorted方法來實現的。
case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  @transient
  lazy val ordering = new RowOrdering(sortOrder) //排序順序

  override def execute() = attachTree(this, "sort") {
    // TODO: Optimize sorting operation?
    child.execute()
      .mapPartitions(
        iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator, //每個分割槽呼叫sorted方法,傳入<span style="font-family: Arial, Helvetica, sans-serif;">ordering排序規則,進行排序</span>
        preservesPartitioning = true)
  }

  override def output = child.output
}

1.8、ExistingRdd

ExistingRdd是
object ExistingRdd {
  def convertToCatalyst(a: Any): Any = a match {
    case o: Option[_] => o.orNull
    case s: Seq[Any] => s.map(convertToCatalyst)
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
    case other => other
  }

  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    data.mapPartitions { iterator =>
      if (iterator.isEmpty) {
        Iterator.empty
      } else {
        val bufferedIterator = iterator.buffered
        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)

        bufferedIterator.map { r =>
          var i = 0
          while (i < mutableRow.length) {
            mutableRow(i) = convertToCatalyst(r.productElement(i))
            i += 1
          }

          mutableRow
        }
      }
    }
  }

  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
}

二、 Join Related Operators

  HashJoin:

  在講解Join Related Operator之前,有必要了解一下HashJoin這個位於execution包下的joins.scala檔案裡的trait。  Join操作主要包含BroadcastHashJoinLeftSemiJoinHashShuffledHashJoin均實現了HashJoin這個trait.  主要類圖如下:    HashJoin這個trait的主要成員有:  buildSide是左連線還是右連線,有一種基準的意思。  leftKeys是左孩子的expressions, rightKeys是右孩子的expressions。  left是左孩子物理計劃,right是右孩子物理計劃。  buildSideKeyGenerator是一個Projection是根據傳入的Row物件來計算buildSide的Expression的。  streamSideKeyGenerator是一個MutableProjection是根據傳入的Row物件來計算streamSide的Expression的。  這裡buildSide如果是left的話,可以理解為buildSide是左表,那麼去連線這個左表的右表就是streamSide。    HashJoin關鍵的操作是joinIterators,簡單來說就是join兩個表,把每個表看著Iterators[Row].  方式:  1、首先遍歷buildSide,計算buildKeys然後利用一個HashMap,形成 (buildKeys, Iterators[Row])的格式。  2、遍歷StreamedSide,計算streamedKey,去HashMap裡面去匹配key,來進行join  3、最後生成一個joinRow,這個將2個row對接。  見程式碼註釋:
trait HashJoin {
  val leftKeys: Seq[Expression]
  val rightKeys: Seq[Expression]
  val buildSide: BuildSide
  val left: SparkPlan
  val right: SparkPlan

  lazy val (buildPlan, streamedPlan) = buildSide match {  //模式匹配,將physical plan封裝形成Tuple2,如果是buildLeft,那麼就是(left,right),否則是(right,left)
    case BuildLeft => (left, right)
    case BuildRight => (right, left)
  }

  lazy val (buildKeys, streamedKeys) = buildSide match { //模式匹配,將expression進行封裝<span style="font-family: Arial, Helvetica, sans-serif;">Tuple2</span>

    case BuildLeft => (leftKeys, rightKeys)
    case BuildRight => (rightKeys, leftKeys)
  }

  def output = left.output ++ right.output

  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) //生成buildSideKey來根據Expression來計算Row返回結果
  @transient lazy val streamSideKeyGenerator = //<span style="font-family: Arial, Helvetica, sans-serif;">生成</span><span style="font-family: Arial, Helvetica, sans-serif;">streamSideKeyGenerator</span><span style="font-family: Arial, Helvetica, sans-serif;">來根據Expression來計算Row返回結果</span>
    () => new MutableProjection(streamedKeys, streamedPlan.output)

  def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = { //把build表的Iterator[Row]和streamIterator[Row]進行join操作返回Join後的Iterator[Row]
    // TODO: Use Spark's HashMap implementation.

    val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() //匹配主要使用HashMap實現
    var currentRow: Row = null

    // Create a mapping of buildKeys -> rows 
    while (buildIter.hasNext) { //目前只對build Iterator進行迭代,形成rowKey,Rows,類似wordCount,但是這裡不是累加Value,而是Row的集合。
      currentRow = buildIter.next()
      val rowKey = buildSideKeyGenerator(currentRow) //計算rowKey作為HashMap的key
      if(!rowKey.anyNull) {
        val existingMatchList = hashTable.get(rowKey)
        val matchList = if (existingMatchList == null) {
          val newMatchList = new ArrayBuffer[Row]()
          hashTable.put(rowKey, newMatchList) //(rowKey, matchedRowList)
          newMatchList
        } else {
          existingMatchList
        }
        matchList += currentRow.copy() //返回matchList
      }
    }

    new Iterator[Row] { //最後用streamedRow的Key來匹配buildSide端的HashMap
      private[this] var currentStreamedRow: Row = _
      private[this] var currentHashMatches: ArrayBuffer[Row] = _
      private[this] var currentMatchPosition: Int = -1

      // Mutable per row objects.
      private[this] val joinRow = new JoinedRow

      private[this] val joinKeys = streamSideKeyGenerator()

      override final def hasNext: Boolean =
        (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||
          (streamIter.hasNext && fetchNext())

      override final def next() = {
        val ret = buildSide match {
          case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) //右連線的話,streamedRow放左邊,匹配到的key的Row放到右表
          case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) //左連線的話,相反。
        }
        currentMatchPosition += 1
        ret
      }

      /**
       * Searches the streamed iterator for the next row that has at least one match in hashtable.
       *
       * @return true if the search is successful, and false if the streamed iterator runs out of
       *         tuples.
       */
      private final def fetchNext(): Boolean = {
        currentHashMatches = null
        currentMatchPosition = -1

        while (currentHashMatches == null && streamIter.hasNext) {
          currentStreamedRow = streamIter.next()
          if (!joinKeys(currentStreamedRow).anyNull) {
            currentHashMatches = hashTable.get(joinKeys.currentValue) //streamedRow從buildSide裡的HashTable裡面匹配rowKey
          }
        }

        if (currentHashMatches == null) {
          false
        } else {
          currentMatchPosition = 0
          true
        }
      }
    }
  }
}
joinRow的實現,實現2個Row對接:實際上就是生成一個新的Array,將2個Array合併。
class JoinedRow extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _
  .........
   def copy() = {
    val totalSize = row1.size + row2.size 
    val copiedValues = new Array[Any](totalSize)
    var i = 0
    while(i < totalSize) {
      copiedValues(i) = apply(i)
      i += 1
    }
    new GenericRow(copiedValues) //返回一個新的合併後的Row
  }

2.1、LeftSemiJoinHash

 left semi join,不多說了,hive早期版本里替代IN和EXISTS 的版本。 將右表的join keys放到HashSet裡,然後遍歷左表,查詢左表的join key是否能匹配。
case class LeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  val buildSide = BuildRight //buildSide是以右表為基準

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  override def output = left.output

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => //右表的物理計劃執行後生成RDD,利用zipPartitions對Partition進行合併。然後用上述方法實現。
      val hashSet = new java.util.HashSet[Row]()
      var currentRow: Row = null

      // Create a Hash set of buildKeys
      while (buildIter.hasNext) {
        currentRow = buildIter.next()
        val rowKey = buildSideKeyGenerator(currentRow)
        if(!rowKey.anyNull) {
          val keyExists = hashSet.contains(rowKey)
          if (!keyExists) {
            hashSet.add(rowKey)
          }
        }
      }

      val joinKeys = streamSideKeyGenerator()
      streamIter.filter(current => {
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
      })
    }
  }
}

2.2、BroadcastHashJoin

 名約: 廣播HashJoin,呵呵。  是InnerHashJoin的實現。這裡用到了concurrent併發裡的future,非同步的廣播buildPlan的表執行後的的RDD。  如果接收到了廣播後的表,那麼就用streamedPlan來匹配這個廣播的表。  實現是RDD的mapPartitions和HashJoin裡的joinIterators最後生成join的結果。
case class BroadcastHashJoin(
     leftKeys: Seq[Expression],
     rightKeys: Seq[Expression],
     buildSide: BuildSide,
     left: SparkPlan,
     right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {

  override def otherCopyArgs = sqlContext :: Nil

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  @transient
  lazy val broadcastFuture = future {  //利用SparkContext廣播表
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())
  }

  def execute() = {
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)

    streamedPlan.execute().mapPartitions { streamedIter =>
      joinIterators(broadcastRelation.value.iterator, streamedIter) //呼叫joinIterators對每個分割槽map
    }
  }
}

2.3、ShuffleHashJoin

ShuffleHashJoin顧名思義就是需要shuffle資料,outputPartitioning是左孩子的的Partitioning。會根據這個Partitioning進行shuffle。然後利用SparkContext裡的zipPartitions方法對每個分割槽進行zip。這裡的requiredChildDistribution,的是ClusteredDistribution,這個會在HashPartitioning裡面進行匹配。關於這裡面的分割槽這裡不贅述,可以去org.apache.spark.sql.catalyst.plans.physical下的partitioning裡面去檢視。
case class ShuffledHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    }
  }
}


未完待續 :)