1. 程式人生 > >spark 常用函式總結

spark 常用函式總結

1, textFile()  讀取外部資料來源


2, map() 對每一條資料進行相應的處理 如切分


3, reduceByKey(_+_) 傳入一個函式,將key相同的一類進行聚合計算 如相加


4, mapvalues(_+10) 傳入一個函式,類似於map方法,不過這裡只是對元組中的value進行計算


5,filter() 傳入一個函式, 使用者過濾處理資料 


6,sortBy() 傳入對哪個欄位進行排序 對資料進行排序


7,partitionBy() 傳入一個自定義的分割槽類,可進行資料的分割槽,


8,mapPartitions() 對每個分割槽中的每條資料進行處理 類似於map,不過map是針對整個資料的,而mapPartitions()是針對分割槽
假設一個rdd有10個元素,分成3個分割槽。如果使用map方法,map中的輸入函式會被呼叫10次;而使用mapPartitions方法的話,其輸入函式會只會被呼叫3次,每個分割槽呼叫1次。返回的資料需要轉換為iterator


9,reverse 將排序好的資料進行反轉


10,it.toList 將Iterator轉換為list 然後就可以使用list的sortBy()函式進行排序


11,.iterator  it.toList.sortBy(_._2._2).reverse.take(2).iterator  將資料轉換為iterator


12,aggregate()()第一個引數需要傳入一個初始值,第二個引數需要傳入兩個函式[每個函式都是2個引數(第一個引數:先對每個分割槽進行合併, 第二個:對個個分割槽合併後的結果再進行合併), 輸出一個引數]
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.aggregate(0)(_+_, _+_) 求和
rdd1.aggregate(0)(math.max(_, _), _ + _) 先取出每個分割槽的最大值,再求和
val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2.aggregate("=")(_ + _, _ + _)
結果:==def=abc


13,groupBy() 對指定欄位進行分組


14,foreachPartition()對分割槽內的每個元素進行操作,
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreachPartition(x => println(x.reduce(_ + _)))
可以用如下代替
val rdd1 = sc.parallelize(List(2, 1, 3, 5, 4, 6, 7, 8, 9), 3)
    val rdd2 = rdd1.mapPartitions(x => {
      var result = List[Int]()
      result.::(x.toList.sum).iterator
    })

15, keys values
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect


16,filterByRange
val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val rdd2 = rdd1.filterByRange("b", "d")
rdd2.collect