1. 程式人生 > >別再人云亦云了!!!你真的搞懂了RDD、DF、DS的區別嗎?

別再人云亦云了!!!你真的搞懂了RDD、DF、DS的區別嗎?

幾年前,包括最近,我看了各種書籍、教程、官網。但是真正能夠把RDD、DataFrame、DataSet解釋得清楚一點的、論據多一點少之又少,甚至有的人號稱Spark專家,但在這一塊根本說不清楚。還有國內的一些書籍,小猴真的想問一聲:Are you OK?書名別再叫精通xxx技術了,請改名為 xxx技術從入門到放棄。這樣可以有效避免耽誤別人學習,不好嗎? 大家都在告訴我們結論,但其實,小猴作為一名長期混跡於開源社群、並仍在一線大資料開發的技術人,深諳技術文化之一: > **To experience** | 去經歷 這是我要提倡的技術文化之一。之前有人把Experience譯為體驗,但在小猴的技術世界裡,Experience更多的是自己去經歷,而不能跟團去旅遊一樣,那樣你只能是一個外包而已,想要做到卓越,就得去經歷。技術,只有去經歷才會有成長。 ## 目錄 [TOC] ## RDD、DataFrame、DataSet介紹 我們每天都在基於框架開發,對於我們來說,一套易於使用的API太重要了。對於Spark來說,有三套API。 ![image-20210201000858671](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img_2021_01/20210201000858.png) 分別是: * RDD * DataFrame * DataSet 三套的API,開發人員就要學三套。不過,從Spark 2.2開始,DataFrame和DataSet的API已經統一了。而編寫Spark程式的時候,RDD已經慢慢退出我們的視野了。 但Spark既然提供三套API,我們到底什麼時候用RDD、什麼時候用DataFrame、或者DataSet呢?我們先來了解下這幾套API。 ### RDD #### RDD的概念 * RDD是Spark 1.1版本開始引入的。 * RDD是Spark的基本資料結構。 * RDD是Spark的彈性分散式資料集,它是不可變的(Immutable)。 * RDD所描述的資料分佈在叢集的各個節點中,基於RDD提供了很多的轉換的並行處理操作。 * RDD具備容錯性,在任何節點上出現了故障,RDD是能夠進行容錯恢復的。 * **RDD專注的是How!**就是如何處理資料,都由我們自己來去各種運算元來實現。 #### 什麼時候使用RDD? * **應該避免使用RDD!** #### RDD的短板 * 叢集間通訊都需要將JVM中的物件進行序列化和反序列化,RDD開銷較大 * 頻繁建立和銷燬物件會增加GC,GC的效能開銷較大 ![image-20210201231436680](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img_2021_01/20210201231436.png) > **Spark 2.0開始,RDD不再是一等公民** > > 從Apache Spark 2.0開始,RDD已經被降級為二等公民,RDD已經被棄用了。而且,我們一會就會發現,DataFrame/DataSet是可以和RDD相互轉換的,DataFrame和DataSet也是建立在RDD上。 ### DataFrame #### DataFrame概念 * DataFrame是從Spark 1.3版本開始引入的。 * 通過DataFrame可以簡化Spark程式的開發,讓Spark處理結構化資料變得更簡單。DataFrame可以使用SQL的方式來處理資料。例如:業務分析人員可以基於編寫Spark SQL來進行資料開發,而不僅僅是Spark開發人員。 * DataFrame和RDD有一些共同點,也是不可變的分散式資料集。但與RDD不一樣的是,DataFrame是有schema的,有點類似於關係型資料庫中的**表**,每一行的資料都是一樣的,因為。有了schema,這也表明了DataFrame是比RDD提供更高層次的抽象。 * DataFrame支援各種資料格式的讀取和寫入,例如:CSV、JSON、AVRO、HDFS、Hive表。 * DataFrame使用Catalyst進行優化。 * DataFrame專注的是**What!**,而不是How! #### DataFrame的優點 * 因為DataFrame是有統一的schema的,所以序列化和反序列無需儲存schema。這樣節省了一定的空間。 * DataFrame儲存在off-heap(堆外記憶體)中,由作業系統直接管理(RDD是JVM管理),可以將資料直接序列化為二進位制存入off-heap中。操作資料也是直接操作off-heap。 #### DataFrane的短板 * DataFrame不是型別安全的 * API也不是面向物件的 ### Apache Spark 2.0 統一API 從Spark 2.0開始,DataFrame和DataSet的API合併在一起,實現了跨庫統一成為一套API。這樣,開發人員的學習成本就降低了。只需要學習一個High Level的、型別安全的DataSet API就可以了。——這對於Spark開發人員來說,是一件好事。 上圖我們可以看到,從Spark 2.0開始,Dataset提供了兩組不同特性的API: * 非型別安全 * 型別安全 其中非型別安全就是DataSet[Row],我們可以對Row中的欄位取別名。這不就是DataFrame嗎?而型別安全就是JVM物件的集合,型別就是scala的樣例類,或者是Java的實體類。 有Spark 2.0原始碼為證: ```scala package object sql { // ... type DataFrame = Dataset[Row] } ``` https://github.com/IloveZiHan/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/package.scala 也就是說,每當我們用導DataFrame其實就是在使用Dataset。 > 針對Python或者R,不提供型別安全的DataSet,只能基於DataFrame API開發。 #### 什麼時候使用DataFrame ### DataSet * DataSet是從Spark 1.6版本開始引入的。 * DataSet具有RDD和DataFrame的優點,既提供了更有效率的處理、以及型別安全的API。 * DataSet API都是基於Lambda函式、以及JVM物件來進行開發,所以在編譯期間就可以快速檢測到錯誤,節省開發時間和成本。 * DataSet使用起來很像,但它的執行效率、空間資源效率都要比RDD高很多。可以很方便地使用DataSet處理結構化、和非結構資料。 #### DataSet API的優點 ![image-20210201231136055](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img_2021_01/20210201231136.png) * DataSet結合了RDD和DataFrame的優點。 * 當序列化資料時,Encoder生成的位元組碼可以直接與堆互動,實現對資料按需訪問,而無需反序列化整個物件。 ##### 型別安全 寫過Java或者C#的同學都會知道,一旦在程式碼中型別使用不當,編譯都編譯不過去。日常開發中,我們更多地是使用泛型。因為一旦我們使用非型別安全的型別,軟體的維護週期一長,如果集合中放入了一些不合適的型別,就會出現嚴重的故障。這也是為什麼Java、C#還有C++都要去支援泛型的原因。 在Spark中也會有型別安全的問題。而且,一旦在執行時出現型別安全問題,會影響整個大規模計算作業。這種作業的錯誤排除難度,要比單機故障排查起來更復雜。如果在執行時期間就能發現問題,這很美好啊。 DataFrame中編寫SQL進行資料處理分析,在編譯時是不做檢查的,只有在Spark程式執行起來,才會檢測到問題。 | | SQL | DataFrame | Dataset | | -------- | ------ | --------- | ------- | | 語法錯誤 | 執行時 | 編譯時 | 編譯時 | | 解析錯誤 | 執行時 | 執行時 | 編譯時 | ##### 對結構化和半結構化資料的High Level抽象 例如:我們有一個較大的網站流量日誌JSON資料集,可以很容易的使用DataSet[WebLog]來處理,強型別操作可以讓處理起來更加簡單。 ##### 以RDD更易用的API DataSet引入了更豐富的、更容易使用的API操作。這些操作是基於High Level抽象的,而且基於實體類的操作,例如:進行groupBy、agg、select、sum、avg、filter等操作會容易很多。 ##### 效能優化 使用DataFrame和DataSet API在效能和空間使用率上都有大幅地提升。 1. DataFrame和DataSet API是基於Spark SQL引擎之上構建的,會使用Catalyst生成優化後的邏輯和物理執行計劃。尤其是無型別的DataSet[Row](DataFrame),它的速度更快,很適合互動式查詢。 2. 由於Spark能夠理解DataSet中的JVM物件型別,所以Spark會將將JVM物件對映為Tungsten的內部記憶體方式儲存。而Tungsten編碼器可以讓JVM物件更有效地進行序列化和反序列化,生成更緊湊、更有效率的位元組碼。 ![RDD儲存效率 VS DataSet儲存效率](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img_2021_01/20210131233908.png)通過上圖可以看到,DataSet的空間儲存效率是RDD的4倍。RDD要使用60GB的空間,而DataSet只需要使用不到15GB就可以了。 ## Youtube視訊分析案例 ### 資料集 去Kaggle下載youtube地址: ```xml https://www.kaggle.com/datasnaek/youtube-new?select=USvideos.csv ``` 每個欄位的含義都有說明。 ### Maven開發環境準備 ```xml ``` ### RDD開發 ```scala /** * Spark RDD處理示例 */ object RddAnalysis { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD Process").setMaster("local[*]") val sc = new SparkContext(conf) // 讀取本地檔案建立RDD val youtubeVideosRDD = { sc.textFile("""E:\05.git_project\dataset\youtube""") } // 統計不同分類Youtube視訊的喜歡人數、不喜歡人數 // 1. 新增行號 // 建立計數器 val rownumAcc = sc.longAccumulator("rownum") // 帶上行號 youtubeVideosRDD.map(line => { rownumAcc.add(1) rownumAcc.value -> line }) // 過濾掉第一行 .filter(_._1 != 1) // 去除行號 .map(_._2) // 過濾掉非法的資料 .filter(line => { val fields = line.split("\001") val try1 = scala.util.Try(fields(8).toLong) val try2 = scala.util.Try(fields(9).toLong) if(try1.isFailure || try2.isFailure) false else true }) // 讀取三個欄位(視訊分類、喜歡的人數、不喜歡的人數 .map(line => { // 按照\001解析CSV val fields = line.split("\001") // 取第4個(分類)、第8個(喜歡人數)、第9個(不喜歡人數) // (分類id, 喜歡人數, 不喜歡人數) (fields(4), fields(8).toLong, fields(9).toLong) }) // 按照分類id分組 .groupBy(_._1) .map(t => { val result = t._2.reduce((r1, r2) => { (r1._1, r1._2 + r2._2, r1._3 + r2._3) }) result }) .foreach(println) } } ``` 執行結果如下: ```shell ("BBC Three",8980120,149525) ("Ryan Canty",11715543,80544) ("Al Jazeera English",34427,411) ("FBE",9003314,191819) ("Sugar Pine 7",1399232,81062) ("Rob Scallon",11652652,704748) ("CamilaCabelloVEVO",19077166,1271494) ("Grist",3133,37) ``` 程式碼中做了一些資料的過濾,然後進行了分組排序。如果Spark都要這麼來寫的話,業務人員幾乎是沒法寫了。著程式碼完全解釋了How,而不是What。每一個處理的細節,都要我們自己親力親為。實現起來臃腫。 #### 檢視下基於RDD的DAG 開啟瀏覽器,輸入:localhost:4040,來看下DAG。 ![image-20210203225714246](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203225714.png) DAG非常的直觀,按照shuffle分成了兩個Stage來執行。Stage中依次執行了每個Operator。程式沒有經過任何優化。我把每一個操作都和DAG上的節點對應了起來。 ### DataFrame開發 ```scala object DataFrameAnalysis { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("Youtube Analysis") .master("local[*]") .config("spark.sql.shuffle.partitions",1) .getOrCreate() import spark.sqlContext.implicits._ // 讀取CSV val youtubeVideoDF = spark.read.option("header", true).csv("""E:\05.git_project\dataset\USvideos.csv""") import org.apache.spark.sql.functions._ // 按照category_id分組聚合 youtubeVideoDF.select($"category_id", $"likes".cast(LongType), $"dislikes".cast(LongType)) .where($"likes".isNotNull) .where( $"dislikes".isNotNull) .groupBy($"category_id") .agg(sum("likes"), sum("dislikes")) .show() } } ``` 大家可以看到,現在實現方式非常的簡單,而且清晰。 #### 檢視下基於DataFrame的執行計劃與DAG ![image-20210203230500292](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203230500.png) 但我們執行上面的Spark程式時,其實運行了兩個JOB。 ![image-20210203230807773](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203230807.png) 下面這個是第一個Job的DAG。我們看到只有一個Stage。這個DAG我們看得不是特別清楚做了什麼,因為Spark SQL是做過優化的,我們需要檢視Query的詳細資訊,才能看到具體執行的工作。 ![image-20210203233802280](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203233802.png) 第一個Job的詳細執行資訊如下: ![image-20210203231757894](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203231757.png) 哦,原來這個JOB掃描了所有的行,然後執行了一個Filter過濾操作。再檢視下查詢計劃: ```shell == Parsed Logical Plan == GlobalLimit 1 +- LocalLimit 1 +- Filter (length(trim(value#6, None)) > 0) +- Project [value#0 AS value#6] +- Project [value#0] +- Relation[value#0] text == Analyzed Logical Plan == value: string GlobalLimit 1 +- LocalLimit 1 +- Filter (length(trim(value#6, None)) > 0) +- Project [value#0 AS value#6] +- Project [value#0] +- Relation[value#0] text == Optimized Logical Plan == GlobalLimit 1 +- LocalLimit 1 +- Filter (length(trim(value#0, None)) > 0) +- Relation[value#0] text == Physical Plan == CollectLimit 1 +- *(1) Filter (length(trim(value#0, None)) > 0) +- FileScan text [value#0] Batched: false, DataFilters: [(length(trim(value#0, None)) > 0)], Format: Text, Location: InMemoryFileIndex[file:/E:/05.git_project/dataset/USvideos.csv], PartitionFilters: [], PushedFilters: [], ReadSchema