1. 程式人生 > >我是60歲程式設計師

我是60歲程式設計師



import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author zoujc
  * @date 2018/10/31
  */
object LearnRDD {
   def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setAppName("SparkRDDTest").setMaster("local[2]")
      val sc = new SparkContext(sparkConf)

      //通過並行化,生成RDD
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)) //對rdd1裡面的每個元素*2後排序,true表示正序,false表示倒序 val res1 = rdd1.map(_ * 2).sortBy(x => x, true) //過濾出大於等於10的元素 val res2 = res1.filter(_ >= 10) val rdd2 = sc.parallelize(Array("a,b,c","d,e,f","g,h,i")) //對rdd2裡的每個元素先切分再壓平
val res3 = rdd2.flatMap(_.split(",")) val rdd3 = sc.parallelize(List(List("a b c","a b b"),List("e f g","a f g"),List("h i g","a a b"))) //將rdd3的每個元素先切分在壓平 val res4 = rdd3.flatMap(_.flatMap(_.split(" "))) val rdd4 = sc.parallelize(List(5,6,4,3)) val rdd5 = sc.parallelize
(List(1,2,3,4)) //求並集 val res5 = rdd4.union(rdd5) //求交集 val res6 = rdd4.intersection(rdd5) //去重 val res7 = rdd4.union(rdd5).distinct() val rdd6 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2))) val rdd7 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2))) //求join val res8 = rdd6.join(rdd7) //求左連線、右連線 val res9 = rdd6.leftOuterJoin(rdd7) val res10 = rdd6.rightOuterJoin(rdd7) //求並集,可以不用.union,也很強大 val res11 = rdd6 union rdd7 //按key分組 val res12 = res11.groupByKey() //分別用groupByKey和reduceByKey實現單詞計數,注意groupByKey與reduceByKey的區別 //groupByKey val res13 = res11.groupByKey().mapValues(_.sum) /** * groupByKey:groupByKey會對每一個RDD中的value值進行聚合形成一個序列(Iterator) * 此操作發生在reduce端,所以勢必所有的資料將會通過網路傳輸,造成不必要的浪費 * 同時如果資料量十分大,可能還會造成OutOfMemoryError */ //reduceByKey,先進行區域性聚合,在進行全域性聚合 val res14 = res11.reduceByKey(_ + _) /** * reduceByKey:reduceByKey會在結果傳送至reduce之前會對每個mapper在本地進行merge, * 有點類似於MapReduce中的combiner。這樣做的好處是在map端進行一次reduce之後, * 資料量會大幅度減小,從而減小傳輸,保證reduce端能夠更快的進行結果計算。 */ val rdd8 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2))) val rdd9 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) //cogroup ,注意cgroup與groupByKey區別 val res15 = rdd8.cogroup(rdd9) /** * cogroup是將輸入的資料集(k,v)和另外的資料集(k,w)進行cogroup, * 得到的資料集是(k,Seq(v),Seq(w))的資料集 */ val rdd10 = sc.parallelize(List(1, 2, 3, 4, 5)) //reduce聚合 reduce是action類運算元 val res16 = rdd10.reduce(_ + _) val rdd11 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))) val rdd12 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))) val rdd13 = rdd11.union(rdd12) //按key進行聚合 val res17 = rdd13.reduceByKey(_ + _) //按value的降序排序 val res18 = rdd13.reduceByKey(_ + _).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)) /** * 笛卡爾積: 笛卡爾乘積是指在數學中,兩個集合X和Y的笛卡尓積(Cartesian product),又稱直積, * 表示為X×Y,第一個物件是X的成員而第二個物件是Y的所有可能有序對的其中一個成員[3] 。 * 假設集合A={a, b},集合B={0, 1, 2},則兩個集合的笛卡爾積為{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。 */ val res19 = rdd11.cartesian(rdd12) //要通過action型別的運算元才能顯示出結果,將結果放到可變陣列中,就可以看到輸出結果, //如果不加toBuffer,則打印出來的是一個引用。 // println(res1.collect().toBuffer) // println(res2.collect().toBuffer) // ArrayBuffer(10, 12, 14, 16, 18, 20) // println(res3.collect().toBuffer) // println(res4.collect().toBuffer) // println(res5.collect().toBuffer) // println(res6.collect().toBuffer) // println(res7.collect().toBuffer) // ArrayBuffer(4, 1, 5, 6, 2, 3) // println(res8.collect().toBuffer) // println(res9.collect().toBuffer) // println(res10.collect().toBuffer) // res8.foreach(println) // res9.foreach(println) // res10.foreach(println) // println(res11.collect().toBuffer) // println(res12.collect().toBuffer) // println(res13.collect().toBuffer) // println(res14.collect().toBuffer) // println(res15.collect().toBuffer) // println(res16) // println(res17.collect().toBuffer) // println(res18.collect().toBuffer) // println(res19.collect().toBuffer) } }