1. 程式人生 > >Spark運算元:Action之countByKey、foreach、foreachPartition、sortBy

Spark運算元:Action之countByKey、foreach、foreachPartition、sortBy

1、countByKey:def countByKey(): Map[K, Long]

countByKey用於統計RDD[K,V]中每個K的數量。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21
 
scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)

2、foreach:def foreach(f: (T) ⇒ Unit): Unit

foreach用於遍歷RDD,將函式f應用於每一個元素。如果對RDD執行foreach,只會在Executor端有效,而並不是Driver端。

比如:rdd.foreach(println),只會在Executor的stdout中打印出來,Driver端是看不到的,此時可以使用accumulator共享變數與foreach結合來處理。

scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0
 
scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
 
scala> rdd1.foreach(x => cnt += x)
 
scala> cnt.value
res51: Int = 15
 
scala> rdd1.collect.foreach(println)
1
2
3
4
5

3、foreachPartition:def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit

foreachPartition和foreach類似,只不過是對每一個分割槽使用f。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
 
scala> var allsize = sc.accumulator(0)
size: org.apache.spark.Accumulator[Int] = 0
 
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at :21
 
scala>     rdd1.foreachPartition { x => {
     |       allsize += x.size
     |     }}
 
scala> println(allsize.value)
10

4、sortBy

def sortBy[K](f:(T) ⇒ K, ascending: Boolean= true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

sortBy根據給定的排序k函式將RDD中的元素進行排序。

scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)
 
scala> rdd1.sortBy(x => x).collect
res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //預設升序
 
scala> rdd1.sortBy(x => x,false).collect
res2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  //降序
 
//RDD[K,V]型別
scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
 
scala> rdd1.sortBy(x => x).collect
res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))
 
//按照V進行降序排序
scala> rdd1.sortBy(x => x._2,false).collect
res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))