Keep it simple, keep the brick-moving simple: Hive UDFs for importing data into MySQL, shared with fellow devs

Riffing on Joker Xue's song "Actor", here is a "Programmer" version: keep it simple, keep the brick-moving (the grunt work) simple.

We generally use Hive for offline statistical analysis and then load the results into MySQL tables, where front-end reporting and visualization tools query them.

There are many ways to push data back to MySQL. We used to export with Sqoop; others query through Hive JDBC, pull the results back into application code, and then write them to MySQL over MySQL JDBC.

Both approaches, however, involve a second processing step. (We did once implement SQL parsing that made the Sqoop step transparent to users, so that a statement like insert into mysql.table1 select * from hive.table2 would insert the Hive query result into MySQL, but the implementation was fairly complex.)

This post introduces another approach: integrate the MySQL operation directly into a UDF, so that a single HQL query does the whole job.

package brickhouse.udf.mysql;


import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

import javax.sql.DataSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

@Description(name = "mysql_import",
        value = "_FUNC_(config_path, sql, args1[, args2, ...]) - executes the given sql against MySQL, binding the arguments to its placeholders, and returns the affected row count"
)
public class MysqlImportUDF extends GenericUDF {
    private IntObjectInspector retValInspector;
    private DataSource dataSource;
    private String sql;
    private PrimitiveObjectInspector[] paramsInspectors;

    // Called once per input row: borrows a pooled connection and binds the UDF
    // arguments after config_path and sql to the '?' placeholders in order.
    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {

        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement(sql)) {
            System.out.println("execute sql:" + System.currentTimeMillis());
            for (int i = 2; i < arg0.length; i++) {
                Object param = paramsInspectors[i - 2].getPrimitiveJavaObject(arg0[i].get());
                stmt.setObject(i - 1, param);
            }
            int ret = stmt.executeUpdate();
            IntWritable iw = new IntWritable(ret);
            return retValInspector.getPrimitiveWritableObject(iw);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new HiveException(e);
        }

    }


    // Release the connection pool when Hive closes this UDF instance.
    @Override
    public void close() throws IOException {
        try {
            BasicDataSource bds = (BasicDataSource) dataSource;
            bds.close();
        } catch (SQLException e) {
            e.printStackTrace();
            throw new IOException(e);
        }
    }

    @Override
    public String getDisplayString(String[] arg0) {
        return "mysql_import(config_path, sql,args1[,args2,...argsN])";
    }


    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0)
            throws UDFArgumentException {
        if (arg0.length < 3) {
            throw new UDFArgumentException("Expecting at least three arguments: config_path, sql and at least one placeholder value");
        }
        // Argument 1: a constant string with the path of a DBCP properties file;
        // the MySQL connection pool is created from it at initialization time.
        if (arg0[0].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[0] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("mysql connection pool config path  must be constant");
            }
            ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[0];

            String configPath = propertiesPath.getWritableConstantValue().toString();
            Properties properties = new Properties();
            Configuration conf = new Configuration();
            Path path = new Path(configPath);
            try (FileSystem fs = FileSystem.get(path.toUri(), conf);
                 InputStream in = fs.open(path)) {

                properties.load(in);
                this.dataSource = BasicDataSourceFactory.createDataSource(properties);
            } catch (FileNotFoundException ex) {
                throw new UDFArgumentException("config file not found on the local file system or HDFS: " + configPath);
            } catch (Exception e) {
                e.printStackTrace();
                throw new UDFArgumentException(e);
            }
        }
        // Argument 2: a constant, non-empty SQL string containing the '?' placeholders.
        if (arg0[1].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[1] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("the second arg   must be a sql string constant");
            }
            ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[1];
            this.sql = sqlInsp.getWritableConstantValue().toString();
            if (this.sql == null || this.sql.trim().length() == 0) {
                throw new UDFArgumentException("the second arg   must be a sql string constant and not nullable");
            }
        }
        // The remaining arguments map one-to-one to the SQL placeholders.
        paramsInspectors = new PrimitiveObjectInspector[arg0.length - 2];
        for (int i = 2; i < arg0.length; i++) {
            paramsInspectors[i - 2] = (PrimitiveObjectInspector) arg0[i];
        }
        retValInspector = PrimitiveObjectInspectorFactory.writableIntObjectInspector;

        return retValInspector;
    }

}

Upload the jar and register the UDF:

CREATE FUNCTION default.mysql_import4 AS 'brickhouse.udf.mysql.MysqlImportUDF' USING JAR 'hdfs://name84:8020/tmp/jar/brickhouse-0.7.1.jar';
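
If you want to double-check the registration, DESCRIBE FUNCTION should echo the usage string from the @Description annotation:

DESCRIBE FUNCTION EXTENDED default.mysql_import4;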

Then write an HQL statement to test it:

select default.mysql_import4('hdfs://name84:8020/user/hive/udf/conf/mysql.properties','insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?) on duplicate key update pv=pv+?,uv=uv+?',b.ds,b.type,b.pv,b.uv,b.pv,b.uv) from (
select ds,type,count(did) as pv,count(distinct did) as uv 
from dd_xyzs_pc_action_detail
where ds='2016-10-23'
group by ds,type
) b

The inner subquery is an aggregation that computes pv and uv per type per day; the outer query wraps it and calls the UDF registered above to insert the computed results into MySQL.

The UDF's first argument is a static one: the path of a configuration file that describes how to set up the connection pool and which database to connect to.

The second argument is a MySQL SQL statement describing how the rows are written. The remaining arguments are variable and map one-to-one to the placeholders in that statement; the example above has 6 placeholders, so 6 arguments follow.
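
Note that the ON DUPLICATE KEY UPDATE clause in the example only takes effect if xj_test1 has a matching unique key. A hypothetical DDL for the target table (the column names follow the statement above, but the types are my assumptions) might look like:

CREATE TABLE xj_test1 (
  ds    date         NOT NULL,
  `mod` varchar(64)  NOT NULL,
  pv    bigint       NOT NULL DEFAULT 0,
  uv    bigint       NOT NULL DEFAULT 0,
  PRIMARY KEY (ds, `mod`)
);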

Here is the content of a sample mysql.properties configuration file:

driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://192.168.78.26:3306/db_stat?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&connectTimeout=60000&socketTimeout=60000
username=xyzs
password=xxxxxx
initialSize=1
maxActive=20
minIdle=5
maxIdle=15
connectionTimeoutMillis=5000
maxWait=60000
validationQuery=select 1 from dual
validationQueryTimeout=1
removeAbandoned=true
removeAbandonedTimeout=180
timeBetweenEvictionRunsMillis=30000
numTestsPerEvictionRun=20
testWhileIdle=true
testOnBorrow=false
testOnReturn=false

TODO: this UDF currently executes a separate INSERT for each result row. A batch-insert version is planned: first collect_list the query results into an array, then insert the whole batch into the database in one go.

Here is that batch-insert UDF as well:

package brickhouse.udf.mysql;


import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

@Description(name = "mysql_batch_import",
        value = "_FUNC_(config_path, sql, array<struct>) - executes the given sql as a single JDBC batch and returns the per-row update counts"
)
public class MysqlBatchImportUDF extends GenericUDF {

    public static final String DEFAULT_CONFIG_ROOT_PATH = "/user/hive/udf/mysqludf/";
    public static final String DEFAULT_CONFIG_FILE_SUFFIX = "properties";
    private StandardListObjectInspector retValInspector;
    private Properties properties;
    private String sql;
    private StandardListObjectInspector paramsListInspector;
    private StandardStructObjectInspector paramsElementInspector;

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {

        // The batch import runs as a single one-shot insert, so no connection pool is created;
        // open one connection, execute the whole batch in one transaction and commit.
        try (Connection connection = DriverManager.getConnection(properties.getProperty("url"), properties.getProperty("username"), properties.getProperty("password"));
             PreparedStatement stmt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);

            // Each array element is one row; its struct fields map, in order, to the '?' placeholders.
            for (int i = 0; i < paramsListInspector.getListLength(arg0[2].get()); i++) {
                Object row = paramsListInspector.getListElement(arg0[2].get(), i);
                for (int j = 0; j < paramsElementInspector.getAllStructFieldRefs().size(); j++) {
                    StructField structField = paramsElementInspector.getAllStructFieldRefs().get(j);
                    Object col = paramsElementInspector.getStructFieldData(row, structField);
                    Object param = ((PrimitiveObjectInspector) structField.getFieldObjectInspector()).getPrimitiveJavaObject(col);
                    stmt.setObject(j + 1, param);
                }
                stmt.addBatch();
            }
            int[] ret = stmt.executeBatch();
            connection.commit();

            // Return the per-row update counts reported by executeBatch().
            Object returnlist = retValInspector.create(ret.length);
            for (int i = 0; i < ret.length; i++) {
                retValInspector.set(returnlist, i, ret[i]);
            }
            return returnlist;

        } catch (SQLException e) {
            e.printStackTrace();
            throw new HiveException(e);
        }

    }


    @Override
    public String getDisplayString(String[] arg0) {
        return "mysql_batch_import(config_path, sql,array<struct>)";
    }


    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0)
            throws UDFArgumentException {
        if (arg0.length != 3) {
            throw new UDFArgumentException("Expecting exactly three arguments: config_path, sql and an array<struct>");
        }
        // Validate argument 1: a constant string path to the connection config file.
        if (arg0[0].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[0] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("mysql connection pool config path  must be constant");
            }
            ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[0];

            String fileName1 = propertiesPath.getWritableConstantValue().toString();
            Path path1 = new Path(fileName1);
            if (path1.toUri().getScheme() == null) {
                if (!"".equals(FilenameUtils.getExtension(fileName1)) && !DEFAULT_CONFIG_FILE_SUFFIX.equals(FilenameUtils.getExtension(fileName1))) {
                    throw new UDFArgumentException("Unsupported file extension; only .properties config files are supported");
                }
                // For a relative path, prepend the default config root directory.
                if (!fileName1.startsWith("/")) {
                    fileName1 = MysqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + fileName1;
                }
            }
            // If only the base name was given, append the .properties suffix.
            if (!FilenameUtils.isExtension(fileName1, DEFAULT_CONFIG_FILE_SUFFIX)) {
                fileName1 = fileName1 + FilenameUtils.EXTENSION_SEPARATOR_STR + DEFAULT_CONFIG_FILE_SUFFIX;
            }
            Properties properties = new Properties();
            Configuration conf = new Configuration();
            Path path2 = new Path(fileName1);

            // Use FileSystem.newInstance rather than FileSystem.get here: get() returns a shared,
            // cached instance, and closing it via try-with-resources could later trigger a
            // "FileSystem is closed" exception elsewhere; newInstance() is private to this call and safe to close.
            try (FileSystem fs = FileSystem.newInstance(path2.toUri(), conf);
                 InputStream in = fs.open(path2)) {
                properties.load(in);
                this.properties = properties;
            } catch (FileNotFoundException ex) {
                throw new UDFArgumentException("config file not found on the local file system or HDFS: " + fileName1);
            } catch (Exception e) {
                e.printStackTrace();
                throw new UDFArgumentException(e);
            }
        }
        // Validate argument 2: must be a constant, non-empty SQL string.
        if (arg0[1].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[1] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("the second arg   must be a sql string constant");
            }
            ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[1];
            this.sql = sqlInsp.getWritableConstantValue().toString();
            if (this.sql == null || this.sql.trim().length() == 0) {
                throw new UDFArgumentException("the second arg   must be a sql string constant and not nullable");
            }
        }

        // Validate argument 3: must be an array<struct>, one struct element per row to insert.
        if (arg0[2].getCategory() != Category.LIST) {
            throw new UDFArgumentException(" Expecting an array<struct> field as third argument ");
        }
        ListObjectInspector third = (ListObjectInspector) arg0[2];
        if (third.getListElementObjectInspector().getCategory() != Category.STRUCT) {
            throw new UDFArgumentException(" Expecting an array<struct> field as third argument ");
        }
        paramsListInspector = ObjectInspectorFactory.getStandardListObjectInspector(third.getListElementObjectInspector());
        paramsElementInspector = (StandardStructObjectInspector) third.getListElementObjectInspector();
        retValInspector = ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaIntObjectInspector);

        return retValInspector;
    }

}

The first two arguments are the same as for the mysql_import UDF above. Note that the example below passes just 'mysql_78_26' as the config path: relative names are resolved against /user/hive/udf/mysqludf/ and the .properties suffix is appended, per the defaults in the code.

Argument 3: an array<struct> field. Each element of the array must be a struct, and the number of fields in the struct must match the number of placeholders in the SQL statement.
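
The example below assumes the batch UDF has been registered in the same way as the single-row one; a sketch, assuming the class is packaged in the same brickhouse jar:

CREATE FUNCTION default.mysql_batch_import AS 'brickhouse.udf.mysql.MysqlBatchImportUDF' USING JAR 'hdfs://name84:8020/tmp/jar/brickhouse-0.7.1.jar';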

The example demo runs collect_list over the query result set to build the array field passed as the UDF's third argument; since collect_list here is a full aggregation, the whole result set is gathered into one array and written as a single JDBC batch:

select default.mysql_batch_import('mysql_78_26','insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?) on duplicate key update pv=pv+?,uv=uv+?',collect_list(struct(ds,type,pv,uv,pv,uv))) from (
select ds,type,count(did) as pv,count(distinct did) as uv
from dd_xyzs_pc_action_detail
where ds='2016-10-23'
group by ds,type
) a