1. 程式人生 > >Spark計算模型之熟練使用RDD的運算元完成計算 ----tranfoemation

Spark計算模型之熟練使用RDD的運算元完成計算 ----tranfoemation

Spark計算模型之熟練使用RDD的運算元完成計算  ----tranfoemation

1.彈性分散式資料集RDD   

1.1什麼是RDD   RDD(Rsedilient Distributed Dataaset)叫做分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。RDD具有資料模型的特點:自動容錯、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯示德將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大的提升了查詢速度。 

1.2RDD的屬性   

 1)、一組分片(Partition),即資料集的基本組成單位,對於RDD來說,每個分片都會一個計算任務處理,並決定平行計算的顆粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配的CPU  Core的數目。

2)、一個計算每個分割槽的函式。Spark中的RDD的計算是以分片為單位,每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。

3)、RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似流水線一樣的前後依賴關係,再部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。

4)、一個Partitioner,即RDD的分片函式,當前Spark中實現了兩種的分片函式,一個是基於雜湊的HashPartitioner,另外一個是基於範圍的RangPartitioner,只有對於Key-value的RDD,才會有Partitioner,非Key-value的RDD的Partitioner的值是None.Partitioner函式不但決定了RDD本身的分片數量,也決定了Partent RDD  shuffle輸出時的分片數量。

5)、一個列表,儲存存取每個Partitioner的優先位置(Preferred location)。對於一個HDFS檔案來說,這個列表儲存的就是每個Partition所在的位置,按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要資料塊的儲存位置。

3.1建立RDD 

1、啟動shell, 定義master 、executor-memory 執行記憶體 

 total-executior-cores 全部行的核數

./spark-shell --master spark://root1:7077 --executor-memory 512m --total-executor-cores 1

2)、由一個已經存在的Scala集合建立

val rdd1=sc.parallelize(Array(1,2,3,4,5,6))

3)、由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS,Cassandra、HBase等

scala> val rdd2=sc.textFile("hdfs://root1:9000/words.txt")

----------------------------------------------------------------------------------------------------------------------------------------------------------------

scala> val rdd2=sc.textFile("hdfs://root1:9000/words.txt").flatMap(_.split(" ")).map((_,1)).collect

3.2  RDD程式設計API

3.2.1  Tranformation  

RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。

常用的Transformation:

    

     

3.2.2 Action

   

 

4.練習

4.1 parallelize   類似於Map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iteratior[T] =>Iteratior[U]

scala> val rdd1=sc.parallelize(Array(1,2,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27

4.2 map(func)  

每個元素都乘以10


scala> val rdd2=rdd1.map(_*10)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:29

scala> val rdd2=rdd1.map(_*10).collect
rdd2: Array[Int] = Array(10, 20, 30, 40, 50, 60)

選出 >30 的元素

scala> val rdd3=rdd2.filter(_>30)
rdd3: Array[Int] = Array(40, 50, 60)

4.3  設定分割槽的長度

scala> val rdd4=sc.parallelize(Array(1,2,3,4,5,6),5)
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:27

讀取分割槽的長度

scala> rdd4.partitions.length
res1: Int = 5

4.4 陣列的每個元素都+2,並正排序  

scala> val rdd5=sc.parallelize(Array(1,0,5,9,4,3),2)
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:27

scala> val rdd6=rdd5.map(_+2).sortBy(x=>x,true)
rdd6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at sortBy at <console>:29

scala> val rdd6=rdd5.map(_+2).sortBy(x=>x,true).collect
rdd6: Array[Int] = Array(2, 3, 5, 6, 7, 11)

4.5 篩選陣列rdd5中大於10的元素

scala> val rdd7=rdd6.filter(_>10)
rdd7: Array[Int] = Array(11)

4.6  進行聯絡,建立一個數組,每個元素都*2,按照正排序進行排序

scala> val rdd8=sc.parallelize(List(15,12,1,3,6)).map(_*2).sortBy(x=>x,true).collect
rdd8: Array[Int] = Array(2, 6, 12, 24, 30)

4.7 建立String的陣列,進行統計相同單詞的個數

scala> val rrd9=sc.parallelize(Array("a b c","c d a"))
rrd9: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[35] at parallelize at <console>:27

scala> rrd9.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map(t=>(t._1,t._2.size)).collect
res12: Array[(String, Int)] = Array((d,1), (b,1), (a,2), (c,2))

4.8 

scala> val rdd10=sc.parallelize(List(1,3,4,5))
rdd10: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[64] at parallelize at <console>:27

scala> val rdd11=sc.parallelize(List(2,3,4,6))
rdd11: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:27

scala> val rdd12=rdd10.union(rdd11).collect
rdd12: Array[Int] = Array(1, 3, 4, 5, 2, 3, 4, 6)

4.9   distinct  去除陣列中重複的數字

scala> rdd12.distinct.sortBy(x=>x).collect
res17: Array[Int] = Array(1, 2, 3, 4, 5, 6)

4.10 交集    intersection

scala> val rdd13=rdd11.intersection(rdd10).collect
rdd13: Array[Int] = Array(4, 3)

4.11   string陣列求交集

scala> val rdd14=sc.parallelize(List(("tom",1),("kack",2),("rose",1)))
rdd14: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[99] at parallelize at <console>:27

scala> val rdd15=sc.parallelize(List(("tom",2),("jeery",2),("lishua",1)))
rdd15: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[100] at parallelize at <console>:27

scala> rdd14.intersection(rdd15)
res18: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[106] at intersection at <console>:32

scala> rdd14.intersection(rdd15).collect
res19: Array[(String, Int)] = Array()

4.12  求兩個string陣列中相同的Key

scala> rdd14.join(rdd15).collect
res20: Array[(String, (Int, Int))] = Array((tom,(1,2)))

4.13  利用leftOutJoin 求兩個string陣列中相同的Key

scala> val rdd16=rdd14.leftOuterJoin(rdd15).collect
rdd16: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(2))), (kack,(2,None)), (rose,(1,None)))