1. 程式人生 > >第十篇:Spark SQL 源碼分析之 In-Memory Columnar Storage源碼分析之 query

第十篇:Spark SQL 源碼分析之 In-Memory Columnar Storage源碼分析之 query

pro .net asn 解析 partition store exec attr_ array

/** Spark SQL源碼分析系列文章*/

前面講到了Spark SQL In-Memory Columnar Storage的存儲結構是基於列存儲的。

那麽基於以上存儲結構,我們查詢cache在jvm內的數據又是如何查詢的,本文將揭示查詢In-Memory Data的方式。

一、引子

本例使用hive console裏查詢cache後的src表。 select value from src

當我們將src表cache到了內存後,再次查詢src,可以通過analyzed執行計劃來觀察內部調用。

即parse後,會形成InMemoryRelation結點,最後執行物理計劃時,會調用InMemoryColumnarTableScan這個結點的方法。

如下:

[java] view plain copy
  1. scala> val exe = executePlan(sql("select value from src").queryExecution.analyzed)
  2. 14/09/26 10:30:26 INFO parse.ParseDriver: Parsing command: select value from src
  3. 14/09/26 10:30:26 INFO parse.ParseDriver: Parse Completed
  4. exe: org.apache.spark.sql.hive.test.TestHive.QueryExecution =
  5. == Parsed Logical Plan ==
  6. Project [value#5]
  7. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  8. == Analyzed Logical Plan ==
  9. Project [value#5]
  10. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  11. == Optimized Logical Plan ==
  12. Project [value#5]
  13. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  14. == Physical Plan ==
  15. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)) //查詢內存中表的入口
  16. Code Generation: false
  17. == RDD ==

二、InMemoryColumnarTableScan

InMemoryColumnarTableScan是Catalyst裏的一個葉子結點,包含了要查詢的attributes,和InMemoryRelation(封裝了我們緩存的In-Columnar Storage數據結構)。 執行葉子節點,出發execute方法對內存數據進行查詢。 1、查詢時,調用InMemoryRelation,對其封裝的內存數據結構的每個分區進行操作。 2、獲取要請求的attributes,如上,查詢請求的是src表的value屬性。 3、根據目的查詢表達式,來獲取在對應存儲結構中,請求列的index索引。 4、通過ColumnAccessor來對每個buffer進行訪問,獲取對應查詢數據,並封裝為Row對象返回。

技術分享

[java] view plain copy
  1. private[sql] case class InMemoryColumnarTableScan(
  2. attributes: Seq[Attribute],
  3. relation: InMemoryRelation)
  4. extends LeafNode {
  5. override def output: Seq[Attribute] = attributes
  6. override def execute() = {
  7. relation.cachedColumnBuffers.mapPartitions { iterator =>
  8. // Find the ordinals of the requested columns. If none are requested, use the first.
  9. val requestedColumns = if (attributes.isEmpty) {
  10. Seq(0)
  11. } else {
  12. attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) //根據表達式exprId找出對應列的ByteBuffer的索引
  13. }
  14. iterator
  15. .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))//根據索引取得對應請求列的ByteBuffer,並封裝為ColumnAccessor。
  16. .flatMap { columnAccessors =>
  17. val nextRow = new GenericMutableRow(columnAccessors.length) //Row的長度
  18. new Iterator[Row] {
  19. override def next() = {
  20. var i = 0
  21. while (i < nextRow.length) {
  22. columnAccessors(i).extractTo(nextRow, i) //根據對應index和長度,從byterbuffer裏取得值,封裝到row裏
  23. i += 1
  24. }
  25. nextRow
  26. }
  27. override def hasNext = columnAccessors.head.hasNext
  28. }
  29. }
  30. }
  31. }
  32. }

查詢請求的列,如下:

[java] view plain copy
  1. scala> exe.optimizedPlan
  2. res93: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
  3. Project [value#5]
  4. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  5. scala> val relation = exe.optimizedPlan(1)
  6. relation: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
  7. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  8. scala> val request_relation = exe.executedPlan
  9. request_relation: org.apache.spark.sql.execution.SparkPlan =
  10. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None))
  11. scala> request_relation.output //請求的列,我們請求的只有value列
  12. res95: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)
  13. scala> relation.output //默認保存在relation中的所有列
  14. res96: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#4, value#5)
  15. scala> val attributes = request_relation.output
  16. attributes: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)



整個流程很簡潔,關鍵步驟是第三步。根據ExprId來查找到,請求列的索引 attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))

[java] view plain copy
  1. //根據exprId找出對應ID
  2. scala> val attr_index = attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
  3. attr_index: Seq[Int] = ArrayBuffer(1) //找到請求的列value的索引是1, 我們查詢就從Index為1的bytebuffer中,請求數據
  4. scala> relation.output.foreach(e=>println(e.exprId))
  5. ExprId(4) //對應<span style="font-family: Arial, Helvetica, sans-serif;">[key#4,value#5]</span>
  6. ExprId(5)
  7. scala> request_relation.output.foreach(e=>println(e.exprId))
  8. ExprId(5)

三、ColumnAccessor

ColumnAccessor對應每一種類型,類圖如下:

技術分享

最後返回一個新的叠代器:

[java] view plain copy
  1. new Iterator[Row] {
  2. override def next() = {
  3. var i = 0
  4. while (i < nextRow.length) { //請求列的長度
  5. columnAccessors(i).extractTo(nextRow, i)//調用columnType.setField(row, ordinal, extractSingle(buffer))解析buffer
  6. i += 1
  7. }
  8. nextRow//返回解析後的row
  9. }
  10. override def hasNext = columnAccessors.head.hasNext
  11. }

四、總結

Spark SQL In-Memory Columnar Storage的查詢相對來說還是比較簡單的,其查詢思想主要和存儲的數據結構有關。

即存儲時,按每列放到一個bytebuffer,形成一個bytebuffer數組。

查詢時,根據請求列的exprId查找到上述數組的索引,然後使用ColumnAccessor對buffer中字段進行解析,最後封裝為Row對象,返回。

——EOF——

創文章,轉載請註明:

轉載自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文鏈接地址:http://blog.csdn.net/oopsoom/article/details/39577419

註:本文基於署名-非商業性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協議,歡迎轉載、轉發和評論,但是請保留本文作者署名和文章鏈接。如若需要用於商業目的或者與授權方面的協商,請聯系我。

技術分享

轉自:http://blog.csdn.net/oopsoom/article/details/39577419

第十篇:Spark SQL 源碼分析之 In-Memory Columnar Storage源碼分析之 query