1. 程式人生 > >Spark SQL筆記整理(二):DataFrame編程模型與操作案例

Spark SQL筆記整理(二):DataFrame編程模型與操作案例

代碼 最重要的 ssi func nbu 產生 michael array image

DataFrame原理與解析

Spark SQL和DataFrame

1、Spark SQL是Spark中的一個模塊,主要用於進行結構化數據的處理。它提供的最核心的編程抽象,就是DataFrame。同時Spark SQL還可以作為分布式的SQL查詢引擎。Spark SQL最重要的功能之一,就是從Hive中查詢數據。

2、DataFrame

就易用性而言,對比傳統的MapReduce API,說Spark的RDD API有了數量級的飛躍並不為過。然而,對於沒有MapReduce和函數式編程經驗的新手來說,RDD API仍然存在著一定的門檻。另一方面,數據科學家們所熟悉的R、Pandas等傳統數據框架雖然提供了直觀的API,卻局限於單機處理,無法勝任大數據場景。為了解決這一矛盾,Spark SQL 原有SchemaRDD的基礎上提供了與R和Pandas風格類似的DataFrame API。新的DataFrame API不僅可以大幅度降低普通開發者的學習門檻,同時還支持Scala、Java與Python三種語言。更重要的是,由於脫胎自SchemaRDD,DataFrame天然適用於分布式大數據場景。

DataFrame原理解析

在Spark中,DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏於DataFrame背後的數據源以及作用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。反觀RDD,由於無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。

技術分享圖片

DataFrame基本操作案例

Spark SQLContext

要使用Spark SQL,首先就得創建一個創建一個SQLContext對象,或者是它的子類的對象,比如HiveContext的對象。

Java版本:

JavaSparkContext sc = ...; 
SQLContext sqlContext = new SQLContext(sc);

Scala版本:

val sc: SparkContext = ... 
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

Spark HiveContext

1、除了基本的SQLContext以外,還可以使用它的子類——HiveContext。HiveContext的功能除了包含SQLContext提供的所有功能之外,還包括了額外的專門針對Hive的一些功能。這些額外功能包括:使用HiveQL語法來編寫和執行SQL,使用Hive中的UDF函數,從Hive表中讀取數據。

2、要使用HiveContext,就必須預先安裝好Hive,SQLContext支持的數據源,HiveContext也同樣支持——而不只是支持Hive。對於Spark 1.3.x以上的版本,都推薦使用HiveContext,因為其功能更加豐富和完善。

3、Spark SQL還支持用spark.sql.dialect參數設置SQL的方言。使用SQLContext的setConf()即可進行設置。對於SQLContext,它只支持“sql”一種方言。對於HiveContext,它默認的方言是“hiveql”。

創建DataFrame

使用SQLContext或者HiveContext,可以從RDD、Hive、ZMQ、Kafka和RabbitMQ等或者其他數據源,來創建一個DataFrame。我們來舉例使用JSON文件為例創建一個DataFrame。

Java版本:

JavaSparkContext sc = new JavaSparkContext();
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().json("hdfs://ns1/spark/sql/person.json");
df.show();

Scala版本:

val sc: SparkContext = new SparkContext();
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json(" hdfs://ns1/spark/sql/person.json")
df.show()

案例

json數據如下:

{"name":"Michael", "age":10, "height": 168.8}
{"name":"Andy", "age":30, "height": 168.8}
{"name":"Justin", "age":19, "height": 169.8}
{"name":"Jack", "age":32, "height": 188.8}
{"name":"John", "age":10, "height": 158.8}
{"name":"Domu", "age":19, "height": 179.8}
{"name":"袁帥", "age":13,  "height": 179.8}
{"name":"殷傑", "age":30, "height": 175.8}
{"name":"孫瑞", "age":19, "height": 179.9}

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.sql.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Column, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * SparkSQL基礎操作學習
  * 操作SparkSQL的核心就是DataFrame,DataFrame帶了一張內存中的二維表,包括元數據信息和表數據
  */
