【Spark】Spark SQL, DataFrames and Datasets Guide(翻譯文,持續更新)
本文主要是翻譯Spark官網Spark SQL programming guide 。只能保證大概意思,儘量保證細節。英文水平有限,如果有錯誤的地方請指正,輕噴。目錄導航在右上角
概述
Spark SQL 是一個結構化資料處理的 Spark 模組 。 與基礎的 Spark RDD API 不同的是, Spark SQL 所提供的介面為 Spark 提供了 更多關於資料結構和正在執行的計算結構的資訊。 Spark 在其內部利用這些額外的資訊去做更多的優化。有幾種用於和 Sparrk SQL
互動的方法,包括 SQL 和 Dataset API。 當你計算一個結果, 會使用同一個執行引擎, 這獨立於你所用來描述這個演算法的API和語言。這種一致性意味著開發者可以輕易地在不同的 API 中來回切換, 因為它為表達給定的轉換提供了最自然的方式。
本頁所有示例使用了 Spark 提供的樣例資料並且可以在 spark-shell 、pyspark shell 或者 sparkR shell 中執行。
SQL
Spark SQL 的一個用處是執行 SQL 查詢。 Spark SQL 同樣可以用來從 現有的 HIVE 中讀取資料。 更多有關配置這個特性的資訊,請查閱 HIVE Tables 部分。當你使用其他語言執行SQL時,將會返回一個 Dataset 或者 DataFrame 作為結果。你同樣可以使用命令列或者 JDBC/ODBC 與 SQL 介面進行互動。
Dataset 和 Dataframe
Dataset 是一種分散式資料集,是 Spark1.6 新增的介面。它提供了RDD(強型別,可以使用強大的 lambda 表示式)的優點,並受益於Spark SQL 的優化執行引擎。Dataset 可以通過 JVM 構建,然後使用轉換方法(map, flatMap, filter等等)進行操作。 Dataset API
在 Java 和 Scala 中可用。 Python 並不支援Dataset API。但是由於Python的動態特性, Dataset API 的很多優勢都是可用的(比如你可以自然地使用名稱 row.columnName 來訪問 row 的域 )。 R 語言的情況類似。
DataFrame 是一種按列命名組織的 Dataset, 它在概念上等價於關係型資料庫的一個表或者 R/Python 的一個數據幀, 但是它(DataFrame)的底層做了更多的優化。DataFrame 可以通過大量的資料來源構建,例如:結構化的資料檔案, HIVE 的表, 資料庫,或現有的RDD。Java、Python、Scala、R語言都支援 DataFrame API。 在 Scala 和 Java, DataFrame 由Dataset的 rowS 表示。 在 Scala API 中,DataFrame 可以簡單地認為是 Dataset[Row] 的別名。 然而,在 Java API 中, 使用者需要使用 Dataset<Row> 來表示 DataFrame。
在整個文件中, 我們通常把 Scala/Java Dataset 的 RowS 稱為 DataFrames。
準備開始
起點: SparkSession
Spark 所有功能的入口是 SparkSession 類。建立最基本的 SaprkSession, 只需要呼叫 SparkSession.builder():
scala版
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits.
在 Spark 倉庫 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例程式碼。
java版
import org.apache.spark.sql.SparkSession; SparkSession spark = SparkSession .builder() .appName("Java Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate();
在 Spark 倉庫 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例程式碼。
Python版
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()
在 Spark 倉庫 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例程式碼。
R語言
sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))
在 Spark 倉庫 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例程式碼
請注意,sparkR.session() 第一次被呼叫時,它會初始化一個全域性的 SparkSession 單例物件,並且之後繼續呼叫這個方法都將返回這個例項。 通過這種方式,使用者只需要對 SparkSession 做一次初始化,然後 SparkR 的其他方法比如 read.df 將會隱式地訪問這個全域性地單例物件, 並且使用者不需要傳遞 SparkSession 的例項。
Spark2.0 的 SparkSession 提供了對 HIVE 特性的內建支援, 包括使用 HiveQL 編寫查詢語句的能力,訪問 Hive UDFs 和 從 Hive Table 中讀取資料的能力。為了使用這些特性,您需要安裝一個 HIVE。
建立 DataFrame
有了SparkSession, 應用程式可以通過本地的 R data.frame、Hive Table、 或者 Spark 資料來源 來建立DataFrame。
作為示例,以下程式碼使用一個 JSON 檔案的內容 建立一個 DataFrame
Scala版
val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 倉庫 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例程式碼。
Java版
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
在 Spark 倉庫 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例程式碼。
Python版
# spark is an existing SparkSession df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
在 Spark 倉庫 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例程式碼。
R語言
df <- read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame head(df) ## age name ## 1 NA Michael ## 2 30 Andy ## 3 19 Justin # Another method to print the first few rows and optionally truncate the printing of long values showDF(df) ## +----+-------+ ## | age| name| ## +----+-------+ ## |null|Michael| ## | 30| Andy| ## | 19| Justin| ## +----+-------+
在 Spark 倉庫 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例程式碼
弱型別的 Dataset 操作(aka DataFrame 操作)
DataFrame 為 Scala、Java、Python、R語言提供了一種特定的結構化資料操作。
上面提到過,在 Spark2.0 中,DataFrame 對於 Scala 和 Java API 僅僅是 Dataset 的 RowS。這些操作也被稱為 “弱型別轉換”,這與 強型別的Scala/Java 中的 “強型別轉換” 形成了鮮明的對比。
這裡我們囊括了使用 Datasets 做結構化資料處理的基本示例:
Scala版
// This import is needed to use the $-notation import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
在 Spark 倉庫 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例程式碼。
java版
// col("...") is preferable to df.col("...") import static org.apache.spark.sql.functions.col; // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select(col("name"), col("age").plus(1)).show(); // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter(col("age").gt(21)).show(); // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show(); // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
在 Spark 倉庫 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例程式碼。
Python版
對於Python來說,我們可以通過屬性(df.age)或者通過索引(df['age']) 來訪問 DataFrame 的列。 雖然前者用於互動式資料探索非常方便, 但使用者強烈建議使用後者,因為它具有前瞻性,並且不會因為 DataFrame 的列命和屬性名重複產生衝突。
# spark, df are from the previous example # Print the schema in a tree format df.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Select only the "name" column df.select("name").show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ # Select everybody, but increment the age by 1 df.select(df['name'], df['age'] + 1).show() # +-------+---------+ # | name|(age + 1)| # +-------+---------+ # |Michael| null| # | Andy| 31| # | Justin| 20| # +-------+---------+ # Select people older than 21 df.filter(df['age'] > 21).show() # +---+----+ # |age|name| # +---+----+ # | 30|Andy| # +---+----+ # Count people by age df.groupBy("age").count().show() # +----+-----+ # | age|count| # +----+-----+ # | 19| 1| # |null| 1| # | 30| 1| # +----+-----+
在 Spark 倉庫 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例程式碼。
R語言
# Create the DataFrame df <- read.json("examples/src/main/resources/people.json") # Show the content of the DataFrame head(df) ## age name ## 1 NA Michael ## 2 30 Andy ## 3 19 Justin # Print the schema in a tree format printSchema(df) ## root ## |-- age: long (nullable = true) ## |-- name: string (nullable = true) # Select only the "name" column head(select(df, "name")) ## name ## 1 Michael ## 2 Andy ## 3 Justin # Select everybody, but increment the age by 1 head(select(df, df$name, df$age + 1)) ## name (age + 1.0) ## 1 Michael NA ## 2 Andy 31 ## 3 Justin 20 # Select people older than 21 head(where(df, df$age > 21)) ## age name ## 1 30 Andy # Count people by age head(count(groupBy(df, "age"))) ## age count ## 1 19 1 ## 2 NA 1 ## 3 30 1
在 Spark 倉庫 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例程式碼
關於 DataFrame 可執行的操作的完整列表,請移步 API Documentation。
除了簡單的列引用和表示之外,DataFrame 同樣有一個豐富的函式庫,包括字串操作、日期演算法、常用數學操作 等等。 完整的列表可以在 DataFrame Function Reference.中找到。