1. 程式人生 > >SparkSQL程式設計指南之Java篇二-資料來源(上)

SparkSQL程式設計指南之Java篇二-資料來源(上)

Spark SQL通過DataFrame介面支援各種不同資料來源的操作。一個DataFrame可以進行相關的轉換操作,也可以用於建立臨時檢視。註冊DataFrame為一個臨時檢視可以允許你對其資料執行SQL查詢。本文首先會介紹使用Spark資料來源載入和儲存資料的一般方法,然後對內建資料來源進行詳細介紹。

1. 一般的Load/Save方法

Spark SQL最簡單的也是預設的資料來源格式是Parquet(除非使用了spark.sql.sources.default配置修改),它將會被用於所有的操作。以下是一般的Load/Save方法:

// generic load/save functions
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

* 如果是使用windows系統的話,需要把對應版本編譯的hadoop.dll複製到C:\Windows\System32,否則會遇到以下錯誤:

java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:223)
    ......

1.1 手動指定選項

我們也可以通過完整的全名(例如:org.apache.spark.sql.parquet)來指定資料來源的型別,對於那些內建的資料來源型別,也可以使用簡稱,例如:json, parquet, jdbc, orc, libsvm, csv, text。從任何資料來源型別載入的DataFrames可以轉換為其它的型別格式,例如:

// manually load options
Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

1.2 直接執行SQL

我們也可以直接執行SQL查詢而不需要使用API載入檔案為DataFrame然後再查詢,例如:

// run SQL on files directly
Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
sqlDF.show();
// +------+--------------+----------------+
// |  name|favorite_color|favorite_numbers|
// +------+--------------+----------------+
// |Alyssa|          null|  [3, 9, 15, 20]|
// |   Ben|           red|              []|
// +------+--------------+----------------+

