1. 程式人生 > >spark RDD,DataFrame,DataSet 介紹

spark RDD,DataFrame,DataSet 介紹

列式存儲 ren gre rds 包含 執行 這一 ces 中一

彈性分布式數據集(Resilient Distributed Dataset,RDD)

RDD是Spark一開始就提供的主要API,從根本上來說,一個RDD就是你的數據的一個不可變的分布式元素集合,在集群中跨節點分布,可以通過若幹提供了轉換和處理的底層API進行並行處理。每個RDD都被分為多個分區,這些分區運行在集群不同的節點上。

RDD支持兩種類型的操作,轉化操作(transform)和行動操作(action)。轉化操作會有一個RDD生成一個新的RDD,行動操作則要計算出來一個結果。spark對於轉化操作是惰性的,只有當遇到第一個action操作時才會去執行。

下面是使用RDD的場景和常見案例:

  • 你希望可以對你的數據集進行最基本的轉換、處理和控制;
  • 你的數據是非結構化的,比如流媒體或者字符流;
  • 你想通過函數式編程而不是特定領域內的表達來處理你的數據;
  • 你不希望像進行列式處理一樣定義一個模式,通過名字或字段來處理或訪問數據屬性;
  • 你並不在意通過DataFrame和Dataset進行結構化和半結構化數據處理所能獲得的一些優化和性能上的好處;

DataFrame 和DataSet

與RDD相似,DataFrame也是數據的一個不可變分布式集合。但與RDD不同的是,數據都被組織到有名字的列中,就像關系型數據庫中的表一樣。設計DataFrame的目的就是要讓對大型數據集的處理變得更簡單,它讓開發者可以為分布式的數據集指定一個模式,進行更高層次的抽象。它提供了特定領域內專用的API來處理你的分布式數據,並讓更多的人可以更方便地使用Spark,而不僅限於專業的數據工程師。

一圖勝千言,先從一張圖中看一下dataFrame和RDD的區別。

技術分享圖片

上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什麽。DataFrame多了數據的結構信息,即schema。RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計劃的優化,比如filter下推、裁剪等。

對比與RDD,DataFrame的優點主要有以下幾項:

提升執行效率

RDD API是函數式的,強調不變性,在大部分場景下傾向於創建新對象而不是修改老對象。這一特點雖然帶來了幹凈整潔的API,卻也使得Spark應用程序在運行期傾向於創建大量臨時對象,對GC造成壓力。在現有RDD API的基礎之上,我們固然可以利用mapPartitions方法來重載RDD單個分片內的數據創建方式,用復用可變對象的方式來減小對象分配和GC的開銷,但這犧牲了代碼的可讀性,而且要求開發者對Spark運行時機制有一定的了解,門檻較高。另一方面,Spark SQL在框架內部已經在各種可能的情況下盡量重用對象,這樣做雖然在內部會打破了不變性,但在將數據返回給用戶時,還會重新轉為不可變數據。利用 DataFrame API進行開發,可以免費地享受到這些優化效果。

減少數據讀取

分析大數據,最快的方法就是 ——忽略它。這裏的“忽略”並不是熟視無睹,而是根據查詢條件進行恰當的剪枝。

上文討論分區表時提到的分區剪 枝便是其中一種——當查詢的過濾條件中涉及到分區列時,我們可以根據查詢條件剪掉肯定不包含目標數據的分區目錄,從而減少IO。

對於一些“智能”數據格 式,Spark SQL還可以根據數據文件中附帶的統計信息來進行剪枝。簡單來說,在這類數據格式中,數據是分段保存的,每段數據都帶有最大值、最小值、null值數量等 一些基本的統計信息。當統計信息表名某一數據段肯定不包括符合查詢條件的目標數據時,該數據段就可以直接跳過(例如某整數列a某段的最大值為100,而查詢條件要求a > 200)。

此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存儲格式的優勢,僅掃描查詢真正涉及的列,忽略其余列的數據。

執行優化

技術分享圖片 人口數據分析示例

為了說明查詢優化,我們來看上圖展示的人口數據分析的示例。圖中構造了兩個DataFrame,將它們join之後又做了一次filter操作。如果原封不動地執行這個執行計劃,最終的執行效率是不高的。因為join是一個代價較大的操作,也可能會產生一個較大的數據集。如果我們能將filter下推到 join下方,先對DataFrame進行過濾,再join過濾後的較小的結果集,便可以有效縮短執行時間。而Spark SQL的查詢優化器正是這樣做的。簡而言之,邏輯查詢計劃優化就是一個利用基於關系代數的等價變換,將高成本的操作替換為低成本操作的過程。

得到的優化執行計劃在轉換成物 理執行計劃的過程中,還可以根據具體的數據源的特性將過濾條件下推至數據源內。最右側的物理執行計劃中Filter之所以消失不見,就是因為溶入了用於執行最終的讀取操作的表掃描節點內。

對於普通開發者而言,查詢優化 器的意義在於,即便是經驗並不豐富的程序員寫出的次優的查詢,也可以被盡量轉換為高效的形式予以執行。


DataFrame和DataSet

Dataset可以認為是DataFrame的一個特例,主要區別是Dataset每一個record存儲的是一個強類型值而不是一個Row。因此具有如下三個特點:

  • DataSet可以在編譯時檢查類型

  • 並且是面向對象的編程接口。用wordcount舉例:

//DataFrame

// Load a text file and interpret each line as a java.lang.String
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
val result = ds
  .flatMap(_.split(" "))               // Split on whitespace
  .filter(_ != "")                     // Filter empty words
  .toDF()                              // Convert to DataFrame to perform aggregation / sorting
  .groupBy($"value")                   // Count number of occurences of each word
  .agg(count("*") as "numOccurances")
  .orderBy($"numOccurances" desc)      // Show most common words first
//DataSet,完全使用scala編程,不要切換到DataFrame

val wordCount = 
  ds.flatMap(_.split(" "))
    .filter(_ != "")
    .groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
    .count()
  • 後面版本DataFrame會繼承DataSet,DataFrame是面向Spark SQL的接口。

DataFrame和DataSet可以相互轉化,df.as[ElementType]這樣可以把DataFrame轉化為DataSet,ds.toDF()這樣可以把DataSet轉化為DataFrame。

參考資料:

1 《spark快速大數據分析》

2、https://www.jianshu.com/p/c0181667daa0

3、http://www.infoq.com/cn/articles/three-apache-spark-apis-rdds-dataframes-and-datasets

spark RDD,DataFrame,DataSet 介紹