
Spark aggregate and combineByKey aggregation operations

aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
val result = listRDD.aggregate(zeroValue)((acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

Note that aggregate is an action: it returns a plain value of type U, not an RDD.
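A minimal runnable sketch of this, assuming a local SparkContext (the names sc and listRDD are illustrative) and using the identity (0, 0) as zeroValue for a (sum, count) accumulator:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val listRDD = sc.parallelize(List(1, 2, 3, 4))

// seqOp folds each element into a per-partition (sum, count) accumulator;
// combOp merges the per-partition accumulators on the driver.
val sumAndCount = listRDD.aggregate((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
// sumAndCount == (10, 4), i.e. sum = 10, count = 4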


combineByKey[C]( createCombiner: V => C,  mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
val result = pairRDD.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
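As a runnable sketch (reusing the assumed sc from above; the sample pairs are illustrative), combineByKey builds a (sum, count) combiner per key, from which a per-key average follows:

val pairRDD = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)))

val result = pairRDD.combineByKey(
  (v: Int) => (v, 1),                                    // createCombiner: first value seen for a key in a partition
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), // mergeValue: fold another value within the same partition
  (acc1: (Int, Int), acc2: (Int, Int)) =>                // mergeCombiners: merge partial results across partitions
    (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
// result.collect() yields something like Array(("a", (4, 2)), ("b", (2, 1)))

val avgByKey = result.mapValues { case (sum, cnt) => sum.toDouble / cnt }
// avgByKey.collect() yields something like Array(("a", 2.0), ("b", 1.0))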

Example:

For the RDD {1, 2, 3, 4}, compute the sum and count the elements.

listRDD.aggregate((6, 5))((acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

zeroValue is both the initial value of seqOp's acc and the initial value of combOp's acc1.

Each acc2 passed to combOp is a per-partition result computed by seqOp. The trace below assumes the RDD has a single partition, so combOp runs exactly once.

1. Execute seqOp

acc = (6, 5) value = 1 => (7, 6)

acc = (7, 6) value = 2 => (9, 7)

acc = (9, 7) value = 3 => (12, 8)

acc = (12, 8) value = 4 => (16, 9)

2. Execute combOp

acc1 = (6, 5) acc2 = (16, 9) => (22, 14)
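This trace can be checked with a short sketch (reusing the assumed sc) that forces a single partition, so seqOp folds all four elements into one accumulator and combOp merges zeroValue with that single result exactly once:

// One partition: seqOp over {1, 2, 3, 4} starting from (6, 5) gives (16, 9);
// combOp then merges zeroValue (6, 5) with (16, 9) into (22, 14).
val traced = sc.parallelize(List(1, 2, 3, 4), 1).aggregate((6, 5))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
// traced == (22, 14)

With more partitions, this non-identity zeroValue would be folded in once per partition plus once more in combOp, which is why the identity (0, 0) is the usual choice in practice.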