SparkSQL(Spark-1.4.0)實戰系列(二)——DataFrames進階
阿新 • • 發佈:2019-02-05
本節主要內容如下
- DataFrame與RDD的互操作實戰
- 不同資料來源構建DataFrame實戰
DataFrame與RDD的互操作實戰
1 採用反映機制進行Schema型別推導(RDD到DataFrame的轉換)
SparkSQL支援RDD到DataFrame的自動轉換,實現方法是通過Case類定義表的Schema,Spark會通過反射機制讀取case class的引數名並將其配置成表的列名。
//匯入該語句後,RDD將會被隱式轉換成DataFrame
import sqlContext.implicits._
//定義一個類為Person的Case Class作為Schema
case class Person(name: String, age: Int)
//讀取檔案並將資料Map成Person例項,然後轉換為DataFrame,採用toDF()方法,本例項從HDFS上進行資料讀取
val people = sc.textFile("/data/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
//將例項為peopler的DataFrame註冊成表
people.registerTempTable("people")
//採用SQLContext中的sql方法執行SQL語句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
//輸出返回結果
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
2 利用程式動態指定Schema
在某些應用場景下,我們可能並不能提前確定對應列的個數,因而case class無法進行定義,此時可以通過傳入一個字串來設定Schema資訊。具體過程如下:
// 建立RDD
val people = sc.textFile ("/data/people.txt")
//Schema字串
val schemaString = "name age"
// 匯入Row
import org.apache.spark.sql.Row;
//匯入Spark SQL資料型別
import org.apache.spark.sql.types.{StructType,StructField,StringType};
//利用schemaString動態生成Schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// 將people RDD轉換成Rows
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// 建立DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//註冊成表
peopleDataFrame.registerTempTable("people")
//執行SQL語句.
val results = sqlContext.sql("SELECT name FROM people")
//列印輸出
results.map(t => "Name: " + t(0)).collect().foreach(println)
通過不同資料來源建立DataFrame
前面我們建立DataFrame時,讀取的是HDFS中的txt型別資料,在SparkSQL中,它支援多種資料來源,主要包括JSON、Parquet等。
//讀取json格式資料
val jsonFile= sqlContext.read.json("/data/people.json")
//jsonFile註冊成表
jsonFile.registerTempTable("peopleJson")
val teenagers = sqlContext.sql("SELECT name FROM peopleJson WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
//儲存為parquet格式資料
jsonFile.select("name", "age").write.format("parquet").save("/data/namesAndAges.parquet")
parquet檔案目錄結構如下圖
//讀取parquet格式資料
val parquetFile = sqlContext.read.parquet(“/data/namesAndAges.parquet”)
//parquetFile註冊成表
parquetFile.registerTempTable(“parquetPerson”)
val teenagers = sqlContext.sql(“SELECT name FROM parquetPerson WHERE age >= 13 AND age <= 19”)
teenagers.map(t => “Name: ” + t(0)).collect().foreach(println)
新增公眾微訊號,可以瞭解更多最新技術資訊