1. 程式人生 > >【spark 讀寫資料】資料來源的讀寫操作

【spark 讀寫資料】資料來源的讀寫操作

通用的 Load/Save 函式

在最簡單的方式下,預設的資料來源(parquet 除非另外配置通過spark.sql.sources.default)將會用於所有的操作。

Parquet 是一個列式儲存格式的檔案,被許多其他資料處理系統所支援。Spark SQL 支援對 Parquet 檔案的讀寫還可以自動的儲存源資料的模式


val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet"
)

手動指定選項

你也可以手動的指定資料來源,並且將與你想要傳遞給資料來源的任何額外選項一起使用。資料來源由其完全限定名指定(例如 : org.apache.spark.sql.parquet),不過對於內建資料來源你也可以使用它們的縮寫名(json, parquet, jdbc)。使用下面這個語法可以將從任意型別資料來源載入的DataFrames 轉換為其他型別。

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age"
).write.format("parquet").save("namesAndAges.parquet")

直接在檔案上執行 SQL

你也可以直接在檔案上執行 SQL 查詢來替代使用 API 將檔案載入到 DataFrame 再進行查詢。

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

儲存為持久化的表

import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json"
) peopleDF.write.parquet("people.parquet") val parquetFileDF = spark.read.parquet("people.parquet") parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+