
Spark Advanced Operators (Part 1)

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author zoujc
  * @date 2018/11/1
  */
object SparkRDDTest1 {
   def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("SparkRDDTest1").setMaster("local[2]")
      val sc = new SparkContext(conf)

      // Create an RDD with two partitions
      val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7), 2)

      // Define a function that tags each value with the ID of the partition it lives in
      val func1 = (index: Int, iter: Iterator[Int]) => {
         iter.toList.map(x => s"[partID: $index, val: $x]").iterator
      }

      // Inspect the contents of each partition
      val res1 = rdd1.mapPartitionsWithIndex(func1)
      // println(res1.collect().toBuffer)
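      // Added sketch (not in the original post): glom() is another way to look at the partition
      // layout - it collects each partition into an Array, so the grouping is visible directly.
      val partsView = rdd1.glom().collect()
      // println(partsView.map(_.mkString("[", ",", "]")).mkString(" "))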
      // Use aggregate with a zero value: sum each partition locally first, then sum the partial results globally
      val res2 = rdd1.aggregate(0)(_ + _, _ + _)
      // println(res2)

      // Find the maximum of each partition, then add the maxima together
      val res3 = rdd1.aggregate(0)(math.max(_, _), _ + _)

      // With a zero value of 10 and two partitions, the 10 is applied three times
      // (once per partition and once more in the global combine)
      val res4 = rdd1.aggregate(10)(_ + _, _ + _)
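      // Worked trace added for illustration, assuming parallelize places 1,2,3 in partition 0
      // and 4,5,6,7 in partition 1:
      //   partition 0: 10 + 1 + 2 + 3     = 16
      //   partition 1: 10 + 4 + 5 + 6 + 7 = 32
      //   global:      10 + 16 + 32       = 58
      // println(res4) // 58 under that partitioning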
val rdd2 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")) val (biggerthane, lessthane) = rdd2.aggregate((0, 0))( (ee, str) => { var biggere = ee._1 var lesse = ee._2 if (str.compareTo("e") >= 0) biggere = ee._1 + 1 else if (str.compareTo("e") < 0) lesse = ee._2 + 1 (biggere, lesse) }, (x, y) => (x._1 + y._1, x._2 + y._2) ) // println((biggerthane,lessthane)) //aggregate與aggregateByKey區別:前者針對序列操作,後者針對k,v對操作 //原型 // def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V)) => // combOp: (U,U): RDD[(K,U)] = self.withScope{ // aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) // } //combineByKey // def combineByKey[C]( // createCombiner: V => C, // mergeValue: (C, V) => C, // mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope { // combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) // } //從上面這段原始碼可以清晰看出,aggregateByKey呼叫的就是combineByKey方法。 // seqOp方法就是mergeValue,combOp方法則是mergeCombiners,cleanedSeqOp(createZero(), v)是createCombiner, // 也就是傳入的seqOp函式, 只不過其中一個值是傳入的zeroValue而已! //因此, 當createCombiner和mergeValue函式的操作相同, aggregateByKey更為合適! val rdd3 = sc.parallelize(List("a","b","c","d","e","f"),2) val res5 = rdd3.aggregate("|")(_ + _, _ + _) // println(res5) val rdd4 = sc.parallelize(List("12","23","345","4567"),2) //兩個分割槽,計算出字串最大長度,然後合成字串 val res6 = rdd4.aggregate("")((x,y) => math.max(x.length,y.length).toString, (x,y) => x + y) // println(res6) 24 val rdd5 = sc.parallelize(List("12", "23", "345", ""), 2) val res7 = rdd4.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y) // println(res7) 11 val rdd6 = sc.parallelize(List("12", "23", "", "345"), 2) val res8 = rdd6.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y) // println(res8) 11 //aggregateByKey可以先進行區域性操作,再進行全域性操作。 val pariRDD = sc.parallelize(List(("cat",2), ("cat",5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)),2) def func2(index: Int, iter: Iterator[(String,Int)]): Iterator[String] ={ iter.toList.map(x => s"[PartID: $index, val: $x]").iterator } println(pariRDD.mapPartitionsWithIndex(func2).collect().toBuffer) //把每種型別最大的次數取出來 val res9 = pariRDD.aggregateByKey(0)(math.max(_, _),_ + _) // println(res9.collect().toBuffer) // ArrayBuffer((dog,12), (cat,17), (mouse,6)) //不為10的變成10 val res10 = pariRDD.aggregateByKey(10)(math.max(_, _),_ + _) // println(res10.collect().toBuffer) // ArrayBuffer((dog,12), (cat,22), (mouse,20)) /** * pairRDD.aggregateByKey(0)(_ + _ , _ + _).collect與pairRDD.reduceByKey( _ + _).collect, * 這兩個方法執行結果是一樣的,實際上底層都是呼叫的同一個方法:combineByKey */ } }