單個RDD去重、兩個RDD去重
阿新 • • 發佈:2019-01-28
單個RDD去重
資料
{"name":"張三","age":24,"from":"HuBei"}
{"name":"李四","age":26,"from":"HuNan"}
{"name":"王五","age":26,"from":"HuBei"}
{"name":"張三","age":24,"from":"HuBei"}
SingleRDD 根據一行去重
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
 * De-duplicates a single RDD by whole-line equality using `RDD.distinct`.
 */
object SingleRDD {

  /**
   * Reads `jsonFile.txt`, drops lines that are exact duplicates and prints
   * the remaining lines.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SingleRDD").setMaster("local[8]")
    val sc = new SparkContext(conf)
    try {
      // Set the log level
      sc.setLogLevel("WARN")

      // jsonFile.txt holds the sample records listed above; textFile yields one element per line
      val data: RDD[String] = sc.textFile("jsonFile.txt")

      // De-duplicate a single RDD => distinct API (two lines are equal only if the whole string matches)
      data.distinct().foreach(println)
      // Result (print order is not guaranteed)
      //{"name":"李四","age":26,"from":"HuNan"}
      //{"name":"王五","age":26,"from":"HuBei"}
      //{"name":"張三","age":24,"from":"HuBei"}
    } finally {
      // Always release the context, even when the job fails
      sc.stop()
    }
  }
}
PairRDD 根據某一欄位去重
import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
 * De-duplicates an RDD by a single JSON field (`age`) by keying each line on
 * that field and collapsing the pairs into a map.
 */
object PairRDD {

  /**
   * Reads `jsonFile.txt`, keys every line by its `age` field and prints one
   * `(age, line)` pair per distinct age.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PairRDD").setMaster("local[8]")
    val sc = new SparkContext(conf)
    try {
      // Set the log level
      sc.setLogLevel("WARN")

      // jsonFile.txt holds the sample records listed above; textFile yields one element per line
      val data: RDD[String] = sc.textFile("jsonFile.txt")

      // De-duplicate by the age field
      // Parse each JSON string with fastjson and pair the line with its age
      val pairRDD: RDD[(Integer, String)] =
        data.map(line => (JSON.parseObject(line).getInteger("age"), line))

      // collectAsMap de-duplicates: it builds a hash map from the keys, so for
      // equal keys a later value overwrites an earlier one.
      // NOTE: this brings every pair to the driver, so it only suits small data sets.
      pairRDD.collectAsMap().foreach(println)
      // Result
      //(26,{"name":"王五","age":26,"from":"HuBei"})
      //(24,{"name":"張三","age":24,"from":"HuBei"})
    } finally {
      // Always release the context, even when the job fails
      sc.stop()
    }
  }
}
兩個RDD去重
全部資料–allData
{"name":"張三","age":24,"from":"HuBei"}
{"name":"李四","age":26,"from":"HuNan"}
{"name":"王五","age":26,"from":"HuBei"}
{"name":"張三","age":24,"from":"HuBei"}
部分資料–partData
{"name":"張三","age":24,"from":"HuBei"}
{"name":"李四","age":26,"from":"HuNan"}
{"name":"張三","age":24,"from":"HuBei"}
聯合去重
import org.apache.spark.{SparkContext, SparkConf}

/**
 * De-duplicates across two RDDs: prints the lines of `allData.txt` that do
 * not occur in `partData.txt`.
 */
object UnionRDD {

  /**
   * Reads both input files and prints every line of `allData.txt` that is
   * absent from `partData.txt`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // App name matches this object (it was previously a copy-pasted "PairRDD")
    val conf = new SparkConf().setAppName("UnionRDD").setMaster("local[8]")
    val sc = new SparkContext(conf)
    try {
      // Set the log level
      sc.setLogLevel("WARN")

      // allData.txt / partData.txt hold the sample records listed above
      val allData = sc.textFile("allData.txt")
      val partData = sc.textFile("partData.txt")

      // partData is a subset of allData; take the complement of partData.
      // subtract does the set difference inside Spark, so partData is never
      // collected to the driver and no per-record linear scan of an Array is needed.
      allData.subtract(partData).foreach(println)
      // Result
      //{"name":"王五","age":26,"from":"HuBei"}
    } finally {
      // Always release the context, even when the job fails
      sc.stop()
    }
  }
}