1. 程式人生 > >SparkSQL(Spark-1.4.0)實戰系列(二)——DataFrames進階

SparkSQL(Spark-1.4.0)實戰系列(二)——DataFrames進階

本節主要內容如下

  1. DataFrame與RDD的互操作實戰
  2. 不同資料來源構建DataFrame實戰

DataFrame與RDD的互操作實戰

1 採用反映機制進行Schema型別推導(RDD到DataFrame的轉換)
SparkSQL支援RDD到DataFrame的自動轉換,實現方法是通過Case類定義表的Schema,Spark會通過反射機制讀取case class的引數名並將其配置成表的列名。

//匯入該語句後,RDD將會被隱式轉換成DataFrame
import sqlContext.implicits._

//定義一個類為Person的Case Class作為Schema
case class Person(name: String, age: Int) //讀取檔案並將資料Map成Person例項,然後轉換為DataFrame,採用toDF()方法,本例項從HDFS上進行資料讀取 val people = sc.textFile("/data/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() //將例項為peopler的DataFrame註冊成表 people.registerTempTable("people") //採用SQLContext中的sql方法執行SQL語句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") //輸出返回結果 teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

2 利用程式動態指定Schema
在某些應用場景下,我們可能並不能提前確定對應列的個數,因而case class無法進行定義,此時可以通過傳入一個字串來設定Schema資訊。具體過程如下:

// 建立RDD
val people = sc.textFile
("/data/people.txt") //Schema字串 val schemaString = "name age" // 匯入Row import org.apache.spark.sql.Row; //匯入Spark SQL資料型別 import org.apache.spark.sql.types.{StructType,StructField,StringType}; //利用schemaString動態生成Schema val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // 將people RDD轉換成Rows val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // 建立DataFrame val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) //註冊成表 peopleDataFrame.registerTempTable("people") //執行SQL語句. val results = sqlContext.sql("SELECT name FROM people") //列印輸出 results.map(t => "Name: " + t(0)).collect().foreach(println)

通過不同資料來源建立DataFrame

前面我們建立DataFrame時,讀取的是HDFS中的txt型別資料,在SparkSQL中,它支援多種資料來源,主要包括JSON、Parquet等。

//讀取json格式資料
val jsonFile= sqlContext.read.json("/data/people.json")

//jsonFile註冊成表
jsonFile.registerTempTable("peopleJson")
val teenagers = sqlContext.sql("SELECT name FROM peopleJson WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

這裡寫圖片描述

//儲存為parquet格式資料
jsonFile.select("name", "age").write.format("parquet").save("/data/namesAndAges.parquet")

這裡寫圖片描述
parquet檔案目錄結構如下圖
這裡寫圖片描述

//讀取parquet格式資料
val parquetFile = sqlContext.read.parquet(“/data/namesAndAges.parquet”)

//parquetFile註冊成表
parquetFile.registerTempTable(“parquetPerson”)
val teenagers = sqlContext.sql(“SELECT name FROM parquetPerson WHERE age >= 13 AND age <= 19”)
teenagers.map(t => “Name: ” + t(0)).collect().foreach(println)

新增公眾微訊號,可以瞭解更多最新技術資訊
這裡寫圖片描述