1. 程式人生 > >Spark 中RDD和DataSet之間的轉換

Spark 中RDD和DataSet之間的轉換

什麼是RDD:Spark提供了一個抽象的彈性分散式資料集,是一個由叢集中各個節點以分割槽的方式排列的集合,用以支援平行計算。RDD在驅動程式呼叫hadoop的檔案系統的時候就建立(其實就是讀取檔案的時候就建立),或者通過驅動程式中scala集合轉化而來,使用者也可以用spark將RDD放入快取中,來為叢集中某臺機器宕掉後,確保這些RDD資料可以有效的被複用。 總之,RDD能自動從宕機的節點中恢復過來。

摘抄自官網的說明:

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

RDD的的操作型別(以下為個人從官網翻譯過來)

對於RDD的操作支援兩種型別的,一種是transformation,一種是action. 對於transformation,是將一個數據集從一個結構轉換成另外一個結構。 對於Action來說,是需要在資料集的計算任務之後,返回給驅動程式一個結果。 比如map函式就是一個transformation操作,它對資料集合中的每個元素都執行一個方法,而且返回一個新的RDD結果集。

而reduce就是一個action,它通過一些函式將RDD中的所有元素重新組合,將最終結果傳送給驅動程式。 儘管,這裡也有一個並行處理。reduceByKey返回的是一個分散式資料集合

總結起來:

transformation 是將資料集從一個結構轉換成另外一個結構。這個過程中的RDD內容是不發生變化的。

action是出發對RDD的計算,並對計算結果執行某種操作,要麼返回給使用者,要麼儲存到外部儲存器中。

RDD的一些特性:

對於每個被transformed的RDD結果,一旦你在其上執行一個action,就會再次計算。 你也可以把這個RDD存放到記憶體中,通過使用persist方法,那樣spark就會將這個資料儲存在叢集中 以用於下次你查詢的時候,快速給出結果, 它也支援持久化RDD到磁碟中,或者在多個節點中進行備份。

在執行計算任務之前呼叫persist方法,可以將RDD放入指定的地方,也就是設定RDD的快取級別。

RDD的快取級別
Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER  (Java and Scala) Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER  (Java and Scala) Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

前提,需要執行spark-shell命令執行以下操作

>>>通過外部資料來源建立兩個RDD

scala> val rdd1= sc.textFile("file:///opt/datas/stu.txt")
scala> val rdd2 = sc.textFile("file:///opt/datas/stu1.txt")

>>>將外部資料來源轉換成一個數據源,這是一個轉換(transformation)操作

scala> val rdd =rdd1.union(rdd2)

>>>將外部RDD轉化為DF

filter(x => (x._2 > 1))是篩選出值大於1的資料,在scala中x._2表示value,x._1表示key,

sortByKey是排序,引數為true,則是升序,false則是降序.

因為在這個RDD中,key是String型別的,而Value是Int型別的。所以當我們需要按照value排序的時候,就需要重新將Key value交換再呼叫sortBykey的方法。

scala> val lines = rdd.flatMap(x =>x.split(" ")).map(x =>(x,1)).reduceByKey((a,b) =>

(a+b)).filter(x => (x._2 > 1)).map(x =>(x._2,x._1)).sortByKey(false).map(x=>

(x._2,x._1)).toDF


lines: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

>>>轉化成DF的時候,如果不指定,就預設schema為x._1,x._2這兩列。當我們需要指定的時候,方法如下:

scala> val lines = rdd.flatMap(x =>x.split(" ")).map(x =>(x,1)).reduceByKey((a,b) =>(a+b)).filter(x => (x._2 > 1)).map(x =>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).toDF("key","value")
lines: org.apache.spark.sql.DataFrame = [key: string, value: int] 

scala> lines.printSchema
root
 |-- key: string (nullable = true)
 |-- value: integer (nullable = false)

>>>通過DF對資料集進行操作:

scala> lines.select("key").show
+------+
|   key|
+------+
| spark|
|  hive|
|  java|
|  lele|
|spring|
| hbase|
+------+

>>>我們還可以把轉化而來的dataFrame註冊成一張臨時表,呼叫spark.sql的語句來分析、查詢資料

scala> lines.createOrReplaceTempView("spark") 
scala> spark.sql("select * from spark")
res4: org.apache.spark.sql.DataFrame = [key: string, value: int]
其實結果還是一個dataFrame,只不過這個spark.sql函式接收的物件是一段sql語句。
scala> spark.sql("select * from spark").show
+------+-----+
|   key|value|
+------+-----+
| spark|    8|
|  hive|    7|
|  java|    5|
|  lele|    2|
|spring|    2|
| hbase|    2|
+------+-----+
scala> spark.sql("select count(1) from spark").show
+--------+                                                                      
|count(1)|
+--------+
|       6|
+--------+
scala> spark.sql("select key from spark").show
+------+
|   key|
+------+
| spark|
|  hive|
|  java|
|  lele|
|spring|
| hbase|
+------+