使用spark對hive表中的多列數據判重
阿新 • • 發佈:2017-07-23
個數 stack duplicate house transient this dataframe except cti
本文處理的場景如下:對 hive 表中數據的多個列進行判重(deduplicate),找出各列取值的重復情況。
1、先解決依賴:在 pom.xml 中引入 spark 相關的所有包
spark-hive是我們進行hive表spark處理的關鍵。
<!-- Spark dependencies for reading Hive tables from a Spark job.
     spark-hive is the key artifact: it provides HiveContext, which is what
     lets Spark query Hive-managed tables. All Spark artifacts are `provided`
     because the cluster supplies them at runtime; only fastjson is bundled. -->
<dependencies>
    <!-- Core Spark runtime (RDDs, JavaSparkContext). -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- Hive integration: HiveContext and Hive SerDe support. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- DataFrame / SQL API. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- JSON utility library, shipped inside the fat jar. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.19</version>
    </dependency>
</dependencies>
2、spark-client
package com.xiaoju.kangaroo.duplicate;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

import java.io.Serializable;

/**
 * Thin wrapper that owns a {@link JavaSparkContext} configured for
 * yarn-client mode and hands out SQL/Hive contexts bound to it.
 *
 * <p>Serializable only so that enclosing job classes holding a (transient)
 * reference to it can themselves be serialized by Spark.
 */
public class SparkClient implements Serializable {

    private SparkConf sparkConf;
    private JavaSparkContext javaSparkContext;

    public SparkClient() {
        initSparkConf();
        javaSparkContext = new JavaSparkContext(sparkConf);
    }

    /** @return a fresh {@link SQLContext} bound to this client's SparkContext. */
    public SQLContext getSQLContext() {
        return new SQLContext(javaSparkContext);
    }

    /** @return a fresh {@link HiveContext} bound to this client's SparkContext (needed to query Hive tables). */
    public HiveContext getHiveContext() {
        return new HiveContext(javaSparkContext);
    }

    /**
     * Builds the SparkConf used by the constructor.
     *
     * <p>Fix: the original wrapped this in a {@code catch (Exception)} that
     * only printed the stack trace — nothing here throws a checked exception,
     * and swallowing a failure would leave {@code sparkConf} null, turning any
     * real error into an unrelated NPE inside {@code new JavaSparkContext}.
     * The pointless try/catch has been removed so failures surface directly.
     */
    private void initSparkConf() {
        // Warehouse dir only matters for locally-created databases; the
        // working directory is an arbitrary but harmless choice here.
        String warehouseLocation = System.getProperty("user.dir");
        sparkConf = new SparkConf()
                .setAppName("duplicate")
                .set("spark.sql.warehouse.dir", warehouseLocation)
                .setMaster("yarn-client");
    }
}
3、判重流程
package com.xiaoju.kangaroo.duplicate; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.hive.HiveContext; import scala.Tuple2; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class SparkDuplicate implements Serializable { private transient SparkClient sparkClient; private transient HiveContext hiveContext; private String db; private String tb; private String pt; private String cols; public SparkDuplicate(String db, String tb, String pt, String cols) { this.db = db; this.tb = tb; this.pt = pt; this.cols = cols; this.sparkClient = new SparkClient(); this.hiveContext = sparkClient.getHiveContext(); } public void duplicate() { String partition = formatPartition(pt); String query = String.format("select * from %s.%s where %s", db ,tb, partition); System.out.println(query); DataFrame rows = hiveContext.sql(query); JavaRDD<Row> rdd = rows.toJavaRDD(); Map<String, Integer> repeatRetMap = rdd.flatMap(new FlatMapFunction<Row, String>() { public Iterable<String> call(Row row) throws Exception { HashMap<String, Object> rowMap = formatRowMap(row); List<String> sList = new ArrayList<String>(); String[] colList = cols.split(","); for (String col : colList) { sList.add(col + "@" + rowMap.get(col)); } return sList; } }).mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }).map(new Function<Tuple2<String,Integer>, 
Map<String, Integer>>() { public Map<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { Map<String, Integer> retMap = new HashMap<String, Integer>(); if (stringIntegerTuple2._2 > 1) { retMap.put(stringIntegerTuple2._1, stringIntegerTuple2._2); } return retMap; } }).reduce(new Function2<Map<String, Integer>, Map<String, Integer>, Map<String, Integer>>() { public Map<String, Integer> call(Map<String, Integer> stringIntegerMap, Map<String, Integer> stringIntegerMap2) throws Exception { stringIntegerMap.putAll(stringIntegerMap2); return stringIntegerMap; } }); for (Map.Entry<String, Integer> entry : repeatRetMap.entrySet()) { if (entry.getValue() > 1) { System.out.println("重復值為:" + entry.getKey() + ", 重復個數" + entry.getValue()); } } } private String formatPartition(String partition) { String format = ""; if (partition.startsWith("pt") || partition.startsWith("dt")) { String[] items = partition.split("="); for (int i = 0; i < items.length; i++) { if (items[i].equals("pt") || items[i].equals("dt")) { format += items[i]; } else { format += "=‘" + items[i] + "‘"; } } } else { String[] keys; if (partition.contains("w=")){ keys = new String[] {"year", "week"}; partition = partition.replace("w=", ""); } else{ keys = new String[] {"year","month","day", "hour"}; } String[] items = partition.split("/"); for(int i=0; i<items.length; i++) { if (i == items.length-1) { format += keys[i] + "=‘" + items[i] + "‘"; } else { format += keys[i] + "=‘" + items[i] + "‘ and "; } } } return format; } private HashMap<String, Object> formatRowMap(Row row){ HashMap<String, Object> rowMap = new HashMap<String, Object>(); try { for (int i=0; i<row.schema().fields().length; i++) { String colName = row.schema().fields()[i].name(); Object colValue = row.get(i); rowMap.put(colName, colValue); } }catch (Exception ex) { ex.printStackTrace(); } return rowMap; } public static void main(String[] args) { String db = args[0]; String tb = args[1]; String pt = args[2]; String 
cols = args[3]; SparkDuplicate sparkDuplicate = new SparkDuplicate(db, tb, pt, cols); sparkDuplicate.duplicate(); } }
4、運行方式
提交任務腳本
#!/bin/bash
# run.sh — submit the Hive-column duplicate-check Spark job.
# Usage: sh run.sh <db> <table> <partition> <cols>
source /etc/profile
source ~/.bash_profile

