1. 程式人生 > >[Spark]學習筆記二——RDDs

[Spark]學習筆記二——RDDs

一、一些物件

1.Driver Program:包含程式的main()方法,RDDs的定義和操作,它管理很多節點,我們稱之為executors
2.SparkContext:Driver Program通過SparkContext物件訪問Spark,SparkContext物件代表和一個叢集的連線
3.在shell中SparkContext物件自動建立好了,就是sc,可以在shell中直接使用sc

二、RDDs(Resilient distributed dataset——彈性分散式資料集)

1.RDDs的介紹:
1)並行的分佈在叢集中
2)RDDs是Spark分發資料和計算的基礎抽象類
3)一個RDD是不可改變

的分散式集合物件
4)Spark中,所有的計算都是通過RDDs的建立、轉換等操作完成的
5)一個RDD內部由許多==partitions(分片)==組成

分片:
每個分片包括一部分資料,partitions可在叢集不同節點上計算
分片是Spark並行處理的單元,Spark會順序的、並行的處理分片

2.RDDs的建立方法:
1)把一個已存在的集合傳給SparkContext的parallelize()方法,可用來測試

val rdd=sc.parallelize(Array(1,2,2,4),4)	----第一個引數:待並行化處理的集合

2)載入外部資料集:可以載入本地檔案,也可以載入hadoop檔案

三、RDD基本操作之Transformation

1.Transformation:從之前的RDD構建一個新的RDD,像map()和filter()……
2.逐元素Transformation:
1)map()——接收函式,把函式應用到RDD的每一個元素,返回新的RDD
2)filter()——接受一個函式,返回只包含,滿足filter()函式的新RDD
3)flatMap()——對每個輸入元素,輸出多個輸出元素,flatMap()將RDD中的元素壓扁後返回一個新的RDD
3.集合運算:RDDs支援數學集合的運算,例如:並集、交集等
1)distinct()——去重方法

val c1=sc.parallelize(Array("coffe","coffe","tea","tea","coka"))

c1.foreach(println)
tea
coffe
tea
coka
coffe

c1.distinct.foreach(println)
coka
tea
coffe

2)union()——並集

val c2=sc.parallelize(Array("coffe","tea"))
c1.distinct.union(c2).foreach(println)
coffe
tea
coffe
coka
tea

3)intersection()——交集

c1.distinct.intersection(c2).foreach(println)
coffe
tea

4)subtract()

c1.distinct.subtract(c2).foreach(println)
coka

四、RDD基本操作之Action

1.Action:在RDD上計算出來一個結果,把結果返回給driver program或儲存在檔案系統,像count(),save……
2.collect():遍歷整個RDD,向driver program返回RDD的內容,可以用此檢視小資料;大資料的時候,使用savaAsTextFile()這一action
3.reduce():接收一個函式,作用在RDD兩個型別相同的元素上,返回新元素;可以實現:RDD中元素的累加,計數,和其他型別的聚集操作
4.take(n):返回RDD的n個元素,返回結果是無序的,一般是測試使用
5.top():排序
6.foreach():計算RDD中的每個元素,但不返回到本地,可以配合println友好的打印出資料

五、RDDs的特性

1.RDDs的血統關係圖:
Spark維護著RDDs之間的依賴關係和建立關係,叫做血統關係圖
Spark使用血統關係圖來計算每個RDD的需求和恢復丟失的資料

2.延遲計算
Spark對RDDs的計算是,他們第一次使用action操作的時候
這種方式在處理大資料的時候特別有用,可以減少資料的傳輸
Spark內部記錄metadat,表明transformation操作已經被響應了
載入資料也是延遲計算,資料只有在必要的時候,才會被載入進去

3.RDD.persist()
預設每次在RDDs上面進行action操作時,Spark都重新計算RDDs
如果想重複利用一個RDD,可以使用RDD.persist()
unpersist()方法是從快取中移除

六、KeyValue對

1.建立KeyValue對RDDs:使用map()函式,返回KeyValue對
2.reduceByKey():把相同的key結合

val rdd1=sc.parallelize(Array((1,2),(3,4),(3,6)))
val rdd2=rdd1.reduceByKey(_+_)
	
rdd2.foreach(println)
	          (1,2)
  	          (3,10)

3.groupByKey():把相同key的values分組

val rdd3=rdd1.groupByKey()

rdd3.foreach(println)
	          (1,CompactBuffer(2))
              (3,CompactBuffer(4, 6))

4.mapValues():函式作用於pairRDD的每個元素,key不變

val rdd4=rdd1.mapValues(x=>x+1)

rdd4.foreach(println)
	          (1,3)
              (3,5)
              (3,7)

5.flatMapValues():符號化的時候使用

val rdd5=rdd1.flatMapValues(x=>x to 5)

rdd5.foreach(println)
	          (3,4)
          	  (3,5)
	          (1,2)
	          (1,3)
	          (1,4)
	          (1,5)

6.keys:僅返回keys

val rdd6=rdd1.keys

rdd6.foreach(println)
	          1
	          3
	          3

7.values:僅返回values
8.sortByKey():按照key排序的RDD
9.combineByKey():!!!
引數(createCombiner,mergeValue,mergeCombiners,partitioner)
最常用的基於key的聚合函式,返回的型別可以與輸入型別不一樣,許多基於key的聚合函式都用到了它,像groupByKey()……
元素的key,要麼是之前見過的,要麼是新的
遍歷partition中的如果是新元素,使用我們提供的createCombiner()函式
如果是這個partition中已經存在的key,就會使用mergeValue()函式
合計每個partition的結果的時候,使用mergeCombiners()函式元素

//求平均成績

//初始化
val scores=sc.parrallelize(Array(("Nina",80),("Nina",90),("Nina",100),("Jack",100),("Jack",100),("Jack",100)))

scores.foreach(println)
(Jack,100)
(Jack,100)
(Jack,100)
(Nina,80)
(Nina,90)
(Nina,100)

//使用combineByKey()函式,求每個人的總科目數和總成績
val result=scores.combineByKey(score=>(1,score),(c1:(Int,Int),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))

result.foreach(println)
(Jack,(3,300))
(Nina,(3,270))

//求平均成績
val average=result.map{case(name,(num,score))=>(name,score/num)}

average.foreach(println)
(Nina,90)
(Jack,100)