1. 程式人生 > >Spark2.x學習筆記:3、 Spark核心概念RDD

Spark2.x學習筆記:3、 Spark核心概念RDD

Spark學習筆記:3、Spark核心概念RDD

3.1 RDD概念

彈性分散式資料集(Resilient Distributed Datasets,RDD) ,可以分三個層次來理解:

  • 資料集:故名思議,RDD 是資料集合的抽象,是複雜物理介質上存在資料的一種邏輯檢視。從外部來看,RDD 的確可以被看待成經過封裝,帶擴充套件特性(如容錯性)的資料集合。
  • 分散式:RDD的資料可能在物理上儲存在多個節點的磁碟或記憶體中,也就是所謂的多級儲存。
  • 彈性:雖然 RDD 內部儲存的資料是隻讀的,但是,我們可以去修改(例如通過 repartition 轉換操作)平行計算計算單元的劃分結構,也就是分割槽的數量。

Spark資料儲存的核心是彈性分散式資料集(RDD),我們可以把RDD簡單地理解為一個抽象的大陣列,但是這個陣列是分散式的,邏輯上RDD的每個分割槽叫做一個Partition。  在物理上,RDD物件實質上是一個元資料結構,儲存著Block、Node等對映關係,以及其他元資料資訊。一個RDD就是一組分割槽(Partition),RDD的每個分割槽Partition對應一個Block,Block可以儲存在記憶體,當記憶體不夠時可以儲存到磁碟上。

如下圖所示,存在2個RDD:RDD1包含3個分割槽,分別儲存在Node1、Node2和Node3的記憶體中;RDD2也包含3個分割槽,p1和p2分割槽儲存在Node1和Node2的記憶體中,p3分割槽存在在Node3的磁碟中。 這裡寫圖片描述

RDD的資料來源也可以儲存在HDFS上,資料按照HDFS分佈策略進行分割槽,HDFS中的一個Block對應Spark RDD的一個Partition。

3.2 RDD基本操作

(1)RDD包括兩大類基本操作Transformation和Acion

  • Transformation 
    • 可以通過Scala集合或者Hadoop資料集鉤子一個新的RDD
    • 將已有RDD轉換為新的RDD
    • 常用運算元(操作,方法)有map、filter、groupBy、reduceBy
  • Aciton 
    • 通過RDD計算得到一個或者多個值
    • 常用運算元有count、reduce、saveAsTextFile

主要的Transformation和Acion如下表所示: 這裡寫圖片描述

(2)作用在RDD上的操作(運算元) 作用在RDD上的操作

(3)惰性執行(Lazy Execution)

  • Transformation只記錄RDD的轉換關係,並沒有真正執行轉換
  • Action是觸發程式執行的運算元

3.3 RDD操作示例

3.3.1 簡單例子

