1. 程式人生 > >RDD程式設計-行動運算元

RDD程式設計-行動運算元

2.4 Action

2.4.1 reduce(func)案例

  1. 作用:通過func函式聚集RDD中的所有元素,先聚合分割槽內資料,再聚合分割槽間資料。
  2. 需求:建立一個RDD,將所有元素聚合得到結果

(1)建立一個RDD[Int]

scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24

(2)聚合RDD[Int]所有元素

scala> rdd1.reduce(_+_)
res50: Int = 55

(3)建立一個RDD[String]

scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24

(4)聚合RDD[String]所有資料

scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
res51: (String, Int) = (adca,12)

2.4.2 collect()案例

  1. 作用:在驅動程式中,以陣列的形式返回資料集的所有元素。
  2. 需求:建立一個RDD,並將RDD內容收集到Driver端列印

(1)建立一個RDD

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

(2)將結果收集到Driver端

scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)  

2.4.3 count()案例

  1. 作用:返回RDD中元素的個數
  2. 需求:建立一個RDD,統計該RDD的條數

(1)建立一個RDD

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

(2)統計該RDD的條數

scala> rdd.count
res1: Long = 10

2.4.4 first()案例

  1. 作用:返回RDD中的第一個元素
  2. 需求:建立一個RDD,返回該RDD中的第一個元素

(1)建立一個RDD

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

(2)統計該RDD的條數

scala> rdd.first
res2: Int = 1

2.4.5 take(n)案例

  1. 作用:返回一個由RDD的前n個元素組成的陣列
  2. 需求:建立一個RDD,統計該RDD的條數

(1)建立一個RDD

scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

(2)統計該RDD的條數

scala> rdd.take(3)
res10: Array[Int] = Array(2, 5, 4)

2.4.6 takeOrdered(n)案例

  1. 作用:返回該RDD排序後的前n個元素組成的陣列
  2. 需求:建立一個RDD,統計該RDD的條數

(1)建立一個RDD

scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

(2)統計該RDD的條數

scala> rdd.takeOrdered(3)
res18: Array[Int] = Array(2, 3, 4)

2.4.7 aggregate案例

  1. 引數:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
  2. 作用:aggregate函式將每個分割槽裡面的元素通過seqOp和初始值進行聚合,然後用combine函式將每個分割槽的結果和初始值(zeroValue)進行combine操作。這個函式最終返回的型別不需要和RDD中元素型別一致。
  3. 需求:建立一個RDD,將所有元素相加得到結果

(1)建立一個RDD

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

(2)將該RDD所有元素相加得到結果

scala> rdd.aggregate(0)(_+_,_+_)
res22: Int = 55

2.4.8 fold(num)(func)案例

  1. 作用:摺疊操作,aggregate的簡化操作,seqop和combop一樣。
  2. 需求:建立一個RDD,將所有元素相加得到結果

(1)建立一個RDD

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

(2)將該RDD所有元素相加得到結果

scala> rdd.fold(0)(_+_)
res24: Int = 55

2.4.9 saveAsTextFile(path)

作用:將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對於每個元素,Spark將會呼叫toString方法,將它轉換為檔案中的文字

2.4.10 saveAsSequenceFile(path)

作用:將資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。

2.4.11 saveAsObjectFile(path)

作用:用於將RDD中的元素序列化成物件,儲存到檔案中。

2.4.12 countByKey()案例

  1. 作用:針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
  2. 需求:建立一個PairRDD,統計每種key的個數

(1)建立一個PairRDD

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24

(2)統計每種key的個數

scala> rdd.countByKey
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

2.4.13 foreach(func)案例

  1. 作用:在資料集的每一個元素上,執行函式func進行更新。
  2. 需求:建立一個RDD,對每個元素進行列印

(1)建立一個RDD

scala> var rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24

(2)對該RDD每個元素進行列印

scala> rdd.foreach(println(_))
3
4
5
1
2