1. 程式人生 > >spark RDD常用運算元(一)

spark RDD常用運算元(一)


- filter

  • 演算法解釋
    filter 函式功能是對元素進行過濾,對每個 元 素 應 用 f 函 數, 返 回 值 為 true 的 元 素 在RDD 中保留,返回值為 false 的元素將被過濾掉。 內 部 實 現 相 當 於 生 成 FilteredRDD(this,sc.clean(f))。
  • 原始檔
    在這裡插入圖片描述
  • 過濾檔案中的INFO日誌 scala程式碼
    var rdd = sc.textFile("D:\\logs\\system.log")
    var line = rdd.filter(lines => lines.contains("ERROR")).foreach(line => println(line))
    
  • 過濾結果
    在這裡插入圖片描述

- map

  • 演算法解釋
    將原來 RDD 的每個資料項通過 map 中的使用者自定義函式 f 對映轉變為一個新的元素。原始碼中 map 運算元相當於初始化一個 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。
  • 原始檔
    | 同上 |
  • scala程式碼
    var rdd = sc.textFile("D:\\logs\\system.log").cache()
    var line = rdd.filter(lines => lines.contains("ERROR"))
    var mspLine = line.map(line => (line.split(" ")(0),line)).foreach(l=>println(l))
    
  • 過濾結果
    返回以時間為key,以內容為內容的的元組(tuple)
    在這裡插入圖片描述

- flatMap

  • 演算法解釋
    有時候,我們希望對某個元素生成多個元素,實現該功能的操作叫作 flatMap()
    faltMap的函式應用於每一個元素,對於每一個元素返回的是多個元素組成的迭代器(想要了解更多,請參考scala的flatMap和map用法)
  • 原始檔
    | 同上 |
  • scala程式碼
    var flatMapLine = mspLine.flatMap(line => {
    	for (i <- 0 until line._2.length) yield (line._1, line._2, i)
    }).first()
    
  • 過濾結果
    (2018-05-04,2018-05-04 09:39:27.286 [main] 14 ERROR - com.test.dao.Test - XmlFileLoader: 111,0)
    

- distinct

  • 演算法解釋
    distinct將RDD中的元素進行去重操作。圖9中的每個方框代表一個RDD分割槽,通過distinct函式,將資料去重。 例如,重複資料V1、 V1去重後只保留一份V1。
  • 原始檔
    var array = List(1, 2, 3, 4, 5, 6, 1, 1)
    
  • scala程式碼
    var array = List(1, 2, 3, 4, 5, 6, 1, 1)
    var rdd = sc.parallelize(array)
    var rddDistinct = rdd.distinct()
    println(rddDistinct.collect().mkString(","))
    
  • 過濾結果
    4,6,2,1,3,5
    

- union

  • 演算法解釋
    使用 union 函式時需要保證兩個 RDD 元素的資料型別相同,返回的 RDD 資料型別和被合併的 RDD 元素資料型別相同,並不進行去重操作,儲存所有元素。如果想去重可以使用 distinct()。同時 Spark 還提供更為簡潔的使用 union 的 API,通過 ++ 符號相當於 union 函式操作
  • 原始檔
    var array = List(1, 2, 3, 4, 5, 6, 1, 1)
    var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
    
  • scala程式碼
    var array = List(1, 2, 3, 4, 5, 6, 1, 1)
    var rdd = sc.parallelize(array)
    var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
    var rdd2 = sc.parallelize(array)
    var rddDistinct = rdd ++ rdd2 // or rdd.union(rdd2)    
    println(rddDistinct.collect().mkString(","))
    
  • 過濾結果
    1,2,3,4,5,6,1,1,1,2,3,4,5,6,1,1
    

- intersection

  • 演算法解釋
    該函式返回兩個RDD的交集,並且去重。intersection 需要混洗資料,比較浪費效能
  • 原始檔
    var array = List(1, 2, 3, 4, 5, 6, 1, 1)
    var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
    
  • scala程式碼
    var array = List(1, 2, 3, 4, 5, 6, 1, 1)
    var rdd = sc.parallelize(array)
    var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
    var rdd2 = sc.parallelize(array)
    var rddIntersection = rdd.intersection(rdd2) 
    println(rddIntersection .collect().mkString(","))
    
  • 過濾結果
    4,6,2,1,3,5
    