(1)程式碼

  1. [[email protected] ~]# spark-shell

  2. 17/09/06 03:36:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

  3. 17/09/06 03:36:39 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

  4. Spark context Web UI available at http://192.168.1.180:4040

  5. Spark context available as 'sc' (master = local[*], app id = local-1504683394043).

  6. Spark session available as 'spark'.

  7. Welcome to

  8. ____ __

  9. / __/__ ___ _____/ /__

  10. _\ \/ _ \/ _ `/ __/ '_/

  11. /___/ .__/\_,_/_/ /_/\_\ version 2.2.0

  12. /_/

  13. Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)

  14. Type in expressions to have them evaluated.

  15. Type :help for more information.

  16. scala> val rdd1=sc.parallelize(1 to 100,5)

  17. rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

  18. scala> val rdd2=rdd1.map(_+1)

  19. rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26

  20. scala> rdd2.take(2)

  21. res0: Array[Int] = Array(2, 3)

  22. scala> rdd2.count

  23. res1: Long = 100

  24. scala>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

(2)程式說明

  • spark-shell的日誌資訊Spark context available as 'sc',表示spark-shell中已經預設將SparkContext類初始化為物件sc,在spark-shell中可以直接使用SparkContext的物件sc。
  • SparkContext 的 parallelize(),將一個存在的集合,變成一個RDD,這種方式試用於學習spark和做一些spark的測試
  • sc.parallelize(1 to 100,5)表示將1 to 100產生的集合(Range)轉換成一個RDD,並建立5個partition。
  • 當我們忘記了parallelize單詞時,我們可以在spark-shell中輸入sc.pa,然後按tab鍵,會自動補齊。這是一個非常實用的功能!
  • rdd1.map(_+1)表示每個元素+1,併產生一個新的RDD。這是一個Transformation操作。
  • take(2)表示取RDD前2個元素,這是個Action操作。當這個Action操作執行時,上面的map(_+1)操作才真正執行。
  • count表示RDD元素總數,也是一個Action操作。
  • 在Spark WebUI中可以看到兩個Action操作,如下圖。

這裡寫圖片描述

3.3.2 常用運算元

(1)程式碼

  1. scala> val listRdd=sc.parallelize(List(1,2,3),3)

  2. listRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

  3. scala> val squares=listRdd.map(x=>x*x)

  4. squares: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26

  5. scala> squares.take(3)

  6. res3: Array[Int] = Array(1, 4, 9)

  7. scala> val even=squares.filter(_%2==0)

  8. even: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:28

  9. scala> squares.first

  10. res4: Int = 1

  11. scala> even.first

  12. res5: Int = 4

  13. scala> val nums=sc.parallelize(1 to 3)

  14. nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

  15. scala> val mapRdd=nums.flatMap(x=>1 to x)

  16. mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at flatMap at <console>:26

  17. scala> mapRdd.count

  18. res6: Long = 6

  19. scala> mapRdd.take(6)

  20. res7: Array[Int] = Array(1, 1, 2, 1, 2, 3)

  21. scala>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

(2)程式說明

  • map(x=>x*x)每個元素平方,生成新的RDD
  • filter(_%2==0)對RDD中每個元素進行過濾(偶數留下),生成新的RDD
  • nums.flatMap(x=>1 to x),將一個元素對映成多個元素,生成新的RDD

3.3.3 Key/Value型RDD

(1)程式碼

  1. scala> val pets=sc.parallelize(List( ("cat",1),("dog",1),("cat",2) ))

  2. pets: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:24

  3. scala> val pets2=pets.reduceByKey(_+_)

  4. pets2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:26

  5. scala> pets2.count

  6. res8: Long = 2

  7. scala> pets2.take(2)

  8. res10: Array[(String, Int)] = Array((dog,1), (cat,3))

  9. scala> val pets3=pets.groupByKey()

  10. pets3: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[10] at groupByKey at <console>:26

  11. scala> pets3.count

  12. res11: Long = 2

  13. scala> pets3.take(2)

  14. res12: Array[(String, Iterable[Int])] = Array((dog,CompactBuffer(1)), (cat,CompactBuffer(1, 2)))

  15. scala> val pets4=pets.sortByKey()

  16. pets4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at sortByKey at <console>:26

  17. scala> pets4.take(3)

  18. res14: Array[(String, Int)] = Array((cat,1), (cat,2), (dog,1))

  19. scala>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

(2)程式說明

  • reduceByKey就是對元素為KV對的RDD中Key相同的元素的Value進行reduce,因此,Key相同的多個元素的值被reduce為一個值,然後與原RDD中的Key組成一個新的KV對。reduceByKey(_+_)對每個key對應的多個value進行merge操作,自動在map端進行本地combine
  • groupByKey()對每個key進行歸併,但只生成一個sequence。
  • sortByKey()按照key進行排序

3.3.4 WordCount

WordCount是大資料處理的HelloWorld,下面看看Spark是如何實現。  (1)準備資料

  1. [[email protected] ~]# mkdir data

  2. [[email protected] ~]# vi data/words

  3. [[email protected] ~]# cat data/words

  4. hi hello

  5. how do you do?

  6. hello, Spark!

  7. hello, Scala!

  8. [[email protected] ~]#

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

(2)轉換處理

  1. scala> val rdd=sc.textFile("file:///root/data/words")

  2. rdd: org.apache.spark.rdd.RDD[String] = file:///root/data/words MapPartitionsRDD[3] at textFile at <console>:24

  3. scala> val mapRdd=rdd.flatMap(_.split(" "))

  4. mapRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:26

  5. scala> mapRdd.first

  6. res2: String = hi

  7. scala> val kvRdd=mapRdd.map(x=>(x,1))

  8. kvRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:28

  9. scala> kvRdd.first

  10. res3: (String, Int) = (hi,1)

  11. scala> kvRdd.take(2)

  12. res4: Array[(String, Int)] = Array((hi,1), (hello,1))

  13. scala> val rsRdd=kvRdd.reduceByKey(_+_)

  14. rsRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:30

  15. scala> rsRdd.take(2)

  16. res5: Array[(String, Int)] = Array((how,1), (do?,1))

  17. scala> rsRdd.saveAsTextFile("file:///tmp/output")

  18. scala>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

程式說明:

  • sc.textFile()方法表示將某個檔案轉換為RDD(實際上是利用了TextInputFormat生成了一個HadoopRDD),所以sc.textFile(“file:///root/data/words”)表示將本地檔案/root/data/words轉換為一個RDD。
  • core-site.xml配置檔案中fs.defaultFS預設值是file://,表示本地檔案。file:///root/data/words實際上是file://和/root/data/words的組合,此處未使用HDFS,所以指定本地檔案。
  • rdd.flatMap(_.split(" "))表示將RDD每個元素(檔案的每行)按照空格分割,並生成新的RDD
  • mapRdd.map(x=>(x,1))表示將RDD每個元素x生成(x,1)Key-Value對,並生成新的RDD
  • kvRdd.reduceByKey(_+_)對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函式自定義(value值相加)。
  • rsRdd.saveAsTextFile(“file:///tmp/output”)表示將rsRdd資料儲存到本地/tmp/output目錄下。

(3)檢視結果

  1. [[email protected] ~]# ll /tmp/output

  2. total 8

  3. -rw-r--r-- 1 root root 48 Sep 6 03:51 part-00000

  4. -rw-r--r-- 1 root root 33 Sep 6 03:51 part-00001

  5. -rw-r--r-- 1 root root 0 Sep 6 03:51 _SUCCESS

  6. [[email protected] ~]# cat /tmp/output/part-00000

  7. (how,1)

  8. (do?,1)

  9. (hello,,2)

  10. (hello,1)

  11. (Spark!,1)

  12. [[email protected] ~]# cat /tmp/output/part-00001

  13. (you,1)

  14. (Scala!,1)

  15. (hi,1)

  16. (do,1)

  17. [[email protected] ~]#

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3.4 Spark程式設計基本流程

Spark程式設計基本流程  1)建立SparkContext物件  每個Spark應用程式有且僅有一個SparkContext物件,封裝了Spark執行環境資訊  2)建立RDD  可以從Scala集合或Hadoop資料集上建立  3)在RDD之上進行轉換和action  MapReduce只提供了map和reduce兩種操作,而Spark提供了多種轉換和action函式  4)返回結果  儲存到HDFS中,或直接打印出來。