1. 程式人生 > 單個RDD去重、兩個RDD去重

單個RDD去重、兩個RDD去重

單個RDD去重

資料

{"name":"張三","age":24,"from":"HuBei"}
{"name":"李四","age":26,"from":"HuNan"}
{"name":"王五","age":26,"from":"HuBei"}
{"name":"張三","age":24,"from":"HuBei"}

SingleRDD:根據整行內容去重(兩行完全相同才視為重複)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
/**
 * Deduplicates a single RDD by comparing whole lines.
 *
 * Reads `jsonFile.txt` (one JSON object per line, see the sample data above)
 * and prints each distinct line once. Two lines are merged only when their
 * full text is identical.
 */
object SingleRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SingleRDD").setMaster("local[8]")
    val sc = new SparkContext(conf)
    try {
      // Lower the log level so the printed results are not buried in INFO logs.
      sc.setLogLevel("WARN")
      // jsonFile.txt contains the sample data shown above.
      val data: RDD[String] = sc.textFile("jsonFile.txt")
      // distinct() compares whole lines and involves a shuffle, so the output
      // order is not guaranteed. Note: foreach runs on executors; in local mode
      // the output appears on this console, on a cluster it goes to executor logs.
      data.distinct().foreach(println)
      // Result (order may vary):
      // {"name":"李四","age":26,"from":"HuNan"}
      // {"name":"王五","age":26,"from":"HuBei"}
      // {"name":"張三","age":24,"from":"HuBei"}
    } finally {
      // Always release the SparkContext, even if the job fails.
      sc.stop()
    }
  }
}

PairRDD:根據某一欄位(此例為 age)去重

import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
/**
 * Deduplicates records by a single JSON field (`age`).
 *
 * Each line of `jsonFile.txt` is keyed by its `age` value. The pairs are then
 * collected into a driver-side map, so exactly one line survives per distinct age.
 */
object PairRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PairRDD").setMaster("local[8]")
    val sc = new SparkContext(conf)
    try {
      // Lower the log level so the printed results are not buried in INFO logs.
      sc.setLogLevel("WARN")
      // jsonFile.txt contains the sample data shown above.
      val data: RDD[String] = sc.textFile("jsonFile.txt")
      // Key each line by its "age" field (parsed with fastjson) and keep the raw
      // line as the value. Blank lines are skipped: fastjson returns null for an
      // empty string, and calling getInteger on it would throw an NPE.
      val pairRDD: RDD[(Integer, String)] =
        data
          .filter(_.trim.nonEmpty)
          .map(line => (JSON.parseObject(line).getInteger("age"), line))
      // collectAsMap builds a map on the driver. When a key repeats, an entry
      // later in collect order overwrites an earlier one, which performs the dedup.
      // Every pair is pulled to the driver, so this only suits small data sets.
      pairRDD.collectAsMap().foreach(println)
      // Result:
      // (26,{"name":"王五","age":26,"from":"HuBei"})
      // (24,{"name":"張三","age":24,"from":"HuBei"})
    } finally {
      // Always release the SparkContext, even if the job fails.
      sc.stop()
    }
  }
}

兩個RDD去重

全部資料–allData

{"name":"張三","age":24,"from":"HuBei"}
{"name":"李四","age":26,"from":"HuNan"}
{"name":"王五","age":26,"from":"HuBei"}
{"name":"張三","age":24,"from":"HuBei"}

部分資料–partData

{"name":"張三","age":24,"from":"HuBei"}
{"name":"李四","age":26,"from":"HuNan"}
{"name":"張三","age":24,"from":"HuBei"}

聯合去重(實為求差集:從 allData 中去除在 partData 中出現過的資料)

import org.apache.spark.{SparkContext, SparkConf}
/**
 * Removes from `allData` every line that also appears in `partData`.
 *
 * This is a set difference (allData minus partData). Lines of `allData` that do
 * not occur in `partData` are printed, and duplicates among them are kept.
 */
object UnionRDD {
  def main(args: Array[String]): Unit = {
    // App name previously said "PairRDD" (copy-paste slip); use this job's own name.
    val conf = new SparkConf().setAppName("UnionRDD").setMaster("local[8]")
    val sc = new SparkContext(conf)
    try {
      // Lower the log level so the printed results are not buried in INFO logs.
      sc.setLogLevel("WARN")
      // allData.txt and partData.txt contain the sample data shown above.
      val allData = sc.textFile("allData.txt")
      val partData = sc.textFile("partData.txt")
      // partData is a subset of allData; we want its complement within allData.
      // collect() pulls partData onto the driver, so use it sparingly: only when
      // partData fits in memory on one machine. A Set gives constant-time lookups,
      // whereas Array.contains scans linearly for every element of allData.
      // Broadcasting ships the set to each executor once instead of with every task.
      val partLines = sc.broadcast(partData.collect().toSet)
      allData.filter(line => !partLines.value.contains(line)).foreach(println)
      // If partData is too large to collect, allData.subtract(partData) gives the
      // same set of lines without collecting, at the cost of a shuffle.
      // Result:
      // {"name":"王五","age":26,"from":"HuBei"}
    } finally {
      // Always release the SparkContext, even if the job fails.
      sc.stop()
    }
  }
}