1. 程式人生 > >Spark運算元:transformation之鍵值轉換combineByKey、foldByKey

Spark運算元:transformation之鍵值轉換combineByKey、foldByKey

1、combineByKey

1)def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] 2)def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] 3)def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]   該函式用於將RDD[K,V]轉換成RDD[K,C],這裡的V型別和C型別可以相同也可以不同。

引數說明:

createCombiner:組合器函式,用於將V型別轉換成C型別,輸入引數為RDD[K,V]中的V,輸出為C mergeValue:合併值函式,將一個C型別和一個V型別值合併成一個C型別,輸入引數為(C,V),輸出為C mergeCombiners:合併組合器函式,用於將兩個C型別值合併成一個C型別,輸入引數為(C,C),輸出為C numPartitions:結果RDD分割槽數,預設保持原有的分割槽數 partitioner:分割槽函式,預設為HashPartitioner mapSideCombine:是否需要在Map端進行combine操作,類似於MapReduce中的combine,預設為true。

示例1:

scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21
 
scala> rdd1.combineByKey(
     |       (v : Int) => v + "_",   
     |       (c : String, v : Int) => c + "@" + v,  
     |       (c1 : String, c2 : String) => c1 + "$" + c2
     |     ).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

其中三個對映函式分別為: createCombiner: (V) => C (v : Int) => v + “_”   //在每一個V值後面加上字元_,返回C型別(String) mergeValue: (C, V) => C (c : String, v : Int) => c + “@” + v   //合併C型別和V型別,中間加字元@,返回C(String) mergeCombiners: (C, C) => C (c1 : String, c2 : String) => c1 + “$” + c2   //合併C型別和C型別,中間加$,返回C(String) 其他引數為預設值。將RDD[String,Int]轉換為RDD[String,String]。

示例2:

rdd1.combineByKey(
      (v : Int) => List(v),
      (c : List[Int], v : Int) => v :: c,
      (c1 : List[Int], c2 : List[Int]) => c1 ::: c2
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

結果是將RDD[String,Int]轉換為了RDD[String,List[Int]]。

2、foldByKey

1)def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] 2)def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] 3)def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

該函式用於RDD[K,V]根據K將V做摺疊、合併處理,其中的引數zeroValue表示先根據對映函式將zeroValue應用於V,進行初始化V,再將對映函式應用於初始化後的V。

示例1:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
scala> rdd1.foldByKey(0)(_+_).collect
res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) 
//將rdd1中每個key對應的V進行累加,注意zeroValue=0,需要先初始化V,對映函式為+操
//作,比如("A",0), ("A",2),先將zeroValue應用於每個V,得到:("A",0+0), ("A",2+0),即:
//("A",0), ("A",2),再將對映函式應用於初始化後的V,最後得到(A,0+2),即(A,2)

示例2:

scala> rdd1.foldByKey(2)(_+_).collect
res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
//先將zeroValue=2應用於每個V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再將對映函
//數應用於初始化後的V,最後得到:(A,2+4),即:(A,6)

示例3:

scala> rdd1.foldByKey(0)(_*_).collect
res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
//先將zeroValue=0應用於每個V,注意,這次對映函式為乘法,得到:("A",0*0), ("A",2*0),
//即:("A",0), ("A",0),再將對映函//數應用於初始化後的V,最後得到:(A,0*0),即:(A,0)
//其他K也一樣,最終都得到了V=0
 
scala> rdd1.foldByKey(1)(_*_).collect
res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
//對映函式為乘法時,需要將zeroValue設為1,才能得到我們想要的結果。

使用foldByKey運算元時候,要特別注意對映函式及zeroValue的取值!