1. 程式人生 > >RDDs基本操作、RDDs特性、KeyValue對RDDs

RDDs基本操作、RDDs特性、KeyValue對RDDs

clas count() cas 類型 cti 遍歷 strong 節點 分布式

摘要:RDD是Spark中極為重要的數據抽象,這裏總結RDD的概念,基本操作Transformation(轉換)與Action,RDDs的特性,KeyValue對RDDs的Transformation(轉換)。

1.RDDs是什麽

Resilient distributed datasets(彈性分布式數據集) 。RDDs並行的分布在整個集群中,是Spark分發數據和計算的基礎抽象類,一個RDD是一個不可改變的分布式集合對象,Spark中,所有的計算都是通過RDDs的創建,轉換操作完成的,一個RDD內部由許多partitions(分片)組成。

分片:每個分片包括一部分數據,partitions可在集群不同節點上計算;分片是Spark並行處理的單元,Spark順序的,並行的處理分片。

2.RDDs的創建

(1) 把一個存在的集合傳給SparkContext的parallelize()方法,測試用
  val rdd=sc.parallelize(Array(1,2,3,4),4)
  第一個參數:待並行化處理的集合;第二個參數:分片個數

(2) 加載外部數據集
  val rddText=sc.textFile("hellospark.txt")

3.RDD基本操作之Transformation(轉換)

從之前的RDD構建一個新的RDD,像map()和filter()

(1)逐元素Transformation
map(): 接收函數,把函數應用到RDD的每一個元素,返回新RDD。

  val lines=sc.parallelize(Array("hello","spark","hello","world","!"))
  val lines2=lines.map(word=>(word,1))
  lines2.foreach(println)
  //結果:
  (hello,1)
  (spark,1)
  (hello,1)
  (world,1)
  (!,1)

filter(): 接收函數,返回只包含滿足filter()函數的元素的新RDD。

  val lines=sc.parallelize(Array("hello","spark","hello","world","!"))
  val lines3
=lines.filter(word=>word.contains("hello"))   lines3.foreach(println)   //結果:      hello   hello

flatMap(): 對每個輸入元素,輸出多個輸出元素。flat壓扁的意思,將RDD中元素壓扁後返回一個新的RDD。

    val inputs=sc.textFile("/home/lucy/hellospark.txt")
    val lines=inputs.flatMap(line=>line.split(" "))
    lines.foreach(println)
    //結果
    hello
    spark
    hello
    world
    hello
    !
    //文件內容/home/lucy/hellospark.txt 
    hello spark
    hello world
    hello !

(2)集合運算

RDDs支持數學集合的計算,例如並集,交集計算

    val rdd1=sc.parallelize(Array("red","red","blue","black","white"))
    val rdd2=sc.parallelize(Array("red","grey","yellow"))

    //去重:
    val rdd_distinct=rdd1.distinct()
    //去重結果:
    white
    blue
    red
    black

    //並集:
    val rdd_union=rdd1.union(rdd2)
    //並集結果:
    red
    blue
    black
    white
    red
    grey
    yellow

    //交集:
    val rdd_inter=rdd1.intersection(rdd2)
    //交集結果:
    red

    //包含:
    val rdd_sub=rdd1.subtract(rdd2)
    //包含結果:
    blue
    white
    black

4.RDD基本操作之Action

在RDD上計算出來一個結果。把結果返回給driver program或保存在文件系統,count(),save。

函數名              功能              例子    結果
collect()            返回RDD的所有元素           rdd.collect()          {1,2,3,3}
count()           計數                  rdd.count()       4
countByValue()        返回一個map表示唯一元素出現的個數    rdd.countByValue()   {(1,1),(2,1),(3,2)}
take(num)        返回幾個元素               rdd.take(2)       {1,2}
top(num)        返回前幾個元素            rdd.top(2)       {3,3}
takeOrdered 返回基於提供的排序算法的前幾個元素 rdd.takeOrdered(2)(myOrdering) {3,3}
(num)(ordering)
takeSample 取樣例               rdd.takeSample(false,1)    不確定
(withReplacement,num,[seed])                           
reduce(func)       合並RDD中元素            rdd.reduce((x,y)=>x+y)      9
fold(zero)(func)    與reduce()相似提供zero value      rdd.fold(0)((x,y)=>x+y)   9
foreach(func)    對RDD的每個元素作用函數,什麽也不返回 rdd.foreach(func)       無

5.RDDs的特性

1.血統關系圖:
Spark維護著RDDs之間的依賴關系和創建關系,叫做血統關系圖,Spark使用血統關系圖來計算每個RDD的需求和恢復丟失的數據
2.延遲計算
Spark對RDDs的計算是,他們第一次使用action操作的時候。Spark內部記錄metadata表明transformations操作已經被響應了。加載數據也是延遲計算,數據只有 在必要的時候,才會被加載進去。
3.RDD.persist():
默認每次在RDDs上面進行action操作時,Spark都重新計算RDDs。如果想重復利用一個RDD,可以使用RDD.persist()。upersist()方法從緩存中移除。

技術分享

6.KeyValue對RDDs

創建KeyValue對RDDs:

val rdd3=sc.parallelize(Array((1,2),(3,4),(3,6)))

KeyValue對RDDs的Transformation(轉換):

(1)reduceByKey(func) 把相同key的結合

  val rdd4=rdd3.reduceByKey((x,y)=>x+y)
  //結果
  (1,2)
  (3,10)

(2)groupByKey 把相同的key的values分組

  val rdd5=rdd3.groupByKey()
  //結果
  (1,CompactBuffer(2))
  (3,CompactBuffer(4, 6))

(3)mapValues() 函數作用於pairRDD的每個元素,key不變

  val rdd6=rdd3.mapValues(x=>x+1)
  //結果
  (1,3)
  (3,5)
  (3,7)

(4)keys/values
  rdd3.keys.foreach(println)
  1
  3
  3

  rdd3.values.foreach(println)
  2
  4
  6

(5)sortByKey
  val rdd7=rdd3.sortByKey()
  //結果
  (1,2)
  (3,4)
  (3,6)

(6)combineByKey(): (createCombiner,mergeValue,mergeCombiners,partitioner)

  最常用的基於key的聚合函數,返回的類型可以與輸入類型不一樣。許多基於key的聚合函數都用到了它,像groupByKey()

  原理:遍歷partition中的元素,元素的key,要麽之前見過的,要麽不是。如果是新元素,使用我們提供的createCombiner()函數,如果是這個partition中已經存在的key,就會使用mergeValue()函數,合計每個partition的結果的時候,使用mergeCombiner()函數

  例子:求平均值

  val score=sc.parallelize(Array(("Tom",80.0),("Tom",90.0),("Tom",85.0),("Ben",85.0),("Ben",92.0),("Ben",90.0)))
  val score2=score.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))
  //結果
  (Ben,(3,267.0))
  (Tom,(3,255.0))
val average
=score2.map{case(name,(num,score))=>(name,score/num)}   //結果   (Ben,89.0)   (Tom,85.0)

RDDs基本操作、RDDs特性、KeyValue對RDDs