* 注意parquet.後面的路徑前後有個`的字元(與鍵盤~一起的那個逗點)

1.3 儲存模式

儲存操作可以選擇性地使用SaveMode指定如何處理存在的資料。需要注意的是這些儲存模式不使用任何鎖和不是原子操作的。此外,當使用Overwrite模式時,原資料會在寫入新資料之前就會被刪除。以下是SaveMode的選項,當儲存一個DataFrame到指定資料來源的時候,如果輸出路徑已經存在:

SaveMode.ErrorIfExists(預設)     丟擲異常
SaveMode.Append                      資料會以追加的方式儲存
SaveMode.Overwrite                   新資料會覆蓋原資料(先刪除原資料,再儲存新資料)
SaveMode.Ignore                        不儲存新資料,相當於SQL語句的CREATE TABLE IF NOT EXISTS

例如:

usersDF.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");

1.4 持久化到表

我們也可以使用saveAsTable方法把DataFrames儲存為持久化的表到Hive metastore。值得注意的是我們不需要部署Hive的環境,Spark會建立一個預設本地的Hive metastore(使用Derby)。與createOrReplaceTempView方法不同,saveAsTable會實質化DataFrame的內容,然後在Hive metastore建立它的指標。只要連線到相同metastore的連線不中斷,即使Spark程式重新啟動,持久化的表也會一直存在。

預設地saveAsTable方法將建立一個“管理表”(managed table),表示資料的位置是由metastore來控制管理的。當持久化的表被刪除時,managed table將會自動刪除相應的資料。

2. Parquet檔案

Parquet是一種被其它多種資料處理系統支援的縱列格式。Spark SQL提供了讀寫Parquet檔案的功能,儲存的Parquet檔案會自動保留原始資料的schema。當儲存Parquet檔案時,基於相容性考慮,所有的列會被自動轉換為允許空值。

2.1 以程式設計方式載入資料

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(row -> "Name: " + row.getString(0), Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

2.2 分割槽推斷

對錶進行分割槽是對資料進行優化的方式之一。在一個分割槽的表內,資料通常是通過分割槽列將資料儲存在不同的目錄裡面。Parquet資料來源現在能夠自動地發現並推斷分割槽資訊。例如,可以使用下面的目錄結構儲存人口資料到分割槽表裡面,分割槽列為gender和country:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通過傳遞path/to/table給SparkSession.read.parquet或SparkSession.read.load,Spark SQL將自動抽取分割槽資訊。返回的DataFrame的Schema如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

需要注意的是,分割槽列的資料型別是自動解析的。目前,數值型別和字串型別是支援的。如果不想分割槽列的資料型別被自動解析,可以通過配置spark.sql.sources.partitionColumnTypeInference.enabled=false,預設是true。當該配置被設為false時,分割槽列資料型別會使用string型別。

從Spark 1.6.0版本開始,預設地,分割槽資訊解析只會作用於指定路徑下面的分割槽。例如上面的例子,如果使用者傳遞path/to/table/gender=male給SparkSession.read.parquet或SparkSession.read.load,gender將不會是分割槽列。如果使用者需要指定基礎的路徑作為分割槽資訊解析的開始路徑,那麼可以在資料來源選項設定basePath。例如,path/to/table/gender=male是資料的路徑,使用者設定了basePath=path/to/table/,那麼gender將會是分割槽列。

2.3 Schema合併

像ProtocolBuffer、Avro和Thrift一樣,Parquet也支援schema evolution(schema演變)。使用者可以先定義一個簡單的schema,然後根據需要逐漸地向schema中增加列。通過這種方式,使用者可以有多個不同的schemas但它們是互相相容的Parquet檔案。Parquet資料來源現在能夠自動檢測這種情況併合並這些檔案的schemas。

因為Schema合併是一個相對高消耗的操作,在大多數的情況下並不需要,所以從Spark SQL 1.5.0版本開始,預設關閉了該功能。可以通過下面兩種方式開啟:
  • 當讀取Parquet檔案時,設定資料來源選項mergeSchema=true(例如下面的例子)
  • 設定全域性SQL選項spark.sql.parquet.mergeSchema=true
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public static class Square implements Serializable {
  private int value;
  private int square;

  // Getters and setters...

}

public static class Cube implements Serializable {
  private int value;
  private int cube;

  // Getters and setters...

}

List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
  Square square = new Square();
  square.setValue(value);
  square.setSquare(value * value);
  squares.add(square);
}

// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");

List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
  Cube cube = new Cube();
  cube.setValue(value);
  cube.setCube(value * value * value);
  cubes.add(cube);
}

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");

// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

2.4 Hive metastore Parquet錶轉換

當讀寫Hive metastore Parquet表時,基於效能考慮,Spark SQL會先嚐試使用自帶的Parquet SerDe(序列化與反序列化,Serialize/Deserilize的簡稱),而不是Hive的SerDe。這個優化選項可以通過spark.sql.hive.convertMetastoreParquet配置,預設為開啟。

2.4.1 Hive/Parquet Schema一致化

從表schema處理的角度來看,Hive和Parquet有2個主要的不同點:
  • Hive不區分大小寫,Parquet區分大小寫
  • Hive認為所有的列都可以為空,而Parquet的空值性是有重要意義的(Hive considers all columns nullable, while nullability in Parquet is significant)
由於以上不同點,當把Hive metastore Parquet錶轉換為Spark SQL Parquet表時,必須將Hive metastore schema和Parquet schema進行一致化。其規則如下:
  • 兩個schema中,忽略空值性,具有相同名字的欄位必須具有相同的資料型別。一致化後的欄位型別應該與Parquet的欄位型別一致,以便遵守空值性原則
  • 一致化後的schema只包含在Hive metastore schema定義的欄位:
             i. 丟棄只在Parquet schema定義的欄位
             i. 把只在Hive metastore schema定義的欄位設為允許為空

2.4.2 元資料重新整理

為了提高效能,Spark SQL會快取Parquet元資料(metadata)。當Hive metastore Parquet錶轉換的選項開啟時,轉換後的表元資料也會被快取。如果這些表被Hive或者其它外部工具更新,則需要手動重新整理快取以確保元資料的一致性。

// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");

2.5 配置

Parquet的配置可以使用SparkSession的setConf方法或者使用SQL執行SET key=value命令。詳細的配置引數如下:



3. JSON Datasets

Spark SQL可以自動推斷JSON資料集的schema並載入為Dataset<Row>。此轉換可以使用SparkSession.read().json()方法讀取一個String型別的RDD或者一個JSON檔案。需要注意的是,這裡的JSON檔案不是典型的JSON格式。這裡的JSON檔案每一行必須包含一個獨立有效的JSON物件,也稱為換行符分割JSON檔案。因此,一個規則的多行JSON檔案會導致讀取出錯。讀取JSON資料集例子如下:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD =
        new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

* 參考Spark SQL官方連結:http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources

TO BE CONTINUED...O(∩_∩)O