1. 程式人生 > >Spark中的RDD、DataFrame、Dataset對比

Spark中的RDD、DataFrame、Dataset對比

1 RDD

RDD是Spark提供的最重要的抽象的概念,彈性的分散式資料集,它是一種有容錯機制的特殊集合,可以分佈在叢集的節點上,以函式式編操作集合的方式,進行各種並行操作。Spark的RDD內建了各種函式操作,舉個例子,我們編寫wordcount案例,如果使用mapreduce進行程式設計,還是很複雜的,如果用RDD的話程式碼量大大的減少(scala程式設計一句話搞定),所以相對mapreduce來說單從程式設計上就簡化了很多。但是同時也出現了一個問題,學習Scala、python、java語言,那麼這個使用的成本以及門檻就會很高了對於不太懂開發的人(DBA)想要使用spark是比較困難的。

2 DataFrame

對於DataFrame這一概念最早是出現在R和Pandas裡面的,R語言是非常適合做一些資料統計和分析的一些操作,但是它僅支援單機的處理,隨著網際網路的快速發展,單機處理的日誌、資料必然是很有限的,而且現在的日誌/資料量是越來越大,隨著spark的不斷壯大,在spark裡面就出現了DataFrame的API(1.3版本出現的)。

A DataFrame is a Dataset organized into named columns
DataFrame是一個數據集,他會被按照列進行組織,並且這個列是被取了一個名稱的。它在概念上等同於關係資料庫中的表或R / Python中的資料框,但是進行了更豐富的優化。 DataFrame可以從各種來源構建而成(外部資料來源

),例如:結構化資料檔案,Hive中的表格,外部資料庫或現有的RDD。 DataFrame API可用於Scala,Java,Python和R.在Scala和Java中。 在Scala API中,DataFrame只是Dataset [Row]的類型別名。 而在Java API中,使用者需要使用資料集來表示DataFrame。

總的來說DataFrame 是一個Dataset,那是一個Dataset又是啥子呢,先彆著急,有個概念的認知。但是我們可以知道DataFrame 概念上等同於關係資料庫中的表,看到表,我們肯定會想到,表中的一些欄位,型別,值等等。而且還能通過各種資料來源構建,瞬間感覺功能好強大啊,還有一句,可以通過現有的RDD獲取也就是涉及到DataFrame和RDD之間的轉化(後面進行詳解)。

3 DataFrame VS RDD

通過上面的介紹我們可以知道DataFrame和RDD都是分散式的資料集,但是DataFrame更像是一個傳統的資料庫裡面的表,他除了資料之外還能夠知道更多的資訊,比如說列名、列值和列的屬性,這一點就和hive很類似了,而且他也能夠支援一些複雜的資料格式。從API應用的角度來說DataFrame提供的API他的層次更高,比RDD程式設計還要方便,學習的門檻更低。下面舉個例子進行對比:
這裡寫圖片描述
上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person為型別引數,但Spark框架本身不瞭解Person類的內部結構。而右側的DataFrame卻提供了詳細的結構資訊,使得Spark SQL可以清楚地知道該資料集中包含哪些列,每列的名稱和型別各是什麼。DataFrame多了資料的結構資訊,即schema。RDD是分散式的Java物件的集合。DataFrame是分散式的Row物件的集合。

sparksql執行的時候就能瞭解到更多的資訊,sparksql的查詢優化器(Catalyst)能夠更好的優化,比如你只是想要那麼欄位,它僅僅只需要把那麼那一列取出來就可以了,age和height根本就不需要去讀取了,有了這些資訊以後在編譯的時候能夠做更多的優化,比如filter下推、裁剪等。
使用RDD的方式如果你用java/Scala那你需要執行在jvm上,python就是python runtime,所以效能上會有所差別。
但是DataFrame底層採用同一個優化器,在效能上都是一樣的。

4Dataset

Dataset是分散式資料集合。是Spark1.6中新增的新介面,它提供了RDD的優點以及Spark SQL優化執行引擎的優勢。資料集可以從JVM物件構建,然後使用功能轉換(map, flatMap, filter, 等)進行操作。資料集API可用於Scala和Java。 Python不支援資料集API。但由於Python的動態特性,資料集API的許多優點已經可用(即,您可以通過自然的row.columnName名稱訪問行的欄位)。 R的情況類似。

5 Datasets VS DataFrames

Dataset to represent a DataFrame.
Dataset可以認為是DataFrame的一個特例,主要區別是Dataset每一個record儲存的是一個強型別值而不是一個Row。因此具有如下三個特點:
1. DataSet可以在編譯時檢查型別
2. 並且是面向物件的程式設計介面。
3. 後面版本DataFrame會繼承DataSet,DataFrame是面向Spark SQL的介面。

