
Checking multiple columns of a Hive table for duplicates with Spark


This post covers the following scenario: given the data in a Hive table, check several of its columns for duplicate values.
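The same per-column check can also be expressed as a plain aggregation. Below is a minimal sketch using the Spark 1.6 DataFrame API; the helper class, table and column names are placeholders and not part of the original code.

package com.xiaoju.kangaroo.duplicate;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

// Hypothetical helper: for one column of one table, keep only the values
// that occur more than once, together with their counts.
public class GroupByDuplicates {
    public static DataFrame duplicatesOf(HiveContext hiveContext, String table, String column) {
        return hiveContext.sql(
                "select " + column + ", count(*) as cnt from " + table +
                " group by " + column + " having count(*) > 1");
    }
}

The rest of the post does the same thing by hand with RDD transformations, which makes it easy to check many columns in a single pass over the data.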

1. First, sort out the dependencies: all the Spark-related packages go into pom.xml.

spark-hive is the key dependency for processing Hive tables with Spark.

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.19</version>
    </dependency>
</dependencies>

2. The Spark client wrapper (SparkClient)

package com.xiaoju.kangaroo.duplicate;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

import java.io.Serializable;

public class SparkClient implements Serializable{
    private SparkConf sparkConf;
    private JavaSparkContext javaSparkContext;

    public SparkClient() {
        initSparkConf();
        javaSparkContext = new JavaSparkContext(sparkConf);
    }


    public SQLContext getSQLContext() {
        return new SQLContext(javaSparkContext);
    }

    public HiveContext getHiveContext() {
        return new HiveContext(javaSparkContext);
    }

    private void initSparkConf() {
        try {
            // Use the working directory as the Spark SQL warehouse location.
            String warehouseLocation = System.getProperty("user.dir");
            sparkConf = new SparkConf()
                    .setAppName("duplicate")
                    .set("spark.sql.warehouse.dir", warehouseLocation)
                    .setMaster("yarn-client"); // submit to YARN in client mode
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}
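As a quick sanity check, something like the following confirms that the HiveContext can reach the metastore. The table name here is hypothetical; replace it with a real Hive table accessible from the cluster.

package com.xiaoju.kangaroo.duplicate;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class SparkClientExample {
    public static void main(String[] args) {
        // Build the client, get a HiveContext, and run a trivial query.
        SparkClient client = new SparkClient();
        HiveContext hiveContext = client.getHiveContext();
        DataFrame df = hiveContext.sql("select * from some_db.some_table limit 10");
        df.show();
    }
}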

3. The duplicate-detection flow

The idea: read the target partition into a DataFrame, emit one "column@value" string per checked column of every row, count the occurrences of each string with reduceByKey, and keep the keys whose count is greater than 1.

package com.xiaoju.kangaroo.duplicate;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SparkDuplicate implements Serializable  {

    private transient SparkClient sparkClient;
    private transient HiveContext hiveContext;
    private String db;
    private String tb;
    private String pt;
    private String cols;

    public SparkDuplicate(String db, String tb, String pt, String cols) {
        this.db = db;
        this.tb = tb;
        this.pt = pt;
        this.cols = cols;
        this.sparkClient = new SparkClient();
        this.hiveContext = sparkClient.getHiveContext();
    }

    public void duplicate() {
        // Build the partition filter and read the whole partition as a DataFrame.
        String partition = formatPartition(pt);
        String query = String.format("select * from %s.%s where %s", db, tb, partition);
        System.out.println(query);
        DataFrame rows = hiveContext.sql(query);
        JavaRDD<Row> rdd = rows.toJavaRDD();
        // Emit one "column@value" string per checked column of every row,
        // count each string, and keep the ones that appear more than once.
        Map<String, Integer> repeatRetMap = rdd.flatMap(new FlatMapFunction<Row, String>() {
            public Iterable<String> call(Row row) throws Exception {
                HashMap<String, Object> rowMap = formatRowMap(row);
                List<String> sList = new ArrayList<String>();
                String[] colList = cols.split(",");
                for (String col : colList) {
                    sList.add(col + "@" + rowMap.get(col));
                }
                return sList;
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);

            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).map(new Function<Tuple2<String,Integer>, Map<String, Integer>>() {
            public Map<String, Integer> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                Map<String, Integer> retMap = new HashMap<String, Integer>();
                if (stringIntegerTuple2._2 > 1) {
                    retMap.put(stringIntegerTuple2._1, stringIntegerTuple2._2);
                }
                return retMap;
            }
        }).reduce(new Function2<Map<String, Integer>, Map<String, Integer>, Map<String, Integer>>() {
            public Map<String, Integer> call(Map<String, Integer> stringIntegerMap, Map<String, Integer> stringIntegerMap2) throws Exception {
                stringIntegerMap.putAll(stringIntegerMap2);
                return stringIntegerMap;
            }
        });

        for (Map.Entry<String, Integer> entry : repeatRetMap.entrySet()) {
            if (entry.getValue() > 1) {
                System.out.println("Duplicate value: " + entry.getKey() + ", count: " + entry.getValue());
            }
        }
    }

    private String formatPartition(String partition) {
        // Turn the partition argument into a where-clause, e.g.
        // "pt=2017-07-11" -> pt='2017-07-11', "2017/07/11" -> year='2017' and month='07' and day='11'.
        String format = "";
        if (partition.startsWith("pt") || partition.startsWith("dt")) {
            String[] items = partition.split("=");
            for (int i = 0; i < items.length; i++) {
                if (items[i].equals("pt") || items[i].equals("dt")) {
                    format += items[i];
                } else {
                    format += "='" + items[i] + "'";
                }
            }
        } else {
            String[] keys;
            if (partition.contains("w=")) {
                keys = new String[] {"year", "week"};
                partition = partition.replace("w=", "");
            } else {
                keys = new String[] {"year", "month", "day", "hour"};
            }
            String[] items = partition.split("/");
            for (int i = 0; i < items.length; i++) {
                if (i == items.length - 1) {
                    format += keys[i] + "='" + items[i] + "'";
                } else {
                    format += keys[i] + "='" + items[i] + "' and ";
                }
            }
        }
        return format;
    }

    private HashMap<String, Object> formatRowMap(Row row) {
        // Map column name -> value for one row, using the row's schema.
        HashMap<String, Object> rowMap = new HashMap<String, Object>();
        try {
            for (int i = 0; i < row.schema().fields().length; i++) {
                String colName = row.schema().fields()[i].name();
                Object colValue = row.get(i);
                rowMap.put(colName, colValue);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return rowMap;
    }

    public static void main(String[] args) {
        String db = args[0];
        String tb = args[1];
        String pt = args[2];
        String cols = args[3];
        SparkDuplicate sparkDuplicate = new SparkDuplicate(db, tb, pt, cols);
        sparkDuplicate.duplicate();
    }
}
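The class is normally launched through spark-submit (next section), but for reference here is a hypothetical direct invocation using the same arguments as the run example below. The partition argument accepts the formats handled by formatPartition: "pt=2017-07-11" / "dt=2017-07-11", "2017/07/11" (year/month/day, optionally /hour), or a string containing "w=" such as "2017/w=28" (year/week).

package com.xiaoju.kangaroo.duplicate;

// Hypothetical driver, not part of the original post: runs the same check as
// `sh run.sh gulfstream_ods g_order 2017/07/11 area,type`, but from Java.
public class SparkDuplicateExample {
    public static void main(String[] args) {
        // db, table, partition (year/month/day), comma-separated columns to check
        SparkDuplicate job = new SparkDuplicate("gulfstream_ods", "g_order", "2017/07/11", "area,type");
        job.duplicate();
    }
}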

4. How to run it

Job submission script:

#!/bin/bash
source /etc/profile
source ~/.bash_profile
db=$1
table=$2
partition=$3
cols=$4
spark-submit \
    --queue=root.zhiliangbu_prod_datamonitor \
    --driver-memory 500M \
    --executor-memory 13G \
    --num-executors 50 \
    spark-duplicate-1.0-SNAPSHOT-jar-with-dependencies.jar ${db} ${table} ${partition} ${cols}

Run it:

sh run.sh gulfstream_ods g_order 2017/07/11 area,type

Results:

Duplicate value: area@179, count: 225
Duplicate value: area@80, count: 7398
Duplicate value: area@82, count: 69823
Duplicate value: area@81, count: 98317
Duplicate value: area@84, count: 91775
Duplicate value: area@83, count: 72053
Duplicate value: area@180, count: 2362
Duplicate value: area@86, count: 264487
Duplicate value: area@181, count: 2927
Duplicate value: area@85, count: 230484
Duplicate value: area@88, count: 87527
Duplicate value: area@87, count: 74987
Duplicate value: area@89, count: 130297
Duplicate value: area@188, count: 24463
Duplicate value: area@189, count: 15699
Duplicate value: area@186, count: 13517
Duplicate value: area@187, count: 4774
Duplicate value: area@184, count: 5022
Duplicate value: area@185, count: 6737
Duplicate value: area@182, count: 12705
Duplicate value: area@183, count: 18961
Duplicate value: area@289, count: 20715
Duplicate value: area@168, count: 15179
Duplicate value: area@169, count: 1276
Duplicate value: area@91, count: 31664
Duplicate value: area@90, count: 61261
Duplicate value: area@93, count: 32496
Duplicate value: area@92, count: 55877
Duplicate value: area@95, count: 40933
Duplicate value: area@94, count: 32564
Duplicate value: area@290, count: 300
Duplicate value: area@97, count: 21405
Duplicate value: area@170, count: 37696
Duplicate value: area@291, count: 212
Duplicate value: area@96, count: 12442
Duplicate value: area@99, count: 2526
Duplicate value: area@98, count: 17456
Duplicate value: area@298, count: 12688
Duplicate value: area@177, count: 17285
Duplicate value: area@178, count: 11511
Duplicate value: area@299, count: 6622
Duplicate value: area@175, count: 9573
Duplicate value: area@296, count: 2416
Duplicate value: area@176, count: 8109
Duplicate value: area@297, count: 27915
Duplicate value: area@173, count: 58942
Duplicate value: area@294, count: 18842
Duplicate value: area@295, count: 3482
Duplicate value: area@174, count: 31452
Duplicate value: area@292, count: 11436
Duplicate value: area@171, count: 656
Duplicate value: area@172, count: 31557
Duplicate value: area@293, count: 1726
Duplicate value: type@1, count: 288479
Duplicate value: type@0, count: 21067365
