1. 程式人生 > >Spark SQL 官方文件-中文翻譯

Spark SQL 官方文件-中文翻譯

1 概述(Overview)

Spark SQL是Spark的一個元件,用於結構化資料的計算。Spark SQL提供了一個稱為DataFrames的程式設計抽象,DataFrames可以充當分散式SQL查詢引擎。

2 DataFrames

DataFrame是一個分散式的資料集合,該資料集合以命名列的方式進行整合。DataFrame可以理解為關係資料庫中的一張表,也可以理解為R/Python中的一個data frame。DataFrames可以通過多種資料構造,例如:結構化的資料檔案、hive中的表、外部資料庫、Spark計算過程中生成的RDD等。
DataFrame的API支援4種語言:Scala、Java、Python、R。

2.1 入口:SQLContext(Starting Point: SQLContext)

Spark SQL程式的主入口是SQLContext類或它的子類。建立一個基本的SQLContext,你只需要SparkContext,建立程式碼示例如下:

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

除了基本的SQLContext,也可以建立HiveContext。SQLContext和HiveContext區別與聯絡為:

  • SQLContext現在只支援SQL語法解析器(SQL-92語法)
  • HiveContext現在支援SQL語法解析器和HiveSQL語法解析器,預設為HiveSQL語法解析器,使用者可以通過配置切換成SQL語法解析器,來執行HiveSQL不支援的語法。
  • 使用HiveContext可以使用Hive的UDF,讀寫Hive表資料等Hive操作。SQLContext不可以對Hive進行操作。
  • Spark SQL未來的版本會不斷豐富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最終可能兩者會統一成一個Context

HiveContext包裝了Hive的依賴包,把HiveContext單獨拿出來,可以在部署基本的Spark的時候就不需要Hive的依賴包,需要使用HiveContext時再把Hive的各種依賴包加進來。

SQL的解析器可以通過配置spark.sql.dialect引數進行配置。在SQLContext中只能使用Spark SQL提供的”sql“解析器。在HiveContext中預設解析器為”hiveql“,也支援”sql“解析器。

2.2 建立DataFrames(Creating DataFrames)

使用SQLContext,spark應用程式(Application)可以通過RDD、Hive表、JSON格式資料等資料來源建立DataFrames。下面是基於JSON檔案建立DataFrame的示例:

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();

2.3 DataFrame操作(DataFrame Operations)

DataFrames支援Scala、Java和Python的操作介面。下面是Scala和Java的幾個操作示例:

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1
  • Java
JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1

除了簡單列引用和表示式,DataFrames還有豐富的library,功能包括string操作、date操作、常見數學操作等。詳細內容請參考 DataFrame Function Reference

2.4 執行SQL查詢程式(Running SQL Queries Programmatically)

Spark Application可以使用SQLContext的sql()方法執行SQL查詢操作,sql()方法返回的查詢結果為DataFrame格式。程式碼如下:

  • Scala
val sqlContext = ...  // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")
  • Java
SQLContext sqlContext = ...  // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")

2.5 DataFrames與RDDs的相互轉換(Interoperating with RDDs)

Spark SQL支援兩種RDDs轉換為DataFrames的方式:

  • 使用反射獲取RDD內的Schema
    • 當已知類的Schema的時候,使用這種基於反射的方法會讓程式碼更加簡潔而且效果也很好。
  • 通過程式設計介面指定Schema
    • 通過Spark SQL的介面建立RDD的Schema,這種方式會讓程式碼比較冗長。
    • 這種方法的好處是,在執行時才知道資料的列以及列的型別的情況下,可以動態生成Schema

2.5.1 使用反射獲取Schema(Inferring the Schema Using Reflection)

Spark SQL支援將JavaBean的RDD自動轉換成DataFrame。通過反射獲取Bean的基本資訊,依據Bean的資訊定義Schema。當前Spark SQL版本(Spark 1.5.2)不支援巢狀的JavaBeans和複雜資料型別(如:List、Array)。建立一個實現Serializable介面包含所有屬性getters和setters的類來建立一個JavaBean。通過呼叫createDataFrame並提供JavaBean的Class object,指定一個Schema給一個RDD。示例如下:

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  publicvoidsetName(String name) {
    this.name = name;
  }

  publicintgetAge() {
    return age;
  }

  publicvoidsetAge(int age) {
    this.age = age;
  }
}
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

2.5.2 通過程式設計介面指定Schema(Programmatically Specifying the Schema)

當JavaBean不能被預先定義的時候,程式設計建立DataFrame分為三步:

  • 從原來的RDD建立一個Row格式的RDD
  • 建立與RDD中Rows結構匹配的StructType,通過該StructType建立表示RDD的Schema
  • 通過SQLContext提供的createDataFrame方法建立DataFrame,方法引數為RDD的Schema

示例如下:

import org.apache.spark.api.java.function.Function;
// Import factory methods provided by DataTypes.
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;
// Import RowFactory.
import org.apache.spark.sql.RowFactory;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
  fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return RowFactory.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

3 資料來源(Data Source)

Spark SQL的DataFrame介面支援多種資料來源的操作。一個DataFrame可以進行RDDs方式的操作,也可以被註冊為臨時表。把DataFrame註冊為臨時表之後,就可以對該DataFrame執行SQL查詢。Data Sources這部分首先描述了對Spark的資料來源執行載入和儲存的常用方法,然後對內建資料來源進行深入介紹。

3.1 一般Load/Save方法

Spark SQL的預設資料來源為Parquet格式。資料來源為Parquet檔案時,Spark SQL可以方便的執行所有的操作。修改配置項spark.sql.sources.default,可修改預設資料來源格式。讀取Parquet檔案示例如下:

  • Scala
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
  • Java
DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

3.1.1 手動指定選項(Manually Specifying Options)

當資料來源格式不是parquet格式檔案時,需要手動指定資料來源的格式。資料來源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果資料來源格式為內建格式,則只需要指定簡稱(json,parquet,jdbc)。通過指定的資料來源格式名,可以對DataFrames進行型別轉換操作。示例如下:

  • Scala
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
  • Java
DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json");
df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

3.1.2 儲存模式(Save Modes)

可以採用SaveMode執行儲存操作,SaveMode定義了對資料的處理模式。需要注意的是,這些儲存模式不使用任何鎖定,不是原子操作。此外,當使用Overwrite方式執行時,在輸出新資料之前原資料就已經被刪除。SaveMode詳細介紹如下表:

SaveModes

3.1.3 持久化到表(Saving to Persistent Tables)

當使用HiveContext時,可以通過saveAsTable方法將DataFrames儲存到表中。與registerTempTable方法不同的是,saveAsTable將DataFrame中的內容持久化到表中,並在HiveMetastore中儲存元資料。儲存一個DataFrame,可以使用SQLContext的table方法。table先建立一個表,方法引數為要建立的表的表名,然後將DataFrame持久化到這個表中。

預設的saveAsTable方法將建立一個“managed table”,表示資料的位置可以通過metastore獲得。當儲存資料的表被刪除時,managed table也將自動刪除。

3.2 Parquet檔案

Parquet是一種支援多種資料處理系統的柱狀的資料格式,Parquet檔案中保留了原始資料的模式。Spark SQL提供了Parquet檔案的讀寫功能。

3.2.1 讀取Parquet檔案(Loading Data Programmatically)

讀取Parquet檔案示例如下:

  • Scala
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
  • Java
// sqlContext from the previous example is used in this example.

DataFrame schemaPeople = ... // The DataFrame from the previous example.

// DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write().parquet("people.parquet");

// Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a DataFrame.
DataFrame parquetFile = sqlContext.read().parquet("people.parquet");

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

3.2.2 解析分割槽資訊(Partition Discovery)

對錶進行分割槽是對資料進行優化的方式之一。在分割槽的表內,資料通過分割槽列將資料儲存在不同的目錄下。Parquet資料來源現在能夠自動發現並解析分割槽資訊。例如,對人口資料進行分割槽儲存,分割槽列為gender和country,使用下面的目錄結構:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通過傳遞path/to/table給 SQLContext.read.parquet或SQLContext.read.load,Spark SQL將自動解析分割槽資訊。返回的DataFrame的Schema如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

需要注意的是,資料的分割槽列的資料型別是自動解析的。當前,支援數值型別和字串型別。自動解析分割槽型別的引數為:spark.sql.sources.partitionColumnTypeInference.enabled,預設值為true。如果想關閉該功能,直接將該引數設定為disabled。此時,分割槽列資料格式將被預設設定為string型別,不再進行型別解析。

3.2.3 Schema合併(Schema Merging)

像ProtocolBuffer、Avro和Thrift那樣,Parquet也支援Schema evolution(Schema演變)。使用者可以先定義一個簡單的Schema,然後逐漸的向Schema中增加列描述。通過這種方式,使用者可以獲取多個有不同Schema但相互相容的Parquet檔案。現在Parquet資料來源能自動檢測這種情況,併合並這些檔案的schemas。

因為Schema合併是一個高消耗的操作,在大多數情況下並不需要,所以Spark SQL從1.5.0開始預設關閉了該功能。可以通過下面兩種方式開啟該功能:

  • 當資料來源為Parquet檔案時,將資料來源選項mergeSchema設定為true
  • 設定全域性SQL選項spark.sql.parquet.mergeSchema為true

示例如下:

  • Scala
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

3.2.4 Hive metastore Parquet錶轉換(Hive metastore Parquet table conversion)

當向Hive metastore中讀寫Parquet表時,Spark SQL將使用Spark SQL自帶的Parquet SerDe(SerDe:Serialize/Deserilize的簡稱,目的是用於序列化和反序列化),而不是用Hive的SerDe,Spark SQL自帶的SerDe擁有更好的效能。這個優化的配置引數為spark.sql.hive.convertMetastoreParquet,預設值為開啟。

3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation)

從表Schema處理的角度對比Hive和Parquet,有兩個區別:

  • Hive區分大小寫,Parquet不區分大小寫
  • hive允許所有的列為空,而Parquet不允許所有的列全為空

由於這兩個區別,當將Hive metastore Parquet錶轉換為Spark SQL Parquet表時,需要將Hive metastore schema和Parquet schema進行一致化。一致化規則如下:

  • 這兩個schema中的同名欄位必須具有相同的資料型別。一致化後的欄位必須為Parquet的欄位型別。這個規則同時也解決了空值的問題。
  • 一致化後的schema只包含Hive metastore中出現的欄位。
    • 忽略只出現在Parquet schema中的欄位
    • 只在Hive metastore schema中出現的欄位設為nullable欄位,並加到一致化後的schema中

3.2.4.2 元資料重新整理(Metadata Refreshing)

Spark SQL快取了Parquet元資料以達到良好的效能。當Hive metastore Parquet錶轉換為enabled時,表修改後快取的元資料並不能重新整理。所以,當表被Hive或其它工具修改時,則必須手動重新整理元資料,以保證元資料的一致性。示例如下:

  • Scala
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
  • Java
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")

3.2.5 配置(Configuration)

配置Parquet可以使用SQLContext的setConf方法或使用SQL執行SET key=value命令。詳細引數說明如下:

Configuration

3.3 JSON資料集

Spark SQL能自動解析JSON資料集的Schema,讀取JSON資料集為Da