1. 程式人生 > >[Spark04]RDD中的運算元

[Spark04]RDD中的運算元

1、RDD可以分為兩類,transformations和actions。

2、Transformations 變換/轉換運算元:將一個RDD轉換成另一個RDD,所有的Transformation都是lazy的,只有發生action是才會觸發計算

3、Action 行動運算元:這類運算元會觸發 SparkContext 提交 作業。

####思考:spark官網說這樣設定運算元會使spark執行地更加的高效,請問這是為什麼呢?

答:1)假設執行一個rdda.map().reduce()的操作,如果作為轉換運算元map()也觸發計算,則肯定得將結果寫出來,降低效率。

        2)由於lineage的關係,之後詳細講解。

4、當你對一個RDD進行轉換時,只要觸發action操作就可能會引起RDD的重算,RDD的重算機制使得當某個RDD資料丟失重算可以恢復該RDD。

5、你可以通過persist()或cache()方法將RDD儲存在記憶體中,這樣的話,在下次查詢時可以更快地訪問到RDD中的元素。spark也支援將RDD儲存在磁碟上,也支援RDD的跨節點複製。

---------------------------------------------------------------------------------------------------------------------------------------------------------------

---------------------------------------------------------------------------------------------------------------------------------------------------------------

以下對Transformations和Action 進行詳細的解釋:

一、Transformations中的運算元

a.map運算元

scala> sc.parallelize(1 to 4).map(x => x*x).collect

res0: Array[Int] = Array(1, 4, 9, 16)

b.flatMap運算元

scala> f.flatMap(_.map(_*2))

res20: List[Int] = List(2, 4, 6, 8, 4, 6, 8, 10)

scala> f.map(_.map(_*2))
res23: List[List[Int]] = List(List(2, 4, 6, 8), List(4, 6, 8, 10))


scala> sc.parallelize(List("aa,bb,cc","cxf,spring,struts2","java,C++,javaScript")).flatMap(x => x.split(",")).foreach(println)
aa
bb
cc
cxf
spring
struts2
java
C++
javaScript

c.filter()運算元

scala> sc.parallelize(1 to 9).filter(_%2==0).collect

res14: Array[Int] = Array(2, 4, 6, 8)

d.mapPartitions()運算元

與Map類似,但map中的func作用的是RDD中的每個元素,而mapPartitions中的func作用的物件是RDD的一整個分割槽

scala> sc.parallelize(1 to 9, 3).mapPartitions( a=>a.filter(_>=7)).collect

res15: Array[Int] = Array(7, 8, 9)

e.sample(withReplacementfractionseed)運算元

對RDD進行抽樣,其中引數withReplacement為true時表示抽樣之後還放回,可以被多次抽樣,false表示不放回;fraction表示抽樣比例;seed為隨機數種子,比如當前時間戳,這個值如果沒有傳入,預設值是一個0~Long.maxvalue之間的整數。

f、mapValues()運算元

##只對鍵值對中的Values進行操作

scala> sc.parallelize(Array("a","b","c","d")).map(x=>(x,1)).mapValues(_+1).collect

res0: Array[(String, Int)] = Array((a,2), (b,2), (c,2), (d,2))

h.subtract()運算元

##返回兩個RDD的差集

scala> val a=sc.parallelize(1 to 9)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24


scala> val b=sc.parallelize(3 to 6)

b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> a.subtract(b).collect

res1: Array[Int] = Array(2, 8, 1, 7, 9)

i.intersection()運算元

##返回兩個RDD的交集

scala> a.intersection(b).collect

res2: Array[Int] = Array(4, 6, 3, 5)

二、Actions中的運算元

a.collect()運算元:

collect 相當於 toArray, toArray 已經過時不推薦使用, collect 將分散式的 RDD 返回為一個單機的 scala Array 陣列。在這個陣列上運用 scala 的函式式操作。

b.count()運算元:

count 返回整個 RDD 的元素個數

c.reduce(func)運算元:

func函式將兩個引數歸併為一個引數,並將最後結果返回給驅動程式

d.first()運算元:

返回RDD的第一個元素

e.take(n)運算元:

返回RDD中前N個元素

f.sum()運算元:

返回RDD中所有元素的最大值

g.top(n)運算元:

返回一個包含RDD中最大的前n的數新的RDD。對於數值,按大小排,對於字串,按字典順序排。

h.max()運算元:

返回一個RDD中的最大值

i.min()運算元:

返回一個RDD中的最小值

j.reduceByKey()運算元:

將key相同的value加起來

k.takeSample(withReplacementnum, [seed])運算元:

takeSample()函式和上面的sample函式是一個原理,但是不使用相對比例取樣而是按設定的取樣個數進行取樣,同時返回結果不再是RDD,而是相當於對取樣後的資料進行
Collect(),返回結果的集合為單機的陣列。

l.takeOrdered()運算元:

返回RDD前N元素,採用自然順序或自定義比較器

---------------------------------------------------------------------------------------------------------------------------------------

Transformations和actions運算元太他麼多了,不一一列舉了啊,參看以下別人的部落格。

Spark中的運算元詳解請轉到以下連結:

http://blog.csdn.net/dream0352/article/details/62229977

http://blog.csdn.net/zcf1002797280/article/details/50752537

http://lxw1234.com/archives/2015/07/363.htm