1. 程式人生 > >spark入門四(RDD高階運算元一)

spark入門四(RDD高階運算元一)

1. mapPartitionsWithIndex

建立RDD,指定分割槽數為2

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7),2)

檢視分割槽

scala> rdd1.partitions

– 內容如下:

res0: Array[org.apache.spark.Partition] = Array([email protected], org.apa[email protected])

檢視分割槽數量

scala> rdd1.partitions.length   //結果: res1: Int = 2

建立一個迭代函式

def func(index : Int, iter : Iterator[Int]) : Iterator[String] = {
	iter.toList.map(x => "[partID:" + index + ",val: " + x +"]").iterator
}

檢視分割槽內容

scala> rdd1.mapPartitionsWithIndex(func).collect()

內容如下:

res2: Array[String] = Array([partID:0,val: 1], [partID:0,val: 2], [partID:0,val: 3], [partID:1,val: 4], [partID:1,val: 5], [partID:1,val: 6], [partID:1,val: 7])

2. aggregate 聚合 更靈活

建立RDD

scala>  val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)	

RDD求和

scala>   rdd.aggregate(0)(_+_,_+_)

求每個分割槽最大值的工作,再對最大值求和

引導: 如何求陣列最大值:val arr = Array(1,2,3)
arr.reduce(math.max(,)) //獲取到最大值為3

scala> rdd.aggregate(0)(math.max(_,_),_+_)  

結果為:res6: Int = 13
說明:第一個分割槽最大值為4,第二個為9,加起來為13

求最大值:

scala> rdd.aggregate(0)(math.max(_,_),math.max(_,_))
res0: Int = 9 

這裡面有個問題:初始值為0,必須小於數組裡面所有的元素才可以,否則結果報錯。為了避免這個問題,可以將初始值定義為rdd的第一個元素,程式碼如下:

scala> rdd.aggregate(rdd.first)(math.max(_,_),math.max(_,_))
res6: Int = 9

練習:

scala> rdd.aggregate(10)(math.max(_,_),_+_)
res7: Int = 30

說明:
初始值為10
10 和 第一個分割槽最大值4比較,結果為10
10 和 第一個分割槽最大值9比較,結果為10
結果為:10+10+10 = 3

scala> rdd.aggregate(6)(math.max(_,_),_+_)
res8: Int = 21

說明:
初始值為6
6 和 第一個分割槽最大值4比較,結果為6
6 和 第一個分割槽最大值9比較,結果為9
結果為:6+6+9 = 21

scala> rdd.aggregate(3)(math.max(_,_),_+_)
res9: Int = 16

說明:
初始值為3
3 和 第一個分割槽最大值4比較,結果為4
3 和 第一個分割槽最大值9比較,結果為9
結果為:3+4+9 = 16

scala> val rdd1 = sc.parallelize(List("a","b","c","d","e"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> rdd1.aggregate("")(_+_,_+_)
res10: String = abcde
def func2(index : Int, iter : Iterator[String]) : Iterator[String] = {
	iter.toList.map(x => "[partID:" + index + ",val: " + x +"]").iterator
}
```java

```java
scala> rdd1.mapPartitionsWithIndex(func2).collect()
res13: Array[String] = Array([partID:0,val: a], [partID:0,val: b], [partID:1,val: c],[partID:1,val: d], [partID:1,val: e])
scala> rdd1.aggregate("|")(_+_,_+_)
res15: String = ||ab|cde
scala> val rdd2 = sc.parallelize(List("12","23","345","4567"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd2.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y);
res16: String = 24

scala> rdd2.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y);
res17: String = 42

scala> rdd2.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y);
res18: String = 24

scala> rdd2.aggregate("")((x,y)=>math.max(x.length,y.length).toString,(x,y)=>x+y);
res19: String = 42

說明:
相同的程式碼,得到的結果有兩個,
第一個分割槽最大長度是2,第二個分割槽最大長度是4,
因為有兩個分割槽計算,並不知道哪個先返回,所以會有兩個結果。

scala> val rdd3 = sc.parallelize(List("12","23","345",""),2)	

scala> rdd3.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y);
res25: String = 01

scala> rdd3.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y);
res26: String = 10

scala> rdd3.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y);
res27: String = 10

scala> rdd3.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y);
res28: String = 01

說明:
第一個分割槽:
初始值""長度0和"12"的長度2比較,結果為0,呼叫toString,結果為 “0”
再將 "0"長度為1和 “23"的長度2比較,結果為1,呼叫toString,結果為 “1”
第二個分割槽:
初始值”"長度0和"345"的長度3比較,結果為0,呼叫toString,結果為 “0”
再將 "0"長度為1和 ""的長度0比較,結果為0,呼叫toString,結果為 “1”
因為有兩個分割槽計算,並不知道哪個先返回,所以會有兩個結果。

3. aggregateByKey 把相同的Key進行操作

//1. 建立k-v對RDD
scala> val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("cat",13),("mouse",2)))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24
//2. 檢視內容
def func3(index : Int, it : Iterator[Any]) : Iterator[Any] = {
    it.toList.map(x => "[partID:" + index + ",val: " + x +"]").iterator
}

scala> pairRDD.mapPartitionsWithIndex(func3).collect()
res30: Array[Any] = Array([partID:0,val: (cat,2)], [partID:0,val: (cat,5)], [partID:0,val: (mouse,4)], [partID:1,val: (cat,12)], [partID:1,val: (cat,13)], [partID:1,val: (mouse,2)])
//統計每種動物的總數量,分割槽求和,再求和
scala> pairRDD.aggregateByKey(0)(_+_,_+_).collect()
res31: Array[(String, Int)] = Array((cat,32), (mouse,6))

//將每種動物的每個分割槽最大數量的那個元素求出來,再對每個分割槽求和
scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect()
res32: Array[(String, Int)] = Array((cat,18), (mouse,6))

//將每種動物的最大數量的那個元素求出來
scala> pairRDD.aggregateByKey(0)(math.max(_,_),math.max(_,_)).collect()
res33: Array[(String, Int)] = Array((cat,13), (mouse,4))

完…