Spark 的鍵值對(pair RDD)操作,Scala實現
一:什麼是Pair RDD?
Spark為包含鍵值對對型別的RDD提供了一些專有操作,這些操作就被稱為Pair RDD,Pair RDD是很多程式的構成要素,因為它們提供了並行操作對各個鍵或跨節點重新進行資料分組的操作介面。
二:Pair RDD的操作例項
1:建立Pair RDD
在saprk中有很多種建立pairRDD的方式,很多儲存鍵值對的資料格式會在讀取時直接返回由其鍵值對資料組成的pair RDD,此外需要把一個普通的RDD轉化為pair RDD時,可以呼叫map函式來實現,傳遞的函式需要返回鍵值對。
scala> var lines = sc.parallelize(List("i love you")) lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27 scala> val pairs = lines.map(x=>(x,1)) pairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:29 scala> pairs.foreach(println) (i love you,1)
2:Pai RDDr的轉化操作
由於pair RDD中包含二元組,所以需要傳遞函式應當操作二元組而不是獨立的元素,假設鍵值對集合為{(1,2),(3,4),(3,6)}
rdd.reduceByKey(func):合併具有相同key的value值
rdd.groupByKey():對具有相同鍵的進行分組 [資料分組]scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6))) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:27 scala> val result = rdd.reduceByKey((x,y)=>x+y) result: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[14] at reduceByKey at <console>:29 scala> result.foreach(println) (1,2) (3,10)
scala> val result = rdd.groupByKey()
result: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[1] at groupByKey at <console>:29
scala> result.foreach(println)
(3,CompactBuffer(4, 6))
(1,CompactBuffer(2))
rdd.mapValues(func):對pairRDD中的每個值應用func 鍵不改變rdd.flatMapValues(func):類似於mapValues,返回的是迭代器函式scala> val result = rdd.mapValues(x=>x+1) result: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapValues at <console>:29 scala> result.foreach(println) (1,3) (3,5) (3,7)
scala> val result = rdd.flatMapValues(x=>(x to 5))
result: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at flatMapValues at <console>:29
scala> result.foreach(println)
(3,4)
(3,5)
(1,2)
(1,3)
(1,4)
(1,5)
rdd.keys:返回一個僅包含鍵的RDDscala> val result = rdd.keys
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at keys at <console>:29
scala> result.foreach(println)
3
1
3
rdd.values:返回一個僅包含value的RDDscala> val result = rdd.values
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at values at <console>:29
scala> result.foreach(println)
2
4
6
rdd.sortByKey():返回一個根據鍵排序的RDD
資料排序,可以通過接受ascending的引數表示我們是否想要結果按升序排序(預設是true)
scala> val result = rdd.sortByKey().collect()
result: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))
scala> result
res8: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))
<pre name="code" class="java">scala> val result = rdd.sortByKey(ascending=false).collect()
result: Array[(Int, Int)] = Array((3,4), (3,6), (1,2))
3:針對兩個pair RDD 的轉化操作
函式名 | 目的 | 示例 | 結果 |
substractByKey | 刪掉RDD中鍵與other RDD中的鍵相同的元素 | rdd.subtractByKey(other) | {(1,2)} |
join | 對兩個RDD進行內連線 | rdd.join(other) | {(3,(4,9)),(3,(6,9))} |
rightOuterJoin | 對兩個RDD進行連線操作,右外連線 | rdd.rightOuterJoin(other) | {(3,(4,9)),(3,(6,9))} |
leftOuterJoin | 對兩個RDD進行連線操作,左外連線 | rdd.rightOuterJoin(other) | {(1,(2,None)),(3,(4,9)),(3,(6,9))} |
cogroup | 將兩個RDD中擁有相同鍵的資料分組 | rdd.cogroup(other) | {1,([2],[]),(3,[4,6],[9])} |
假設:rdd={(1,2),(3,4),(3,6)} other={(3,9)}
rdd.subtractByKey( other ):刪除掉RDD中與other RDD中鍵相同的元素
scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:27
scala> val other = sc.parallelize(List((3,9)))
other: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:27
scala> val result = rdd.subtractByKey(other)
result: org.apache.spark.rdd.RDD[(Int, Int)] = SubtractedRDD[6] at subtractByKey at <console>:31
scala> result.foreach(println)
(1,2)
rdd.join( other ):對兩個RDD進行內連線scala> val result = rdd.join(other)
result: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:31
scala> result.foreach(println)
(3,(4,9))
(3,(6,9))
rdd.rightOuterJoin(other):對兩個RDD進行連線操作,確保第二個RDD的鍵必須存在(右外連線)scala> val result = rdd.rightOuterJoin(other)
result: org.apache.spark.rdd.RDD[(Int, (Option[Int], Int))] = MapPartitionsRDD[15] at rightOuterJoin at <console>:31
scala> result.foreach(println)
(3,(Some(4),9))
(3,(Some(6),9))
rdd.leftOuterJoin(other):對兩個RDD進行連線操作,確保第一個RDD的鍵必須存在(左外連線)scala> val result = rdd.leftOuterJoin(other)
result: org.apache.spark.rdd.RDD[(Int, (Int, Option[Int]))] = MapPartitionsRDD[18] at leftOuterJoin at <console>:31
scala> result.foreach(println)
(3,(4,Some(9)))
(3,(6,Some(9)))
(1,(2,None))
rdd.cogroup(other),將有兩個rdd中擁有相同鍵的資料分組scala> val result = rdd.cogroup(other)
result: org.apache.spark.rdd.RDD[(Int, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[20] at cogroup at <console>:31
scala> result.foreach(println)
(1,(CompactBuffer(2),CompactBuffer()))
(3,(CompactBuffer(4, 6),CompactBuffer(9)))
4:過濾操作
這裡假設rdd={(1,2),(3,4),(3,6)}
對value做控制,key不加限制條件
scala> val result = rdd.filter{case(x,y)=>y%3==0}
result: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[22] at filter at <console>:29
scala> result.foreach(println)
(3,6)
scala> val result = rdd.filter{case(x,y)=>y<=4}
result: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[23] at filter at <console>:29
scala> result.foreach(println)
(1,2)
(3,4)
scala>
對key做控制,value不控制
scala> val result = rdd.filter{case(x,y)=>x<3}
result: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[24] at filter at <console>:29
scala> result.foreach(println)
(1,2)
5:聚合操作
使用reduceByKey()和mapValues()計算每個鍵對應的平均值
scala> val rdd = sc.parallelize(List(Tuple2("panda",0),Tuple2("pink",3),Tuple2("pirate",3),Tuple2("panda",1),Tuple2("pink",4)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[45] at parallelize at <console>:27
scala> val result = rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
result: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[47] at reduceByKey at <console>:29
scala> result.foreach(println)
(pirate,(3,1))
(panda,(1,2))
(pink,(7,2))
實現經典的分散式單詞計數問題(使用flatMap() 來生成以單詞為鍵,以數字1為值的pair RDD)scala> val rdd = sc.parallelize(List("i am thinkgamer, i love cyan"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[49] at parallelize at <console>:27
scala> val words = rdd.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[50] at flatMap at <console>:29
scala> val result = words.map(x=>(x,1)).reduceByKey((x,y) => x+y)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[52] at reduceByKey at <console>:31
scala> result.foreach(println)
(cyan,1)
(love,1)
(thinkgamer,,1)
(am,1)
(i,2)
實現經典的分散式單詞計數問題(使用countByValue更快的實現單詞計數)scala> val rdd = sc.parallelize(List("i am thinkgamer, i love cyan"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val result = rdd.flatMap(x=>x.split(" ")).countByValue()
result: scala.collection.Map[String,Long] = Map(am -> 1, thinkgamer, -> 1, i -> 2, love -> 1, cyan -> 1)
scala> result.foreach(println)
(am,1)
(thinkgamer,,1)
(i,2)
(love,1)
(cyan,1)
scala>
並行度調優
每個RDD都有固定數目的分割槽,分割槽數決定了在RDD 上執行操作時的並行度,在執行聚合或分組函式時,可以要求Spark使用給定的分割槽,Spark始終嘗試根據叢集的大小,推斷出一個有意義的預設值,但是有時候你可能要對並行度進行調優來獲取更好的效能發展。
scala中自定義reduceByKey()的並行度
val data = Seq(("a",3),("b",4),("c",5))
sc.parallelize(data).reduceByKey((x,y)=>x+y) //預設並行度
sc.parallelize(data).reduceByKey((x,y)=>x+y,10) //自定義並行度
6:pair RDD的行動操作
和轉化操作一樣,所有基礎RDD支援的傳統行動操作也都在pair RDD上可用,pair RDD提供了一些額外的行動操作,可以讓我們充分利用資料的鍵值對特性,如下
以鍵值對集合{(1,2),(3,4),(3,6)}為例
函式名 | 描述 | 示例 | 結果 |
countByKey | 對每個鍵對應的元素分別計數 | rdd.countByKey(other) | {(1,1),(3,2)} |
collectAsMap() | 將結果以對映表的形式返回,以便查詢 | rdd.collectAsMap() | Map{(1,2),(3,4),(3,6)} |
lookup(key) | 返回給定鍵對應的所有值 | rdd.lookup(3) | [4,6] |
7:獲取RDD的分割槽方式
scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:27
scala> pairs.partitioner
res4: Option[org.apache.spark.Partitioner] = None
scala> val partitioned = pairs.partitionBy(new org.apache.spark.HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at <console>:29
scala> partitioned.partitioner
res5: Option[org.apache.spark.Partitioner] = Some([email protected])