1. 程式人生 > >Spark運算元:RDD行動Action操作(4)–countByKey、foreach、foreachPartition、sortBy

Spark運算元:RDD行動Action操作(4)–countByKey、foreach、foreachPartition、sortBy

關鍵字:Spark運算元、Spark函式、Spark RDD行動Action、countByKey、foreach、foreachPartition、sortBy

countByKey

def countByKey(): Map[K, Long]

countByKey用於統計RDD[K,V]中每個K的數量。

  1. scala>var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
  2. rdd1: org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[7
    ] at makeRDD at :21
  3. scala> rdd1.countByKey
  4. res5: scala.collection.Map[String,Long]=Map(A ->2, B ->3)

foreach

def foreach(f: (T) ⇒ Unit): Unit

foreach用於遍歷RDD,將函式f應用於每一個元素。

但要注意,如果對RDD執行foreach,只會在Executor端有效,而並不是Driver端。

比如:rdd.foreach(println),只會在Executor的stdout中打印出來,Driver端是看不到的。

我在Spark1.4中是這樣,不知道是否真如此。

這時候,使用accumulator共享變數與foreach結合,倒是個不錯的選擇。

  1. scala>var cnt = sc.accumulator(0)
  2. cnt: org.apache.spark.Accumulator[Int]=0
  3. scala>var rdd1 = sc.makeRDD(1 to 10,2)
  4. rdd1: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[5] at makeRDD at :21
  5. scala> rdd1.foreach(x => cnt += x)
  6. scala> cnt.value
  7. res51
    :Int=55
  8. scala> rdd1.collect.foreach(println)
  9. 1
  10. 2
  11. 3
  12. 4
  13. 5
  14. 6
  15. 7
  16. 8
  17. 9
  18. 10

foreachPartition

def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit

foreachPartition和foreach類似,只不過是對每一個分割槽使用f。

  1. scala>var rdd1 = sc.makeRDD(1 to 10,2)
  2. rdd1: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[5] at makeRDD at :21
  3. scala>var allsize = sc.accumulator(0)
  4. size: org.apache.spark.Accumulator[Int]=0
  5. scala>var rdd1 = sc.makeRDD(1 to 10,2)
  6. rdd1: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[6] at makeRDD at :21
  7. scala> rdd1.foreachPartition { x =>{
  8. | allsize += x.size
  9. |}}
  10. scala> println(allsize.value)
  11. 10

sortBy

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

sortBy根據給定的排序k函式將RDD中的元素進行排序。

  1. scala>var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)
  2. scala> rdd1.sortBy(x => x).collect
  3. res1:Array[Int]=Array(0,1,2,3,6,7)//預設升序
  4. scala> rdd1.sortBy(x => x,false).collect
  5. res2:Array[Int]=Array(7,6,3,2,1,0)//降序
  6. //RDD[K,V]型別
  7. scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
  8. scala> rdd1.sortBy(x => x).collect
  9. res3:Array[(String,Int)]=Array((A,1),(A,2),(B,3),(B,6),(B,7))
  10. //按照V進行降序排序
  11. scala> rdd1.sortBy(x => x._2,false).collect
  12. res4:Array[(String,Int)]=Array((B,7),(B,6),(B,3),(A,2),(A,1))

更多關於Spark運算元的介紹,可參考spark運算元系列文章:

http://blog.csdn.net/ljp812184246/article/details/53895299