- subtract

  • 演算法解釋
    該函式類似於intersection,但返回在RDD中出現,並且不在otherRDD中出現的元素,不去重
  • 原始檔
    var array = List(1, 2, 3, 4, 5, 6, 1, 1)
    var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
    
  • scala程式碼
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    val rdd2 = sc.parallelize(Array(1, 2, 3))
    var substractRDD = rdd .subtract(rdd2)
    println(substractRDD.collect().mkString(","))
    
  • 過濾結果
    4,5
    

- cartesian

  • 演算法解釋
    對 兩 個 RDD 內 的 所 有 元 素 進 行 笛 卡 爾 積 操 作。 操 作 後, 內 部 實 現 返 回CartesianRDD。開銷大
    例 如: V1 和 另 一 個 RDD 中 的 W1、 W2、 Q5 進 行 笛 卡 爾 積 運 算 形 成 (V1,W1)、(V1,W2)、 (V1,Q5)
  • 原始檔
    var array = List(1, 2, 3, 4, 5, 6, 1, 1)
    var array2 = List(1, 2, 3, 4, 5, 6, 1, 1)
    
  • scala程式碼
    val rdd = sc.parallelize(Array("a", "b", "c", "d", "e"))
    val rdd2 = sc.parallelize(Array(1, 2, 3))
    var cartesianRDD = rdd.cartesian(rdd2)
    println(substractRDD.collect().mkString(","))
    
  • 過濾結果
    (a,1),(b,1),(a,2),(a,3),(b,2),(b,3),(c,1),(d,1),(e,1),(c,2),(c,3),(d,2),(d,3),(e,2),(e,3)
    

- mapToPair

  • 演算法解釋
    scala是沒有mapToPair函式的,scala版本map。
  • 原始檔
    val rdd = sc.parallelize(Array("a", "b", "c", "d", "e"))
    
  • scala程式碼
    val rdd = sc.parallelize(Array("a", "b", "c", "d", "e"))
    var mapToPairRDD = rdd.map(a => (a(0), 1))
    println(mapToPairRDD.collect().mkString(","))
    
  • 過濾結果
    (a,1),(b,1),(c,1),(d,1),(e,1)
    

- flatMapToPair

  • 演算法解釋
    類似於xxx連線 mapToPair是一對一,一個元素返回一個元素,而flatMapToPair可以一個元素返回多個,相當於先flatMap,在mapToPair
    例子: 將每一個單詞都分成鍵為
  • 原始檔
    val rdd = sc.parallelize(Array("aa", "bb", "cc", "dd", "ee"))
    
  • scala程式碼
    val rdd = sc.parallelize(Array("aa", "bb", "cc", "dd", "ee"))
    val flatRDD = rdd.flatMap(f => f).map(l => (l, 1))
    println(flatRDD.collect().mkString(","))
    
  • 過濾結果
    (a,1),(a,1),(b,1),(b,1),(c,1),(c,1),(d,1),(d,1),(e,1),(e,1)
    

- flatMapToPair

  • 演算法解釋
    類似於xxx連線 mapToPair是一對一,一個元素返回一個元素,而flatMapToPair可以一個元素返回多個,相當於先flatMap,在mapToPair
    例子: 將每一個單詞都分成鍵為
  • 原始檔
    val rdd = sc.parallelize(Array("aa", "bb", "cc", "dd", "ee"))
    
  • scala程式碼
    val rdd = sc.parallelize(Array("aa", "bb", "cc", "dd", "ee"))
    val flatRDD = rdd.flatMap(f => f).map(l => (l, 1))
    println(flatRDD.collect().mkString(","))
    
  • 過濾結果
    (a,1),(a,1),(b,1),(b,1),(c,1),(c,1),(d,1),(d,1),(e,1),(e,1)