Spark2.x學習筆記:3、 Spark核心概念RDD
Spark學習筆記:3、Spark核心概念RDD
3.1 RDD概念
彈性分散式資料集(Resilient Distributed Datasets,RDD) ,可以分三個層次來理解:
- 資料集:故名思議,RDD 是資料集合的抽象,是複雜物理介質上存在資料的一種邏輯檢視。從外部來看,RDD 的確可以被看待成經過封裝,帶擴充套件特性(如容錯性)的資料集合。
- 分散式:RDD的資料可能在物理上儲存在多個節點的磁碟或記憶體中,也就是所謂的多級儲存。
- 彈性:雖然 RDD 內部儲存的資料是隻讀的,但是,我們可以去修改(例如通過 repartition 轉換操作)平行計算計算單元的劃分結構,也就是分割槽的數量。
Spark資料儲存的核心是彈性分散式資料集(RDD),我們可以把RDD簡單地理解為一個抽象的大陣列,但是這個陣列是分散式的,邏輯上RDD的每個分割槽叫做一個Partition。 在物理上,RDD物件實質上是一個元資料結構,儲存著Block、Node等對映關係,以及其他元資料資訊。一個RDD就是一組分割槽(Partition),RDD的每個分割槽Partition對應一個Block,Block可以儲存在記憶體,當記憶體不夠時可以儲存到磁碟上。
如下圖所示,存在2個RDD:RDD1包含3個分割槽,分別儲存在Node1、Node2和Node3的記憶體中;RDD2也包含3個分割槽,p1和p2分割槽儲存在Node1和Node2的記憶體中,p3分割槽存在在Node3的磁碟中。
RDD的資料來源也可以儲存在HDFS上,資料按照HDFS分佈策略進行分割槽,HDFS中的一個Block對應Spark RDD的一個Partition。
3.2 RDD基本操作
(1)RDD包括兩大類基本操作Transformation和Acion
- Transformation
- 可以通過Scala集合或者Hadoop資料集鉤子一個新的RDD
- 將已有RDD轉換為新的RDD
- 常用運算元(操作,方法)有map、filter、groupBy、reduceBy
- Aciton
- 通過RDD計算得到一個或者多個值
- 常用運算元有count、reduce、saveAsTextFile
主要的Transformation和Acion如下表所示:
(2)作用在RDD上的操作(運算元)
(3)惰性執行(Lazy Execution)
- Transformation只記錄RDD的轉換關係,並沒有真正執行轉換
- Action是觸發程式執行的運算元
3.3 RDD操作示例
3.3.1 簡單例子
(1)程式碼
-
[[email protected] ~]# spark-shell
-
17/09/06 03:36:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-
17/09/06 03:36:39 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
-
Spark context Web UI available at http://192.168.1.180:4040
-
Spark context available as 'sc' (master = local[*], app id = local-1504683394043).
-
Spark session available as 'spark'.
-
Welcome to
-
____ __
-
/ __/__ ___ _____/ /__
-
_\ \/ _ \/ _ `/ __/ '_/
-
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0
-
/_/
-
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
-
Type in expressions to have them evaluated.
-
Type :help for more information.
-
scala> val rdd1=sc.parallelize(1 to 100,5)
-
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
-
scala> val rdd2=rdd1.map(_+1)
-
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
-
scala> rdd2.take(2)
-
res0: Array[Int] = Array(2, 3)
-
scala> rdd2.count
-
res1: Long = 100
-
scala>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
(2)程式說明
- spark-shell的日誌資訊
Spark context available as 'sc'
,表示spark-shell中已經預設將SparkContext類初始化為物件sc,在spark-shell中可以直接使用SparkContext的物件sc。 - SparkContext 的 parallelize(),將一個存在的集合,變成一個RDD,這種方式試用於學習spark和做一些spark的測試
- sc.parallelize(1 to 100,5)表示將1 to 100產生的集合(Range)轉換成一個RDD,並建立5個partition。
- 當我們忘記了parallelize單詞時,我們可以在spark-shell中輸入sc.pa,然後按tab鍵,會自動補齊。這是一個非常實用的功能!
- rdd1.map(_+1)表示每個元素+1,併產生一個新的RDD。這是一個Transformation操作。
- take(2)表示取RDD前2個元素,這是個Action操作。當這個Action操作執行時,上面的map(_+1)操作才真正執行。
- count表示RDD元素總數,也是一個Action操作。
- 在Spark WebUI中可以看到兩個Action操作,如下圖。
3.3.2 常用運算元
(1)程式碼
-
scala> val listRdd=sc.parallelize(List(1,2,3),3)
-
listRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
-
scala> val squares=listRdd.map(x=>x*x)
-
squares: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26
-
scala> squares.take(3)
-
res3: Array[Int] = Array(1, 4, 9)
-
scala> val even=squares.filter(_%2==0)
-
even: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:28
-
scala> squares.first
-
res4: Int = 1
-
scala> even.first
-
res5: Int = 4
-
scala> val nums=sc.parallelize(1 to 3)
-
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
-
scala> val mapRdd=nums.flatMap(x=>1 to x)
-
mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at flatMap at <console>:26
-
scala> mapRdd.count
-
res6: Long = 6
-
scala> mapRdd.take(6)
-
res7: Array[Int] = Array(1, 1, 2, 1, 2, 3)
-
scala>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
(2)程式說明
- map(x=>x*x)每個元素平方,生成新的RDD
- filter(_%2==0)對RDD中每個元素進行過濾(偶數留下),生成新的RDD
- nums.flatMap(x=>1 to x),將一個元素對映成多個元素,生成新的RDD
3.3.3 Key/Value型RDD
(1)程式碼
-
scala> val pets=sc.parallelize(List( ("cat",1),("dog",1),("cat",2) ))
-
pets: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:24
-
scala> val pets2=pets.reduceByKey(_+_)
-
pets2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:26
-
scala> pets2.count
-
res8: Long = 2
-
scala> pets2.take(2)
-
res10: Array[(String, Int)] = Array((dog,1), (cat,3))
-
scala> val pets3=pets.groupByKey()
-
pets3: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[10] at groupByKey at <console>:26
-
scala> pets3.count
-
res11: Long = 2
-
scala> pets3.take(2)
-
res12: Array[(String, Iterable[Int])] = Array((dog,CompactBuffer(1)), (cat,CompactBuffer(1, 2)))
-
scala> val pets4=pets.sortByKey()
-
pets4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at sortByKey at <console>:26
-
scala> pets4.take(3)
-
res14: Array[(String, Int)] = Array((cat,1), (cat,2), (dog,1))
-
scala>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
(2)程式說明
- reduceByKey就是對元素為KV對的RDD中Key相同的元素的Value進行reduce,因此,Key相同的多個元素的值被reduce為一個值,然後與原RDD中的Key組成一個新的KV對。
reduceByKey(_+_)
對每個key對應的多個value進行merge操作,自動在map端進行本地combine groupByKey()
對每個key進行歸併,但只生成一個sequence。sortByKey()
按照key進行排序
3.3.4 WordCount
WordCount是大資料處理的HelloWorld,下面看看Spark是如何實現。 (1)準備資料
-
[[email protected] ~]# mkdir data
-
[[email protected] ~]# vi data/words
-
[[email protected] ~]# cat data/words
-
hi hello
-
how do you do?
-
hello, Spark!
-
hello, Scala!
-
[[email protected] ~]#
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
(2)轉換處理
-
scala> val rdd=sc.textFile("file:///root/data/words")
-
rdd: org.apache.spark.rdd.RDD[String] = file:///root/data/words MapPartitionsRDD[3] at textFile at <console>:24
-
scala> val mapRdd=rdd.flatMap(_.split(" "))
-
mapRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:26
-
scala> mapRdd.first
-
res2: String = hi
-
scala> val kvRdd=mapRdd.map(x=>(x,1))
-
kvRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:28
-
scala> kvRdd.first
-
res3: (String, Int) = (hi,1)
-
scala> kvRdd.take(2)
-
res4: Array[(String, Int)] = Array((hi,1), (hello,1))
-
scala> val rsRdd=kvRdd.reduceByKey(_+_)
-
rsRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:30
-
scala> rsRdd.take(2)
-
res5: Array[(String, Int)] = Array((how,1), (do?,1))
-
scala> rsRdd.saveAsTextFile("file:///tmp/output")
-
scala>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
程式說明:
- sc.textFile()方法表示將某個檔案轉換為RDD(實際上是利用了TextInputFormat生成了一個HadoopRDD),所以sc.textFile(“file:///root/data/words”)表示將本地檔案/root/data/words轉換為一個RDD。
- core-site.xml配置檔案中fs.defaultFS預設值是file://,表示本地檔案。file:///root/data/words實際上是file://和/root/data/words的組合,此處未使用HDFS,所以指定本地檔案。
rdd.flatMap(_.split(" "))
表示將RDD每個元素(檔案的每行)按照空格分割,並生成新的RDDmapRdd.map(x=>(x,1))
表示將RDD每個元素x生成(x,1)Key-Value對,並生成新的RDDkvRdd.reduceByKey(_+_)
對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函式自定義(value值相加)。- rsRdd.saveAsTextFile(“file:///tmp/output”)表示將rsRdd資料儲存到本地/tmp/output目錄下。
(3)檢視結果
-
[[email protected] ~]# ll /tmp/output
-
total 8
-
-rw-r--r-- 1 root root 48 Sep 6 03:51 part-00000
-
-rw-r--r-- 1 root root 33 Sep 6 03:51 part-00001
-
-rw-r--r-- 1 root root 0 Sep 6 03:51 _SUCCESS
-
[[email protected] ~]# cat /tmp/output/part-00000
-
(how,1)
-
(do?,1)
-
(hello,,2)
-
(hello,1)
-
(Spark!,1)
-
[[email protected] ~]# cat /tmp/output/part-00001
-
(you,1)
-
(Scala!,1)
-
(hi,1)
-
(do,1)
-
[[email protected] ~]#
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
3.4 Spark程式設計基本流程
Spark程式設計基本流程 1)建立SparkContext物件 每個Spark應用程式有且僅有一個SparkContext物件,封裝了Spark執行環境資訊 2)建立RDD 可以從Scala集合或Hadoop資料集上建立 3)在RDD之上進行轉換和action MapReduce只提供了map和reduce兩種操作,而Spark提供了多種轉換和action函式 4)返回結果 儲存到HDFS中,或直接打印出來。