1. 程式人生 > >Spark SQL中RDDs轉化為DataFrame(詳細全面)

Spark SQL中RDDs轉化為DataFrame(詳細全面)

除了呼叫SparkSesion.read().json/csv/orc/parqutjdbc 方法從各種外部結構化資料來源建立DataFrame物件外,Spark SQL還支援

將已有的RDD轉化為DataFrame物件,但是需要注意的是,並不是由任意型別物件組成的RDD均可轉化為DataFrame 物件,

只有當組成RDD[T]的每一個T物件內部具有公有且鮮明的欄位結構時,才能隱式或顯式地總結出建立DataFrame物件所必要的

結構資訊(Schema) 進行轉化,進而在DataFrame上呼叫RDD所不具備的強大豐富的API,或執行簡潔的SQL查詢。

Spark SQL支援將現有RDDS轉換為DataFrame的兩種不同方法,其實也就是隱式推斷或者顯式指定DataFrame物件的Schema。

1.使用反射機制( Reflection )推理出schema (結構資訊)

第一種將RDDS轉化為DataFrame的方法是使用Spark SQL內部反射機制來自動推斷包含特定型別物件的RDD的schema

(RDD的結構資訊)進行隱式轉化。採用這種方式轉化為DataFrame物件,往往是因為被轉化的RDDIT]所包含的T物件本身就

是具有典型-一維表嚴格的欄位結構的物件,因此Spark SQL很容易就可以自動推斷出合理的Schema這種基於反射機制隱式

地建立DataFrame的方法往往僅需更簡潔的程式碼即可完成轉化,並且執行效果良好。

Spark SQL的Scala介面支援自動將包含樣例類( case class物件的RDD轉換為DataFrame物件。在樣例類的宣告中 已預先定義

了表的結構資訊,內部通過反射機制即可讀取樣例類的引數的名稱、型別,轉化為DataFrame物件的Schema.樣例類不僅可以

包含Int、Double、String這樣的簡單資料型別,也可以巢狀或包含複雜型別,例如Seq或Arrays.

注意SparkContext是RDD的程式設計的主入口,SparkSession是SparkSQL的主入口,SparkSession初始化時,Sparkcontext

和SparkConf也會例項化,可有SparkSession呼叫。

以下將含有Person的樣例類物件的RDD隱式轉化為DataFrame物件的例項:

首先宣告Person樣例類,Person類物件用於裝載name,age

case class Person(name:String,age:Long)

匯入隱式類

import spark.implicits._

建立RDD

val personRDD = sparkSession.sparkContext.textFile("/spark/data/people.txt").map(_.split(" ")).map(attributes=>Person(attributes(0), attributes(1).trim.toInt))

轉化為DataFrame

val peopleDF = personRDD .toDF()

2.由開發者指定Schema

RDD轉化DataFrame的第二種方法是通過程式設計介面,允許先構建個schema,然後將其應用到現有的RDD(Row),較前一種方法

由樣例類或基本資料型別 (Int、String) 物件組成的RDD加過toDF ()直接隱式轉化為DataFrame不同,不僅需要根據需求、以及

資料結構構建Schema,而且需要將RDD[TI轉化為Row物件組成的RDD (RDD[Row]),這種方法雖然程式碼量一些,但也提供了更高

的自由度,更加靈活。

當case類不能提前定義時(例如資料集的結構資訊已包含在每一行中、一個文字資料集的事段對不同使用者來說需要被解析成不同

的欄位名),這時就可以通過以下三部完成Dataframe的轉化

(1)根據需求從源RDD轉化成RDD of rows.

(2)建立由符合在驟1中建立的RDD中的Rows結構的StructType表示的模式。

(3)通過SparkSession提供的createDataFrame方法將模式應用於行的RDD.

然後:val peopleDF  =  spark.createDataFrame(rowRDD,schema)