1. 程式人生 > >【Spark】Spark SQL, DataFrames and Datasets Guide(翻譯文,持續更新)

【Spark】Spark SQL, DataFrames and Datasets Guide(翻譯文,持續更新)

本文主要是翻譯Spark官網Spark SQL programming guide 。只能保證大概意思,儘量保證細節。英文水平有限,如果有錯誤的地方請指正,輕噴。目錄導航在右上角

概述

  Spark SQL 是一個結構化資料處理的 Spark 模組 。 與基礎的 Spark RDD API 不同的是, Spark SQL 所提供的介面為 Spark 提供了 更多關於資料結構和正在執行的計算結構的資訊。 Spark 在其內部利用這些額外的資訊去做更多的優化。有幾種用於和 Sparrk SQL

互動的方法,包括 SQL 和 Dataset API。 當你計算一個結果, 會使用同一個執行引擎, 這獨立於你所用來描述這個演算法的API和語言。這種一致性意味著開發者可以輕易地在不同的 API 中來回切換, 因為它為表達給定的轉換提供了最自然的方式。

  本頁所有示例使用了 Spark 提供的樣例資料並且可以在 spark-shell 、pyspark shell 或者 sparkR shell 中執行。

SQL

  Spark SQL 的一個用處是執行 SQL 查詢。 Spark SQL 同樣可以用來從 現有的 HIVE 中讀取資料。 更多有關配置這個特性的資訊,請查閱 HIVE Tables 部分。當你使用其他語言執行SQL時,將會返回一個 Dataset 或者 DataFrame 作為結果。你同樣可以使用命令列或者 JDBC/ODBC 與 SQL 介面進行互動。

Dataset 和 Dataframe

  Dataset 是一種分散式資料集,是 Spark1.6 新增的介面。它提供了RDD(強型別,可以使用強大的 lambda 表示式)的優點,並受益於Spark SQL 的優化執行引擎。Dataset 可以通過 JVM 構建,然後使用轉換方法(map, flatMap, filter等等)進行操作。 Dataset API 

在 Java 和 Scala 中可用。 Python 並不支援Dataset API。但是由於Python的動態特性, Dataset API 的很多優勢都是可用的(比如你可以自然地使用名稱 row.columnName 來訪問 row 的域 )。 R 語言的情況類似。

  DataFrame 是一種按列命名組織的 Dataset, 它在概念上等價於關係型資料庫的一個表或者 R/Python 的一個數據幀, 但是它(DataFrame)的底層做了更多的優化。DataFrame 可以通過大量的資料來源構建,例如:結構化的資料檔案, HIVE 的表, 資料庫,或現有的RDD。Java、Python、Scala、R語言都支援 DataFrame API。 在 Scala 和 Java, DataFrame 由Dataset的 rowS 表示。 在 Scala API 中,DataFrame 可以簡單地認為是 Dataset[Row] 的別名。 然而,在 Java API 中, 使用者需要使用 Dataset<Row> 來表示 DataFrame。

  在整個文件中, 我們通常把 Scala/Java Dataset 的 RowS 稱為 DataFrames。

 準備開始

起點: SparkSession

Spark 所有功能的入口是 SparkSession 類。建立最基本的 SaprkSession, 只需要呼叫 SparkSession.builder():

scala版

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits. 

在  Spark 倉庫 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例程式碼。

java版

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

在  Spark 倉庫 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例程式碼。

Python版

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

在  Spark 倉庫 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例程式碼。

R語言

sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))

在  Spark 倉庫 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例程式碼

請注意,sparkR.session() 第一次被呼叫時,它會初始化一個全域性的 SparkSession 單例物件,並且之後繼續呼叫這個方法都將返回這個例項。 通過這種方式,使用者只需要對 SparkSession 做一次初始化,然後 SparkR 的其他方法比如 read.df 將會隱式地訪問這個全域性地單例物件, 並且使用者不需要傳遞 SparkSession 的例項。

Spark2.0 的 SparkSession 提供了對 HIVE 特性的內建支援, 包括使用 HiveQL 編寫查詢語句的能力,訪問 Hive UDFs 和 從 Hive Table 中讀取資料的能力。為了使用這些特性,您需要安裝一個 HIVE。

建立 DataFrame

有了SparkSession, 應用程式可以通過本地的 R data.frame、Hive Table、 或者 Spark 資料來源 來建立DataFrame。

作為示例,以下程式碼使用一個 JSON 檔案的內容 建立一個 DataFrame

Scala版

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

在  Spark 倉庫 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例程式碼。

Java版

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

在  Spark 倉庫 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例程式碼。

Python版

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

在  Spark 倉庫 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例程式碼。

R語言

df <- read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame
head(df)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
## +----+-------+
## | age|   name|
## +----+-------+
## |null|Michael|
## |  30|   Andy|
## |  19| Justin|
## +----+-------+

在  Spark 倉庫 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例程式碼

弱型別的 Dataset 操作(aka DataFrame 操作)

DataFrame 為 Scala、Java、Python、R語言提供了一種特定的結構化資料操作。

上面提到過,在 Spark2.0 中,DataFrame 對於 Scala 和 Java API 僅僅是 Dataset 的 RowS。這些操作也被稱為 “弱型別轉換”,這與 強型別的Scala/Java 中的 “強型別轉換” 形成了鮮明的對比。

這裡我們囊括了使用 Datasets 做結構化資料處理的基本示例:

Scala版

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

在  Spark 倉庫 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例程式碼。

java版

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

在  Spark 倉庫 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例程式碼。

Python版

 對於Python來說,我們可以通過屬性(df.age)或者通過索引(df['age']) 來訪問 DataFrame 的列。 雖然前者用於互動式資料探索非常方便, 但使用者強烈建議使用後者,因為它具有前瞻性,並且不會因為 DataFrame 的列命和屬性名重複產生衝突。

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+ 

在  Spark 倉庫 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例程式碼。

R語言

# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
head(df)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin


# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
head(select(df, "name"))
##      name
## 1 Michael
## 2    Andy
## 3  Justin

# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
##      name (age + 1.0)
## 1 Michael          NA
## 2    Andy          31
## 3  Justin          20

# Select people older than 21
head(where(df, df$age > 21))
##   age name
## 1  30 Andy

# Count people by age
head(count(groupBy(df, "age")))
##   age count
## 1  19     1
## 2  NA     1
## 3  30     1

在  Spark 倉庫 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例程式碼

關於 DataFrame 可執行的操作的完整列表,請移步 API Documentation

除了簡單的列引用和表示之外,DataFrame 同樣有一個豐富的函式庫,包括字串操作、日期演算法、常用數學操作 等等。 完整的列表可以在 DataFrame Function Reference.中找到。