1. 程式人生 > >aggregat和aggregateByKey用法

aggregat和aggregateByKey用法

aggregate用法 —先區域性再全域性計算

val rdd2= sc.parallelize(List(1,2,3,4,5,6,7,8),2)
其中rdd2被分為兩個分割槽,0分割槽的資料是1,2,3,4;1分割槽的資料是5,6,7,8

rdd2.aggregate(0)(math.max(_,_),_+_) 結果是12
分析:初始值0和第一個分割槽的資料比較,取最大值4,然後初始值0和第二個分割槽的資料比較,取最大值8,然後全域性相加,全域性相加的時候0也要加一次,所以 4+8+0=12

rdd2.aggregate(5)(math.max(_,_),_+_) 結果是18
分析:初始值5和第一個分割槽的資料比較,取最大值5,然後初始值5和第二個分割槽的資料比較,取最大值8,然後全域性相加,全域性相加的時候0也要加一次,所以 5+8+5=18

rdd2.aggregate(5)(_+_,_+_) 結果是51
分析:初始值5和第一個分割槽的所有資料相加,得到1+2+3+4+5=15 ,初始值5和第二個分割槽的所有資料相加,得到5+6+7+8+5=31
然後全域性相加 15 + 31 + 5= 51

val rdd1= sc.parallelize(List(“a”,“b”,“c”,“d”,“e”,“f”),2)

rdd1.aggregate("")(_+_,_+_) 結果可能是abcdef,也可能是defabc,因為是兩個分割槽對應2個task,並行跑,誰先跑完就在最前面

val rdd3= sc.parallelize(List(“12”,“23”,“345”,“4567”),2)

rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y) => x+y) 結果是24或者42

val rdd4= sc.parallelize(List("12","23","345",""),2)

rdd4.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y) => x+y)結果是10或者01
分析:先對初始值空字元進行length 得到的是0 然後0再toString,再length得到的是1,之後分別和第一第二分割槽的每個數比較,最後一個數據比較之後直接返回,

val rdd5= sc.parallelize(List(“12”,“23”,"",“345”),2)

rdd5.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y) => x+y)結果是11

第二個分割槽的第一個資料比較之後是0,然後toString,之後還要比較,所以長度是1,在和345比較。

aggregateByKey用法 —只計算區域性

val rdd = sc.parallelize(List((“cat”,2),(“cat”,5),(“mouse”,4),(“cat”,12),(“dog”,12),(“mouse”,2)),2)

rdd.reduceByKey(_+_).collect 結果是Array((cat,19),(dog,12),(mouse,6))

rdd.aggregateByKey(100)(_+_,_+_).collect 結果是Array((cat,219),(dog,112),(mouse,206))

rdd.aggregateByKey(100)(math.max(_,_),_+_).collect結果是 Array((dog,100), (cat,200), (mouse,200)) 因為初始值是100,分別和兩個分割槽的每個資料進行、比對,取最大值,然後再相加。100先和"(“cat”,2)比取100,再和(“cat”,5)比還是100,所以第一個分割槽的cat是100,第二個分割槽的cat也是100,全域性相加100+100=200

rdd.aggregateByKey(5)(math.max(_,_),_+_).collect
結果是Array((dog,12), (cat,17), (mouse,10)) 結果自己分析