
spark aggregate function


The aggregate function first aggregates the elements within each partition, then uses the combine function to merge each partition's result with the initial value (zeroValue). The type the function finally returns does not need to match the element type of the RDD.
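To make that last point concrete, here is a minimal sketch (assuming a live SparkContext bound to sc, as in the spark-shell) that aggregates an RDD[Int] into a (sum, count) pair and derives the average; the pair-typed accumulator is illustrative, not something Spark requires:

val nums = sc.parallelize(List(18, 28, 7, 66, -19, 100, 29, 55, 4), 3)
// zeroValue (0, 0) is a (sum, count) pair -- a U different from the Int elements
val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),    // seqOp: fold one Int into the pair
  (p, q)   => (p._1 + q._1, p._2 + q._2))  // combOp: merge per-partition pairs
val avg = sum.toDouble / count              // 288.0 / 9 = 32.0

Because (0, 0) is an identity for this combOp, seeding every partition and the final combine with it does not distort the result, whatever the partition count.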

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

Note:
1. Within each partition, zeroValue is the first value fed into the aggregation.
2. zeroValue also takes part in the cross-partition combine step.
scala> val rdd = sc.parallelize(List(18,28,7,66,-19,100,29,55,4),3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

// first, aggregate the elements within each partition
scala> def InnerCom(a:Int, b:Int) : Int = {
     |   println("InnerCom: " + a + " : " + b)
     |   math.min(a,b)
     | }
InnerCom: (a: Int, b: Int)Int

// then aggregate across the per-partition results
scala> def partitionCom(a:Int, b:Int): Int = {
     |   println("partitionCom: " + a + " : " + b)
     |   a + b
     | }
partitionCom: (a: Int, b: Int)Int

// 3 partitions: min(partition 1) = 7, min(partition 2) = -19, min(partition 3) = 4
// 50 + 7 + (-19) + 4 = 42
scala> rdd.aggregate(50)(InnerCom, partitionCom)
InnerCom: 50 : 18
InnerCom: 18 : 28
InnerCom: 18 : 7
partitionCom: 50 : 7
InnerCom: 50 : 66
InnerCom: 50 : -19
InnerCom: -19 : 100
partitionCom: 57 : -19
InnerCom: 50 : 29
InnerCom: 29 : 55
InnerCom: 29 : 4
partitionCom: 38 : 4
res5: Int = 42
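Note 2 has a practical consequence: because zeroValue seeds every partition and the final combine, a non-identity zeroValue such as 50 makes the result depend on the partition count. A quick sketch with the same data split into 2 partitions instead of 3 (assuming Spark's default slicing, which puts 18, 28, 7, 66 in one partition and the rest in the other):

val rdd2 = sc.parallelize(List(18, 28, 7, 66, -19, 100, 29, 55, 4), 2)
rdd2.aggregate(50)((a, b) => math.min(a, b), (a, b) => a + b)
// 2 partitions: min(partition 1) = 7, min(partition 2) = -19
// 50 + 7 + (-19) = 38, not the 42 we got with 3 partitions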