db=$1
table=$2
partition=$3
cols=$4

# yarn-client submission; resources sized for a large scan (50 x 13G executors).
spark-submit \
    --queue=root.zhiliangbu_prod_datamonitor \
    --driver-memory 500M \
    --executor-memory 13G \
    --num-executors 50 \
    spark-duplicate-1.0-SNAPSHOT-jar-with-dependencies.jar \
    ${db} ${table} ${partition} ${cols}
運行:
sh run.sh gulfstream_ods g_order 2017/07/11 area,type
結果
重復值為:area@179, 重復個數225 重復值為:area@80, 重復個數7398 重復值為:area@82, 重復個數69823 重復值為:area@81, 重復個數98317 重復值為:area@84, 重復個數91775 重復值為:area@83, 重復個數72053 重復值為:area@180, 重復個數2362 重復值為:area@86, 重復個數264487 重復值為:area@181, 重復個數2927 重復值為:area@85, 重復個數230484 重復值為:area@88, 重復個數87527 重復值為:area@87, 重復個數74987 重復值為:area@89, 重復個數130297 重復值為:area@188, 重復個數24463 重復值為:area@189, 重復個數15699 重復值為:area@186, 重復個數13517 重復值為:area@187, 重復個數4774 重復值為:area@184, 重復個數5022 重復值為:area@185, 重復個數6737 重復值為:area@182, 重復個數12705 重復值為:area@183, 重復個數18961 重復值為:area@289, 重復個數20715 重復值為:area@168, 重復個數15179 重復值為:area@169, 重復個數1276 重復值為:area@91, 重復個數31664 重復值為:area@90, 重復個數61261 重復值為:area@93, 重復個數32496 重復值為:area@92, 重復個數55877 重復值為:area@95, 重復個數40933 重復值為:area@94, 重復個數32564 重復值為:area@290, 重復個數300 重復值為:area@97, 重復個數21405 重復值為:area@170, 重復個數37696 重復值為:area@291, 重復個數212 重復值為:area@96, 重復個數12442 重復值為:area@99, 重復個數2526 重復值為:area@98, 重復個數17456 重復值為:area@298, 重復個數12688 重復值為:area@177, 重復個數17285 重復值為:area@178, 重復個數11511 重復值為:area@299, 重復個數6622 重復值為:area@175, 重復個數9573 重復值為:area@296, 重復個數2416 重復值為:area@176, 重復個數8109 重復值為:area@297, 重復個數27915 重復值為:area@173, 重復個數58942 重復值為:area@294, 重復個數18842 重復值為:area@295, 重復個數3482 重復值為:area@174, 重復個數31452 重復值為:area@292, 重復個數11436 重復值為:area@171, 重復個數656 重復值為:area@172, 重復個數31557 重復值為:area@293, 重復個數1726 重復值為:type@1, 重復個數288479 重復值為:type@0, 重復個數21067365
使用spark對hive表中的多列數據判重