object _01SparkSQLOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val df:DataFrame = sqlContext.read.json("D:/data/spark/sql/people.json")
        // 1.打印DF中所有的記錄
        println("1.打印DF中所有的記錄")
        df.show()   // 默認的輸出表中數據的操作,相當於db中select * from t limit 20

        // 2.打印出DF中所有的schema信息
        println("2.打印出DF中所有的schema信息")
        df.printSchema()

        // 3.查詢出name的列並打印出來 select name from t
        // df.select("name").show()
        println("3.查詢出name的列並打印出來")
        df.select(new Column("name")).show()

        // 4.過濾並打印出年齡超過14歲的人
        println("4.過濾並打印出年齡超過14歲的人")
        df.select(new Column("name"), new Column("age")).where("age>14").show()

        // 5.給每個人的年齡都加上10歲
        println("5.給每個人的年齡都加上10歲")
        df.select(new Column("name"), new Column("age").+(10).as("10年後的年齡")).show()

        // 6.按照身高進行分組
        println("6.按照身高進行分組")   // select height, count(1) from t group by height;
        df.select(new Column("height")).groupBy(new Column("height")).count().show()

        // 註冊表
        df.registerTempTable("people")
        // 執行sql操作
        var sql = "select height, count(1) from people group by height"
        sqlContext.sql(sql).show()

        sc.stop()

    }
}

輸出結果如下:

1.打印DF中所有的記錄
18/05/08 16:06:09 INFO FileInputFormat: Total input paths to process : 1
+---+------+-------+
|age|height|   name|
+---+------+-------+
| 10| 168.8|Michael|
| 30| 168.8|   Andy|
| 19| 169.8| Justin|
| 32| 188.8|   Jack|
| 10| 158.8|   John|
| 19| 179.8|   Domu|
| 13| 179.8|     袁帥|
| 30| 175.8|     殷傑|
| 19| 179.9|     孫瑞|
+---+------+-------+

2.打印出DF中所有的schema信息
root
 |-- age: long (nullable = true)
 |-- height: double (nullable = true)
 |-- name: string (nullable = true)

3.查詢出name的列並打印出來
18/05/08 16:06:10 INFO FileInputFormat: Total input paths to process : 1
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
|   Jack|
|   John|
|   Domu|
|     袁帥|
|     殷傑|
|     孫瑞|
+-------+

4.過濾並打印出年齡超過14歲的人
18/05/08 16:06:10 INFO FileInputFormat: Total input paths to process : 1
+------+---+
|  name|age|
+------+---+
|  Andy| 30|
|Justin| 19|
|  Jack| 32|
|  Domu| 19|
|    殷傑| 30|
|    孫瑞| 19|
+------+---+

5.給每個人的年齡都加上10歲
18/05/08 16:06:10 INFO FileInputFormat: Total input paths to process : 1
+-------+-------+
|   name|10年後的年齡|
+-------+-------+
|Michael|     20|
|   Andy|     40|
| Justin|     29|
|   Jack|     42|
|   John|     20|
|   Domu|     29|
|     袁帥|     23|
|     殷傑|     40|
|     孫瑞|     29|
+-------+-------+

6.按照身高進行分組
18/05/08 16:06:11 INFO FileInputFormat: Total input paths to process : 1
+------+-----+
|height|count|
+------+-----+
| 179.9|    1|
| 188.8|    1|
| 158.8|    1|
| 179.8|    2|
| 169.8|    1|
| 168.8|    2|
| 175.8|    1|
+------+-----+

18/05/08 16:06:14 INFO FileInputFormat: Total input paths to process : 1
+------+---+
|height|_c1|
+------+---+
| 179.9|  1|
| 188.8|  1|
| 158.8|  1|
| 179.8|  2|
| 169.8|  1|
| 168.8|  2|
| 175.8|  1|
+------+---+

DataFrame與RDD之間的轉化案例與解析(Java、Scala)

相關使用數據

下面涉及的測試代碼中,需要使用到的源數據sql-rdd-source.txt,如下:

