1. 程式人生 > >Spark學習筆記3:鍵值對操作

Spark學習筆記3:鍵值對操作

對象 常用 ava java 參數 通過 頁面 ascend 處理過程

鍵值對RDD通常用來進行聚合計算,Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為pair RDD。pair RDD提供了並行操作各個鍵或跨節點重新進行數據分組的操作接口。

Spark中創建pair RDD的方法:存儲鍵值對的數據格會在讀取時直接返回由其鍵值對數據組成的pair RDD,還可以使用map()函數將一個普通的RDD轉為pair RDD。

  • Pair RDD的轉化操作
  1. reduceByKey() 與reduce類似 ,接收一個函數,並使用該函數對值進行合並,為每個數據集中的每個鍵進行並行的歸約操作。返回一個由各鍵和對應鍵歸約出來的結果值組成的新的RDD。例如 :上一章中單詞計數的例子:val counts = words.map(word => (word,1)).reduceByKey{ case (x,y) => x + y}
  2. foldByKey()與fold()類似,都使用一個與RDD和合並函數中的數據類型相同的零值最為初始值。val counts = words.map(word => (word,1)).foldByKey{ case (x,y) => x + y}
  3. combineByKey()是最為常用的基於鍵進行聚合的函數,可以返回與輸入類型不同的返回值。

  理解combineByKey處理數據流程,首先需要知道combineByKey的createCombiner()函數用來創建那個鍵對應的累加器的初始值,mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合並。mergeCombiners()方法將各個分區的結果進行合並。

使用combineByKey進行單詞計數的例子:

import org.apache.spark.{SparkConf, SparkContext}

object word {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("word")
val sc = new SparkContext(conf)
val input = sc.parallelize(List(("coffee",1),("coffee",2),("panda",3),("coffee",9)))
val counts = input.combineByKey(
(v) => (v,1),
(acc:(Int,Int) ,v) => (acc._1 + v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1 + acc2._1,acc1._2 + acc2._2)
)
counts.foreach(println)
}
}

輸出結果:

技術分享

這個例子中的數據流示意圖如下:

技術分享

簡單說過程就是,將輸入鍵值對數據進行分區,每個分區先根據鍵計算相應的值以及鍵出現的次數。然後對不同分區進行合並得出最後的結果。

  4.groupByKey()使用RDD中的鍵來對數據進行分組,對於一個由類型K的鍵和類型V的值組成的RDD,所得到的結果RDD類型會是[K, Iterable[V] ]

 例如:

import org.apache.spark.{SparkConf, SparkContext}

object word {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("word")
    val sc = new SparkContext(conf)
    val input = sc.parallelize(List("scala spark scala core scala python java spark scala"))
    val words = input.flatMap(line => line.split(" ")).map(word => (word,1))
    val counts = words.groupByKey()
    counts.foreach(println)
  }
}

  輸出:

技術分享

  5、cogroup函數對多個共享同一個鍵的RDD進行分組,對兩個鍵類型均為K而值類型分別為V和W的RDD進行cogroup時,得到的結果RDD類型為[(K,(Iterable[V],Iterable[W]))]

  6、join(other)這樣的連接是內連接,只有在兩個pair RDD中都存在的鍵才輸出。若一個輸入對應的鍵有多個值時,生成的pair RDD會包括來自兩個輸入RDD的每一組相對應的記錄。理解這句話看下面的例子:

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
val other = sc.parallelize(List((3,9)))
val joins = rdd.join(other)

  輸出結果:

技術分享

  7、leftOuterJoin(other)左外連接和rightOuterJoin(other)右外連接都會根據鍵連接兩個RDD,但是允許結果中存在其中的一個pair RDD所缺失的鍵。

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
val other = sc.parallelize(List((3,9)))
val join1 = rdd.rightOuterJoin(other)

  輸出結果:

技術分享

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
val other = sc.parallelize(List((3,9)))
val join2 = rdd.leftOuterJoin(other)

  輸出結果: 

 技術分享

  8、sortByKey()函數接收一個叫做ascending的參數,表示想要讓結果升序排序還是降序排序。

val input = sc.parallelize(List("scala spark scala core scala python java spark scala"))
    val words = input.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((x,y)=>x+y)
    val counts = words.sortByKey()

  輸出結果:

技術分享

  • Pair RDD的行動操作
  1. countByKey() 對每個鍵對應的元素分別計數。
  2. collectAsMap()將結果以映射表的形式返回,註意後面的value會覆蓋前面的。
    val num = sc.parallelize(List((1,2),(3,4),(3,6)))
    println(num.collectAsMap().mkString(" "))
    

    輸出結果:技術分享

  3. lookup(key)返回給定鍵對應的所有值。
  • 數據分區

  Spark程序可以通過控制RDD分區方式來減少通信開銷。

運行下面這段代碼,用來查看用戶查閱了自己訂閱的主題的頁面的數量,結果返回3:

val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題
val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問情況
val userData = sc.parallelize(list1)
val events = sc.parallelize(list2)
userData.persist()
val joined = userData.join(events)
val results = joined.filter({
case (id, (info, link)) =>
info.contains(link)
}
).count()
println(results)

  上面這段代碼中,用到了join操作,會將兩個數據集中的所有鍵的哈希值都求出來,將該哈希值相同的記錄通過網絡傳到同一臺機器上,然後在那臺機器上對所有鍵相同的記錄進行連接操作。

  假如userdata表很大很大,而且幾乎是不怎麽變化的,那麽每次都對userdata表進行哈希值計算和跨節點的數據混洗,就會產生很多的額外開銷。

如下:

技術分享

解決這一產生額外開銷的方法就是,對userdata表使用partitionBy()轉化操作,將這張表轉為哈希分區。修改後的代碼如下:

    val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題
    val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問情況
    val userData = sc.parallelize(list1)
    val events = sc.parallelize(list2)
    userData.partitionBy(new DomainNamePartitioner(10)).persist()
    val joined = userData.join(events)
    val results = joined.filter({
      case (id, (info, link)) =>
        info.contains(link)
    }
    ).count()
    println(results)

  構建userData時調用了partitionBy(),在調用join()時,Spark只會對events進行數據混洗操作,將events中特定UserID的記錄發送到userData的對應分區所在的那臺機器上。這樣,通過網絡傳輸的數據就大大減少,程序運行速度也可以顯著提升。partitionBy()是一個轉化操作,因此它的返回值是一個新的RDD。

  新的數據處理過程如下:

技術分享

  scala可以使用RDD的partitioner屬性來獲取RDD的分區方式,它會返回一個scala.Option對象。

  可以從數據分區中獲益的操作有cogroup() , groupWith() , join() , leftOuterJoin() , rightOuterJoin() , groupByKey() , reduceByKey() , combineByKey()以及lookup()。

  實現自定義分區器,需要繼承org.apache.spark.Partitioner類並實現下面的三個方法:

  • numPartitions: Int :返回創建出來的分區數
  • getPartition(key: Any):Int : 返回給定鍵的分區編號(0 到 numPartitions - 1)
  • equals() : Java判斷相等的方法,Spark用這個方法來檢查分區器對象是否和其他分區器實例相同,這樣Spark才可以判斷兩個RDD的分區方式是否相同。

Spark學習筆記3:鍵值對操作