6 DataFrame與RDD互操作

Spark SQL支援將現有RDD轉換為Datasets的兩種不同方法。 第一種方法使用反射來推斷包含特定型別物件的RDD的模式。這種基於反射的方法會導致更簡潔的程式碼,並且在編寫Spark應用程式時已經知道模式的情況下工作良好.第二種建立資料集的方法是通過程式設計介面,允許您構建模式,然後將其應用於現有的RDD。 雖然此方法更詳細,但它允許您在直到執行時才知道列及其型別時才能構建資料集。

  • 反射方式
    scala提供給Spark SQL的介面支援自動將一個包含case class的RDD轉成DataFrame,這個case class必須定義了表的schema,case class中的引數通過反射讀取進來就成了列名,class class可以巢狀或者包含一些複雜的型別,比如sequence、Array等。這個RDD可以隱式地轉換成一個DataFrame並且註冊成一張表,該表可以使用SQL進行查詢。
    注意:使用這種方式在scala2.10之前最多支援22個欄位,scala2.11解決了這問題。
person.txt

1,20,Michael
2,30,Andy
3,19,ustin
4,45,Tom
import org.apache.spark.sql.SparkSession

object RDDToDataFrame {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("RDDToDataFrame").master("local[2]").getOrCreate()

    //RDD
    val personRDD=spark.sparkContext.textFile("E:/person.txt")
        .map((_.split(","))).map(attributes => Person(attributes(0).toInt, attributes(1).trim.toInt,attributes(2)))

    //匯入隱式轉換
    import spark.implicits._
    //RDD->DataFrame
    val personDF=personRDD.toDF()
    personDF.show()

    spark.stop()
  }

  case class Person(id: Int,  age: Int,name: String)
}


結果:
+---+---+-------+
| id|age|   name|
+---+---+-------+
|  1| 20|Michael|
|  2| 30|   Andy|
|  3| 19|  ustin|
|  4| 45|    Tom|
+---+---+-------+

/只查詢name
//personDF.select("name").show()

//filter
//personDF.filter(personDF.col("id")>2).show()


通過SQL方式進行查詢

    //用sparkSQL的方式來查詢資料
     personDF.createTempView("person")
    spark.sql("show tables").show()

    spark.sql("select * from person").show()

小結:
步驟:首先定義一個case class 使用sparkContext.textFile讀取檔案得到一個RDD,把RDD轉化為DF
1. 需要事先知道Schema結構
2. 使用反射來推斷包含了特定資料型別的RDD的元資料,這個元資料就是個case class
3. 可以使用DataFrame API或者SQL方式程式設計

  • 程式設計方式
    第二種程式設計方式是比第一種反射方式要複雜的,但是這第二種他允許你構建一個DataFrame/Dataset,什麼條件下呢?就是當你事先不知道這些列和他們的型別除非等到執行的時候才知道。
    首先官網說了,當我們的case class不能提前定義的時候就需要使用這一種方式了,這個方式我們必須要遵從下面三個步驟:
    1. 建立一個RDD,我們用RowS來建立;
    2. 定義一個Schema,我們用StructType來定義;
    3. 把這個Schema作用到RDD的RowS上面通過createDataFrame這個方法來實現,當然這個方法是通過SaprkSession來提供的.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}


object RDDToDataFrame1 {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("RDDToDataFrame1").master("local[2]").getOrCreate()

    // Create an RDD
    val personRDD=spark.sparkContext.textFile("E:/person.txt")

    // The schema is encoded in a string
    val schemalString="id,age,name"

    // Generate the schema based on the string of schema
    val fields=schemalString.split(",").map(filedName=>StructField(filedName,StringType,nullable =true ))
    val schema=StructType(fields)

    // Convert records of the RDD (person) to Rows
    val rowRDD=personRDD.map(x=>x.split(",")).map(attributes=>Row(attributes(0),attributes(1),attributes(2)))

    // Apply the schema to the RDD
    val personDF = spark.createDataFrame(rowRDD, schema)

    //print nice tree
    personDF.printSchema()
    //show
    personDF.show()

  }
}

root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- name: string (nullable = true)

+---+---+-------+
| id|age|   name|
+---+---+-------+
|  1| 20|Michael|
|  2| 30|   Andy|
|  3| 19|  ustin|
|  4| 45|    Tom|
+---+---+-------+

總結:
DataFrame和RDD互操作的兩個方式:
1、反射:case class 前提:事先需要知道你的欄位、欄位型別
2、程式設計:Row 如果第一種情況不能滿足你的要求(事先不知道列)
3、選型:優先考慮第一種