1, zhangsan, 13, 175
2, lisi, 14, 180
3, wangwu, 15, 175
4, zhaoliu, 16, 195
5, zhouqi, 17, 165
6, weiba, 18, 155

使用到的Person類,代碼如下:

public class Person {
    private int id;
    private String name;
    private int age;
    private double height;

    public Person() {
    }

    public Person(int id, String name, int age, double height) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.height = height;
    }
}

利用反射機制將RDD轉為DataFrame

1、一個問題就擺在大家的面前:為什麽要將RDD轉換為DataFrame呀?

主要是能使用Spark SQL進行SQL查詢了。這個功能是無比強大的。

2、是使用反射機制推斷包含了特定數據類型的RDD的元數據。這種基於反射的方式,代碼比較簡潔,事前知道要定義的POJO的元數據信息,當你已經知道你的RDD的元數據時,是一種非常不錯的方式。

使用反射機制推斷元數據

1、Java版本:

Spark SQL是支持將包含了POJO的RDD轉換為DataFrame的。POJO的信息,就定義了元數據。Spark SQL現在是不支持將包含了嵌套POJO或者List等復雜數據的POJO,作為元數據的。只支持一個包含簡單數據類型的field的POJO。

2、Scala版本:

而Scala由於其具有隱式轉換的特性,所以Spark SQL的Scala接口,是支持自動將包含了case class的RDD轉換為DataFrame的。case class就定義了元數據。Spark SQL會通過反射讀取傳遞給case class的參數的名稱,然後將其作為列名。

不同點:

3、與Java不同的是,Spark SQL是支持將包含了嵌套數據結構的case class作為元數據的,比如包含了Array等。

Scala版

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.sql.p1

import java.util
import java.util.{Arrays, List}

import cn.xpleaf.bigdata.spark.java.sql.p1.Person
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
  * SparkRDD與DataFrame之間的轉換操作
  * 1.通過反射的方式,將RDD轉換為DataFrame
  * 2.通過動態編程的方式將RDD轉換為DataFrame
  * 這裏演示的是第1種
  */
object _02SparkRDD2DataFrame {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)
        // 使用kryo的序列化方式
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        conf.registerKryoClasses(Array(classOf[Person]))

        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val lines = sc.textFile("D:/data/spark/sql/sql-rdd-source.txt")
        val personRDD:RDD[Person] = lines.map(line => {
            val fields = line.split(",")
            val id = fields(0).trim.toInt
            val name = fields(1).trim
            val age = fields(2).trim.toInt
            val height = fields(3).trim.toDouble
            new Person(id, name, age, height)
        })

        val persons: util.List[Person] = util.Arrays.asList(
            new Person(1, "孫人才", 25, 179),
            new Person(2, "劉銀鵬", 22, 176),
            new Person(3, "郭少波", 27, 178),
            new Person(1, "齊彥鵬", 24, 175))

//        val df:DataFrame = sqlContext.createDataFrame(persons, classOf[Person])   // 這種方式也可以
        val df:DataFrame = sqlContext.createDataFrame(personRDD, classOf[Person])

        df.show()

        sc.stop()

    }
}

輸出結果如下:

+---+------+---+--------+
|age|height| id|    name|
+---+------+---+--------+
| 13| 175.0|  1|zhangsan|
| 14| 180.0|  2|    lisi|
| 15| 175.0|  3|  wangwu|
| 16| 195.0|  4| zhaoliu|
| 17| 165.0|  5|  zhouqi|
| 18| 155.0|  6|   weiba|
+---+------+---+--------+

Java版

測試代碼如下:

package cn.xpleaf.bigdata.spark.java.sql.p1;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.Arrays;
import java.util.List;

/**
 * SparkRDD與DataFrame之間的轉換操作
 * 1.通過反射的方式,將RDD轉換為DataFrame
 * 2.通過動態編程的方式將RDD轉換為DataFrame
 * 這裏演示的是第1種
 */
