1. 程式人生 > >RDD、DataFrame、Dataset介紹

RDD、DataFrame、Dataset介紹

rdd
優點:
編譯時型別安全 
編譯時就能檢查出型別錯誤
面向物件的程式設計風格 
直接通過類名點的方式來操作資料
缺點:
序列化和反序列化的效能開銷 
無論是叢集間的通訊, 還是IO操作都需要對物件的結構和資料進行序列化和反序列化.
GC的效能開銷 

頻繁的建立和銷燬物件, 勢必會增加GC

val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

DataFrame:

DataFrame引入了schema和off-heap
schema : RDD每一行的資料, 結構都是一樣的. 這個結構就儲存在schema中. Spark通過schame就能夠讀懂資料, 因此在通訊和IO時就只需要序列化和反序列化資料, 而結構的部分就可以省略了.
off-heap : 意味著JVM堆以外的記憶體, 這些記憶體直接受作業系統管理(而不是JVM)。Spark能夠以二進位制的形式序列化資料(不包括結構)到off-heap中, 當要操作資料時, 就直接操作off-heap記憶體. 由於Spark理解schema, 所以知道該如何操作.
off-heap就像地盤, schema就像地圖, Spark有地圖又有自己地盤了, 就可以自己說了算了, 不再受JVM的限制, 也就不再收GC的困擾了.
通過schema和off-heap, DataFrame解決了RDD的缺點, 但是卻丟了RDD的優點. DataFrame不是型別安全的, API也不是面向物件風格的.

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Run {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val sqlContext = new SQLContext(sc)
    /**
      * id      age
      * 1       30
      * 2       29
      * 3       21
      */
    val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))

    val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))

    val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)
    // API不是面向物件的
    idAgeDF.filter(idAgeDF.col("age") > 25) 
    // 不會報錯, DataFrame不是編譯時型別安全的
    idAgeDF.filter(idAgeDF.col("age") > "") 
  }
}

1、RDD和Dataset不同,DataFrame每一行的型別固定為Row,只有通過解析才能獲取各個欄位的值

testDF.foreach{
  line =>
    val col1=line.getAs[String]("col1")
    val col2=line.getAs[String]("col2")
}
2、DataFrame與Dataset均支援sparksql的操作,比如select,groupby之類,還能註冊臨時表/視窗,進行sql語句操作,如
dataDF.createOrReplaceTempView("tmp")
spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

3、DataFrame與Dataset支援一些特別方便的儲存方式,比如儲存成csv,可以帶上表頭,這樣每一列的欄位名一目瞭然
//儲存
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")
datawDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
//讀取
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")
val datarDF= spark.read.options(options).format("com.databricks.spark.csv").load()

Dataset:
這裡主要對比Dataset和DataFrame,因為Dataset和DataFrame擁有完全相同的成員函式,區別只是每一行的資料型別不同
DataFrame也可以叫Dataset[Row],每一行的型別是Row,不解析,每一行究竟有哪些欄位,各個欄位又是什麼型別都無從得知,只能用上面提到的getAS方法或者共性中的第七條提到的模式匹配拿出特定欄位
而Dataset中,每一行是什麼型別是不一定的,在自定義了case class之後可以很自由的獲得每一行的資訊
case class Coltest(col1:String,col2:Int)extends Serializable //定義欄位名和型別
/**
      rdd
      ("a", 1)
      ("b", 1)
      ("a", 1)
      * */
val test: Dataset[Coltest]=rdd.map{line=>
      Coltest(line._1,line._2)
    }.toDS
test.map{
      line=>
        println(line.col1)
        println(line.col2)
    }

Dataset在需要訪問列中的某個欄位時是非常方便的,然而,如果要寫一些適配性很強的函式時,如果使用Dataset,行的型別又不確定,可能是各種case class,無法實現適配,這時候用DataFrame即Dataset[Row]就能比較好的解決問題

DataSet結合了RDD和DataFrame的優點, 並帶來的一個新的概念Encoder
當序列化資料時, Encoder產生位元組碼與off-heap進行互動, 能夠達到按需訪問資料的效果, 而不用反序列化整個物件. Spark還沒有提供自定義Encoder的API, 但是未來會加入.

下面這段程式碼, 在1.6.x中建立的是DataFrame

// 上文DataFrame示例中提取出來的
val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))

val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))

val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)

但是同樣的程式碼在2.0.0-preview中, 建立的雖然還叫DataFrame
// sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的實現, 返回值依然是DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
sparkSession.createDataFrame(rowRDD, schema)
}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local") // 除錯的時候一定不要用local[*]
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))

    val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))

    // 在2.0.0-preview中這行程式碼創建出的DataFrame, 其實是DataSet[Row]
    val idAgeDS = sqlContext.createDataFrame(idAgeRDDRow, schema)

    // 在2.0.0-preview中, 還不支援自定的Encoder, Row型別不行, 自定義的bean也不行
    // 官方文件也有寫通過bean建立Dataset的例子,但是我執行時並不能成功
    // 所以目前需要用建立DataFrame的方法, 來建立DataSet[Row]
    // sqlContext.createDataset(idAgeRDDRow)

    // 目前支援String, Integer, Long等型別直接建立Dataset
    Seq(1, 2, 3).toDS().show()
    sqlContext.createDataset(sc.parallelize(Array(1, 2, 3))).show()
  }
}


但是其實卻是DataSet, 因為DataFrame被宣告為Dataset[Row]
package object sql {
  // ...省略了不相關的程式碼

  type DataFrame = Dataset[Row]
}

因此當我們從1.6.x遷移到2.0.0的時候, 無需任何修改就直接用上了DataSet.

下面是一段DataSet的示例程式碼

DataFrame/Dataset轉RDD:

這個轉換很簡單

val rdd1=testDF.rdd
val rdd2=testDS.rdd


RDD轉DataFrame:

import spark.implicits._
val testDF = rdd.map {line=>
      (line._1,line._2)
    }.toDF("col1","col2")


一般用元組把一行的資料寫在一起,然後在toDF中指定欄位名

RDD轉Dataset:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義欄位名和型別
val testDS = rdd.map {line=>
      Coltest(line._1,line._2)
    }.toDS


可以注意到,定義每一行的型別(case class)時,已經給出了欄位名和型別,後面只要往case class裡面新增值即可

Dataset轉DataFrame:

這個也很簡單,因為只是把case class封裝成Row

import spark.implicits._
val testDF = testDS.toDF


DataFrame轉Dataset:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義欄位名和型別
val testDS = testDF.as[Coltest]


這種方法就是在給出每一列的型別後,使用as方法,轉成Dataset,這在資料型別是DataFrame又需要針對各個欄位處理時極為方便

特別注意:

在使用一些特殊的操作時,一定要加上 import spark.implicits._ 不然toDF、toDS無法使用