1. 程式人生 > >Spark常用函式講解之Action操作+例項

Spark常用函式講解之Action操作+例項

RDD:彈性分散式資料集,是一種特殊集合 ‚ 支援多種來源 ‚ 有容錯機制 ‚ 可以被快取 ‚ 支援並行操作,一個RDD代表一個分割槽裡的資料集
RDD有兩種操作運算元:

        Transformation(轉換):Transformation屬於延遲計算,當一個RDD轉換成另一個RDD時並沒有立即進行轉換,僅僅是記住       了資料集的邏輯操作
         Ation(執行):觸發Spark作業的執行,真正觸發轉換運算元的計算


 
本系列主要講解Spark中常用的函式操作:
         1.RDD基本轉換
         2.鍵-值RDD轉換
         3.Action操作篇

本發所講函式

1.reduce

2.collect

3.count

4.first

5.take

6.top

7.takeOrdered

8.countByKey

9.collectAsMap

10.lookup

11.aggregate

12.fold

13.saveAsFile

14.saveAsSequenceFile

 

1.reduce(func):通過函式func先聚集各分割槽的資料集,再聚集分割槽之間的資料,func接收兩個引數,返回一個新值,新值再做為引數繼續傳遞給函式func,直到最後一個元素

 

2.collect():以資料的形式返回資料集中的所有元素給Driver程式,為防止Driver程式記憶體溢位,一般要控制返回的資料集大小

 

3.count():返回資料集元素個數

 

4.first():返回資料集的第一個元素

 

5.take(n):以陣列的形式返回資料集上的前n個元素

 

6.top(n):按預設或者指定的排序規則返回前n個元素,預設按降序輸出

 

7.takeOrdered(n,[ordering]): 按自然順序或者指定的排序規則返回前n個元素

例1:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local").setAppName("reduce")

    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 10,2)

    val reduceRDD = rdd.reduce(_ + _)

    val reduceRDD1 = rdd.reduce(_ - _) //如果分割槽資料為1結果為 -53

    val countRDD = rdd.count()

    val firstRDD = rdd.first()

    val takeRDD = rdd.take(5)    //輸出前個元素

    val topRDD = rdd.top(3)      //從高到底輸出前三個元素

    val takeOrderedRDD = rdd.takeOrdered(3)    //按自然順序從底到高輸出前三個元素

 

    println("func +: "+reduceRDD)

    println("func -: "+reduceRDD1)

    println("count: "+countRDD)

    println("first: "+firstRDD)

    println("take:")

    takeRDD.foreach(x => print(x +" "))

    println("\ntop:")

    topRDD.foreach(x => print(x +" "))

    println("\ntakeOrdered:")

    takeOrderedRDD.foreach(x => print(x +" "))

    sc.stop

  }

輸出:

複製程式碼

func +: 55
func -: 15 //如果分割槽資料為1結果為 -53
count: 10
first: 1
take:
1 2 3 4 5
top:
10 9 8
takeOrdered:
1 2 3

複製程式碼

 (RDD依賴圖:紅色塊表示一個RDD區,黑色塊表示該分割槽集合,下同)

 

         (RDD依賴圖)

 

8.countByKey():作用於K-V型別的RDD上,統計每個key的個數,返回(K,K的個數)

 

9.collectAsMap():作用於K-V型別的RDD上,作用與collect不同的是collectAsMap函式不包含重複的key,對於重複的key。後面的元素覆蓋前面的元素

 

10.lookup(k):作用於K-V型別的RDD上,返回指定K的所有V值

例2:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

def main(args: Array[String]) {

   val conf = new SparkConf().setMaster("local").setAppName("KVFunc")

   val sc = new SparkContext(conf)

   val arr = List(("A"1), ("B"2), ("A"2), ("B"3))

   val rdd = sc.parallelize(arr,2)

   val countByKeyRDD = rdd.countByKey()

   val collectAsMapRDD = rdd.collectAsMap()

 

   println("countByKey:")

   countByKeyRDD.foreach(print)

 

   println("\ncollectAsMap:")

   collectAsMapRDD.foreach(print)

   sc.stop

 }

輸出:

countByKey:
(B,2)(A,2)
collectAsMap:
(A,2)(B,3)

 

        (RDD依賴圖)

 

11.aggregate(zeroValue:U)(seqOp:(U,T) => U,comOp(U,U) => U):

seqOp函式將每個分割槽的資料聚合成型別為U的值,comOp函式將各分割槽的U型別資料聚合起來得到型別為U的值

1

2

3

4

5

6

7

8

def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local").setAppName("Fold")

    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(1,2,3,4),2)

    val aggregateRDD = rdd.aggregate(2)(_+_,_ * _)

    println(aggregateRDD)

    sc.stop

  }

輸出:

90

步驟1:分割槽1:zeroValue+1+2=5   分割槽2:zeroValue+3+4=9

 

步驟2:zeroValue*分割槽1的結果*分割槽2的結果=90

 

            (RDD依賴圖)

 

12.fold(zeroValue:T)(op:(T,T) => T):通過op函式聚合各分割槽中的元素及合併各分割槽的元素,op函式需要兩個引數,在開始時第一個傳入的引數為zeroValue,T為RDD資料集的資料型別,,其作用相當於SeqOp和comOp函式都相同的aggregate函式

例3

1

2

3

4

5

6

7

8

def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local").setAppName("Fold")

    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(Array(("a"1), ("b"2), ("a"2), ("c"5), ("a"3)), 2)

    val foldRDD = rdd.fold(("d"0))((val1, val2) => { if (val1._2 >= val2._2) val1 else val2

    })

    println(foldRDD)

  }

輸出:

1

c,5

其過程如下:

1.開始時將(“d”,0)作為op函式的第一個引數傳入,將Array中和第一個元素("a",1)作為op函式的第二個引數傳入,並比較value的值,返回value值較大的元素

 

2.將上一步返回的元素又作為op函式的第一個引數傳入,Array的下一個元素作為op函式的第二個引數傳入,比較大小

 

3.重複第2步驟

 

每個分割槽的資料集都會經過以上三步後匯聚後再重複以上三步得出最大值的那個元素,對於其他op函式也類似,只不過函式裡的處理資料的方式不同而已

 

             (RDD依賴圖)

 

13.saveAsFile(path:String):將最終的結果資料儲存到指定的HDFS目錄中

 

14.saveAsSequenceFile(path:String):將最終的結果資料以sequence的格式儲存到指定的HDFS目錄中

 

例子原始碼地址:https://github.com/Mobin-F/SparkExample/tree/master/src/main/scala/com/mobin/SparkRDDFun/TransFormation/Action