public class _01SparkRDD2DataFrame {
    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName(_01SparkRDD2DataFrame.class.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class[]{Person.class});
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);
        List<Person> persons = Arrays.asList(
                new Person(1, "孫人才", 25, 179),
                new Person(2, "劉銀鵬", 22, 176),
                new Person(3, "郭少波", 27, 178),
                new Person(1, "齊彥鵬", 24, 175)
        );

        DataFrame df = sqlContext.createDataFrame(persons, Person.class);   // 構造方法有多個,使用personsRDD的方法也是可以的

        // where age > 23 and height > 176
        df.select(new Column("id"),
                  new Column("name"),
                  new Column("age"),
                  new Column("height"))
                .where(new Column("age").gt(23).and(new Column("height").lt(179)))
                .show();

        df.registerTempTable("person");

        sqlContext.sql("select * from person where age > 23 and height < 179").show();

        jsc.close();

    }
}

輸出結果如下:

+---+----+---+------+
| id|name|age|height|
+---+----+---+------+
|  3| 郭少波| 27| 178.0|
|  1| 齊彥鵬| 24| 175.0|
+---+----+---+------+

+---+------+---+----+
|age|height| id|name|
+---+------+---+----+
| 27| 178.0|  3| 郭少波|
| 24| 175.0|  1| 齊彥鵬|
+---+------+---+----+

使用編程的方式將RDD轉換為DataFrame

1、通過編程接口來創建DataFrame,在Spark程序運行階段創建並保持一份最新的元數據信息,然後將此元數據信息應用到RDD上。

2、優點在於編寫程序時根本就不知道元數據的定義和內容,只有在運行的時候才有元數據的數據。這種方式是在動態的時候進行動態構建元數據方式。

Scala版

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.sql.p1

import cn.xpleaf.bigdata.spark.java.sql.p1.Person
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * SparkRDD與DataFrame之間的轉換操作
  * 1.通過反射的方式,將RDD轉換為DataFrame
  * 2.通過動態編程的方式將RDD轉換為DataFrame
  * 這裏演示的是第2種
  */
object _03SparkRDD2DataFrame {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)
        // 使用kryo的序列化方式
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        conf.registerKryoClasses(Array(classOf[Person]))

        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val lines = sc.textFile("D:/data/spark/sql/sql-rdd-source.txt")
        val rowRDD:RDD[Row] = lines.map(line => {
            val fields = line.split(",")
            val id = fields(0).trim.toInt
            val name = fields(1).trim
            val age = fields(2).trim.toInt
            val height = fields(3).trim.toDouble
            Row(id, name, age, height)
        })

        val scheme = StructType(List(
            StructField("id", DataTypes.IntegerType, false),
            StructField("name", DataTypes.StringType, false),
            StructField("age", DataTypes.IntegerType, false),
            StructField("height", DataTypes.DoubleType, false)
        ))

        val df = sqlContext.createDataFrame(rowRDD, scheme)

        df.registerTempTable("person")
        sqlContext.sql("select max(age) as max_age, min(age) as min_age from person").show()

        sc.stop()

    }
}

輸出結果如下:

+-------+-------+
|max_age|min_age|
+-------+-------+
|     18|     13|
+-------+-------+

Java版

測試代碼如下:

package cn.xpleaf.bigdata.spark.java.sql.p1;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class _02SparkRDD2DataFrame {
    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName(_02SparkRDD2DataFrame.class.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class[]{Person.class});
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);
        List<Person> persons = Arrays.asList(
                new Person(1, "孫人才", 25, 179),
                new Person(2, "劉銀鵬", 22, 176),
                new Person(3, "郭少波", 27, 178),
                new Person(1, "齊彥鵬", 24, 175)
        );

        Stream<Row> rowStream = persons.stream().map(new Function<Person, Row>() {
            @Override
            public Row apply(Person person) {
                return RowFactory.create(person.getId(), person.getName(), person.getAge(), person.getHeight());
            }
        });

        List<Row> rows = rowStream.collect(Collectors.toList());

        StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("name", DataTypes.StringType, false, Metadata.empty()),
                new StructField("age", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("height", DataTypes.DoubleType, false, Metadata.empty())
        });

        DataFrame df = sqlContext.createDataFrame(rows, schema);

        df.registerTempTable("person");

        sqlContext.sql("select * from person where age > 23 and height < 179").show();

        jsc.close();

    }
}

