1. 程式人生 > >spark基礎之RDD和DataFrame的轉換方式

spark基礎之RDD和DataFrame的轉換方式

一 通過定義Case Class,使用反射推斷Schema

定義Case Class,在RDD的轉換過程中使用Case Class可以隱式轉換成SchemaRDD,然後再註冊成表,然後就可以利用sqlContext或者SparkSession操作了。

我們給出一個電影測試資料film.txt,定一個Case Class(Film),然後將資料檔案讀入後隱式轉換成SchemeRDD:film,並將film在SparkSession中註冊,最後對錶進行查詢

1.1 上傳測試資料

hdfs dfs -put /opt/data/film.txt /user/hadoop

1.2 定義SparkSession,靜態匯入其所有成員

val session = SparkSession.builder()
    .appName("Case Class To Define RDD")
    .config("spark.some.config.option", "some-value")
    .master("local[*]")
    .getOrCreate()

import session.implicits._

1.3 定義Film類,讀入資料並建立檢視

val filmRdd = session.sparkContext.textFile("hdfs://hdfs-cluster/user/hadoop/film.txt"
) val filmDF = filmRdd     .map(_.split(","))     .map(fields => Film(fields(0),fields(1),fields(2),fields(3),fields(4).trim.toInt,fields(5),fields(6).trim.toFloat))     .toDF() filmDF.createOrReplaceTempView("film")

1.4 查詢分數大於5.0的電影

val results =session.sql("SELECT name,director,style,score FROM film WHERE score > 5.0"
)

1.5 對獲取到的Dataset進行對映,因為不知道資料的schema,所以我們需要getAs方法獲取對應的列,並將每一行結果返回,最後列印結果

val filmDS = results.map(film => {
    val name = film.getAs[String]("name")
    val director = film.getAs[String]("director")
    val style = film.getAs[String]("style")
    val score = film.getAs[Float]("score")
    (name,director,style,score)
})
filmDS.show(10)

二 通過程式設計介面,定義Schema,並應用到RDD上

通過使用createDataFrame定義RDD,通常有三個步驟

# 建立初始RDD

# 構建Row型別的RDD

# 構建該RDD對應的schema

然後呼叫createDataFrame方法

2.1 建立SparkSession,靜態匯入成員

val session = SparkSession.builder()
    .appName("Create DataFrame API To Define RDD")
    .config("spark.some.config.option", "some-value")
    .master("local[*]")
    .getOrCreate()

import session.implicits._

2.2HDFS 讀取資料,構建初始RDD

val filmRdd = session.sparkContext.textFile("hdfs://hdfs-cluster/user/hadoop/film.txt")

2.3構建Row型別的RDD

val rowRdd = filmRdd
    .map(_.split(","))
    .map(fields =>
        Row(fields(0),fields(1),fields(2),fields(3),fields(4).trim.toInt,fields(5),fields(6).trim.toFloat)
    )

2.4 構建該RDD對應的schema

// 這裡的資料型別必須和資料來源所有型別對應val schema:StructType = StructType(Array(
    StructField("filmid",StringType),
    StructField("director",StringType),
    StructField("name",StringType),
    StructField("release_time",StringType),
    StructField("box_office",IntegerType),
    StructField("style",StringType),
    StructField("score",FloatType)
))

2.5 建立DataFrame,並建立匯或者替換檢視,然後查詢查詢分數大於5.0的電影

val filmDF = session.createDataFrame(rowRdd,schema)
filmDF.createOrReplaceTempView("film")
val results = session.sql("SELECT name,director,style,score FROM film WHERE score > 5.0")

2.6 獲取結果,進行展示

val filmDS = results.map(film => {
    val name = film.getAs[String]("name")
    val director = film.getAs[String]("director")
    val style = film.getAs[String]("style")
    val score = film.getAs[Float]("score")
    (name,director,style,score)
})
filmDS.show(10)