輸出結果如下:

+---+----+---+------+
| id|name|age|height|
+---+----+---+------+
|  3| 郭少波| 27| 178.0|
|  1| 齊彥鵬| 24| 175.0|
+---+----+---+------+

緩存表(列式存儲)案例與解析

緩存和列式存儲

Spark SQL 可以將數據緩存到內存中,我們可以見到的通過調用cache table tableName即可將一張表緩存到內存中,來極大的提高查詢效率。

sqlContext.cacheTable(tableName)

這就涉及到內存中的數據的存儲形式,我們知道基於關系型的數據可以存儲為基於行存儲結構或者基於列存儲結構,或者基於行和列的混合存儲,即Row Based Storage、Column Based Storage、 PAX Storage。

Spark SQL 的內存數據是如何組織的?

Spark SQL 將數據加載到內存是以列的存儲結構。稱為In-Memory Columnar Storage。

若直接存儲Java Object 會產生很大的內存開銷,並且這樣是基於Row的存儲結構。查詢某些列速度略慢,雖然數據以及載入內存,查詢效率還是低於面向列的存儲結構。

基於Row的Java Object存儲

內存開銷大,且容易FULL GC,按列查詢比較慢。

技術分享圖片

基於Column的ByteBuffer存儲(Spark SQL)

內存開銷小,按列查詢速度較快。

技術分享圖片

In-Memory Columnar Storage代碼分布

Spark SQL的In-Memory Columnar Storage是位於spark列下面org.apache.spark.sql.columnar包內:

核心的類有 ColumnBuilder, InMemoryColumnarTableScan, ColumnAccessor, ColumnType.

如果列有壓縮的情況:compression包下面有具體的build列和access列的類。

技術分享圖片

性能調優

對於某些工作負載,可以在通過在內存中緩存數據或者打開一些實驗選項來提高性能。

在內存中緩存數據

Spark SQL可以通過調用sqlContext.cacheTable("tableName")方法來緩存使用柱狀格式的表。然後,Spark將會僅僅瀏覽需要的列並且自動地壓縮數據以減少內存的使用以及垃圾回收的 壓力。你可以通過調用sqlContext.uncacheTable("tableName")方法在內存中刪除表。

註意,如果你調用schemaRDD.cache()而不是sqlContext.cacheTable(...),表將不會用柱狀格式來緩存。在這種情況下,sqlContext.cacheTable(...)是強烈推薦的用法。

可以在SQLContext上使用setConf方法或者在用SQL時運行SET key=value命令來配置內存緩存。

技術分享圖片

技術分享圖片

DataFrame常用API

1、collect 和 collectAsList   將df中的數據轉化成Array和List
2、count         統計df中的總記錄數
3、first         獲取df中的第一條記錄,數據類型為Row
4、head          獲取df的前幾條記錄
5、show
6、take          獲取df中的前幾條記錄
7、cache         對df進行緩存
8、columns       顯示所有的列的schema列名,類型為Array[String]
9、dtypes        顯示所有的列的schema信息,類型為Array[(String, String)]
10、explain      顯示當前df的執行計劃
11、isLocal         當前spark sql的執行是否為本地,true為真,false為非本地
12、printSchema
13、registerTempTable
14、schema
15、toDF :備註:toDF帶有參數時,參數個數必須和調用這DataFrame的列個數據是一樣的
          類似於sql中的:toDF:insert into t select * from t1;
16、intersect:返回兩個DataFrame相同的Rows



原文鏈接:http://blog.51cto.com/xpleaf/2114298

Spark SQL筆記整理(二):DataFrame編程模型與操作案例