程式人生 > 15 友盟專案--資源檔案工具類(ResourceUtil)、SQL執行工具類(ExecSQLUtil)

15 友盟專案--資源檔案工具類(ResourceUtil)、SQL執行工具類(ExecSQLUtil)

流程：資源檔案工具類(ResourceUtil)把SQL指令碼讀取為String字串，再交給SQL執行工具類(ExecSQLUtil)以分號切分後逐條執行。

1. 資源檔案工具類(ResourceUtil)

  把classpath上的SQL指令碼讀取為String字串

/**
 * Resource-file utilities: loads classpath resources (such as SQL scripts) as text.
 *
 * <p>Resources are resolved through the current thread's context class loader. Every stream
 * opened here is closed before the method returns.
 */
public class ResourceUtil {
    /** Copy-buffer size used when draining a resource stream. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Matches a {@code --} line comment together with its line terminator (CRLF or LF), or a
     * comment on the last line that has no terminator.
     *
     * <p>The match is purely textual, so a {@code --} inside a string literal is removed as well.
     */
    private static final String LINE_COMMENT_REGEX = "--.*(?:\\r?\\n|\\z)";

    /**
     * Reads the whole resource and decodes it with the given charset; the text is returned as-is.
     *
     * @param resource classpath resource name
     * @param charset  name of the charset used to decode the bytes
     * @return the full resource content
     * @throws IllegalArgumentException if the resource is not found
     * @throws Exception on read failure or an unsupported charset name
     */
    public static String readResourceAsString(String resource, String charset) throws Exception {
        return new String(readResourceBytes(resource), charset);
    }

    /**
     * Reads the whole resource with the platform default charset and strips {@code --} line
     * comments (including their line terminator).
     *
     * @param resource classpath resource name
     * @return the resource content with line comments removed
     * @throws IllegalArgumentException if the resource is not found
     * @throws Exception on read failure
     */
    public static String readResourceAsString(String resource) throws Exception {
        String sql = new String(readResourceBytes(resource), Charset.defaultCharset());
        // Remove comments so they cannot interfere when callers split the script on ';'.
        return sql.replaceAll(LINE_COMMENT_REGEX, "");
    }

    /**
     * Reads the resource line by line (platform default charset), skipping blank lines.
     *
     * @param resource classpath resource name
     * @return the non-blank lines, in file order
     * @throws IllegalArgumentException if the resource is not found
     * @throws Exception on read failure
     */
    public static List<String> readResourceAsList(String resource) throws Exception {
        List<String> list = new ArrayList<String>();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(openResource(resource)))) {
            String line;
            while ((line = br.readLine()) != null) {
                if (!line.trim().equals("")) {
                    list.add(line);
                }
            }
        }
        return list;
    }

    /** Opens the resource via the context class loader, failing fast if it does not exist. */
    private static InputStream openResource(String resource) {
        InputStream input =
                Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
        if (input == null) {
            // getResourceAsStream signals "not found" with null; report it instead of an NPE later.
            throw new IllegalArgumentException("Resource not found on classpath: " + resource);
        }
        return input;
    }

    /** Reads every byte of the resource, always closing the underlying stream. */
    private static byte[] readResourceBytes(String resource) throws Exception {
        try (InputStream input = openResource(resource)) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            byte[] buf = new byte[BUFFER_SIZE];
            int len;
            while ((len = input.read(buf)) != -1) {
                baos.write(buf, 0, len);
            }
            return baos.toByteArray();
        }
    }
}
ResourceUtil - 資源檔案工具類

 

2. SQL執行工具類(ExecSQLUtil)

  執行以分號分隔的SQL語句

package com.oldboy.umeng.spark.stat;

import com.oldboy.umeng.common.util.ResourceUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Script-execution utilities: runs ';'-separated Spark SQL statements.
 *
 * <p>Statements are separated with a plain {@code split(";")}, so a ';' inside a string literal
 * or comment will cut a statement in two. Blank (whitespace-only) segments are skipped.
 */
public class ExecSQLUtil {
    /** Maximum number of rows printed per statement by the script/string runners. */
    private static final int SHOW_ROWS = 1000;

    /**
     * Executes every statement of a classpath SQL script and prints each result.
     *
     * @param sess      Spark session the statements run in
     * @param sqlScript classpath resource name of the script, read with
     *                  {@code ResourceUtil.readResourceAsString(String)}
     * @throws Exception if the script cannot be read or a statement fails
     */
    public static void execSQLScript(SparkSession sess, String sqlScript) throws Exception {
        // ResourceUtil turns the script resource into a single String.
        String sqls = ResourceUtil.readResourceAsString(sqlScript);
        execSQLString(sess, sqls);
    }

    /**
     * Executes every ';'-separated statement in {@code sqlString} and prints each result.
     *
     * @param sess      Spark session the statements run in
     * @param sqlString one or more statements separated by ';'
     * @throws Exception if a statement fails
     */
    public static void execSQLString(SparkSession sess, String sqlString) throws Exception {
        for (String sql : sqlString.split(";")) {
            if (!isBlank(sql)) {
                // Print up to SHOW_ROWS rows without truncating long column values.
                sess.sql(sql).show(SHOW_ROWS, false);
            }
        }
    }

    /**
     * Executes all statements in {@code sqlString} and returns the result of the last one.
     *
     * <p>Every non-blank statement before the last is executed and printed with the default
     * {@code Dataset.show()}. The last non-blank statement is not shown; its Dataset is returned.
     * Trailing ';' and whitespace after the final statement are ignored.
     *
     * @param sess      Spark session the statements run in
     * @param sqlString one or more statements separated by ';'
     * @return the Dataset of the last non-blank statement, or {@code null} if there is none
     * @throws Exception if a statement fails
     */
    public static Dataset<Row> execSQLString2(SparkSession sess, String sqlString) throws Exception {
        String[] arr = sqlString.split(";");
        // split() keeps a whitespace-only trailing segment (e.g. after "...;\n"), so the real
        // final statement must be located explicitly rather than assumed to be arr[length - 1].
        int last = lastNonBlankIndex(arr);
        for (int i = 0; i < last; i++) {
            if (!isBlank(arr[i])) {
                sess.sql(arr[i]).show();
            }
        }
        return last < 0 ? null : sess.sql(arr[last]);
    }

    /**
     * Registers the project functions by executing the classpath script {@code funcs.sql}.
     *
     * @param sess Spark session in which the functions are registered
     * @throws Exception if the script cannot be read or a statement fails
     */
    public static void execRegisterFuncs(SparkSession sess) throws Exception {
        execSQLScript(sess, "funcs.sql");
    }

    /** Returns true when the segment contains only whitespace. */
    private static boolean isBlank(String segment) {
        return segment.trim().isEmpty();
    }

    /** Returns the index of the last non-blank segment, or -1 if every segment is blank. */
    private static int lastNonBlankIndex(String[] segments) {
        for (int i = segments.length - 1; i >= 0; i--) {
            if (!isBlank(segments[i])) {
                return i;
            }
        }
        return -1;
    }
}
ExecSQLUtil - SQL執行工具類

 

3. 範例：清洗並轉儲資料

/**
 * Data-cleaning job: registers the project functions, then runs one cleaning SQL script.
 */
public class DataCleanJava {
    /** Script executed when no script name is supplied on the command line. */
    private static final String DEFAULT_SQL_SCRIPT = "data_clean_startup.sql";

    /**
     * Entry point.
     *
     * @param args optional; {@code args[0]} is the classpath SQL script to execute
     * @throws Exception if function registration or the script fails
     */
    public static void main(String[] args) throws Exception {
        // The command-line argument, when present, selects the script that is actually executed.
        String logSqlScriptName = (args != null && args.length > 0) ? args[0] : DEFAULT_SQL_SCRIPT;

        SparkConf conf = new SparkConf();
        conf.setAppName("dataClean");
        // NOTE(review): a master set on SparkConf takes precedence over spark-submit --master;
        // confirm this job is only meant to run locally.
        conf.setMaster("local[4]");

        SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
        try {
            // Register functions first; the cleaning scripts presumably call them.
            ExecSQLUtil.execRegisterFuncs(sess);
            ExecSQLUtil.execSQLScript(sess, logSqlScriptName);
        } finally {
            sess.stop();
        }
    }
}

 

 

 

SQL指令碼

funcs.sql：註冊函式的指令碼

-- Registers the project UDTFs and UDFs as temporary (session-scoped) functions.
-- They are dropped with DROP TEMPORARY FUNCTION so re-runs work and metastore functions are untouched.
use big12_umeng ;
drop temporary function if exists forkstartuplogs ;
drop temporary function if exists forkeventlogs ;
drop temporary function if exists forkerrorlogs ;
drop temporary function if exists forkpagelogs ;
drop temporary function if exists forkusagelogs ;
drop temporary function if exists formatbyday ;
drop temporary function if exists formatbyweek ;
drop temporary function if exists formatbymonth ;
create TEMPORARY function forkstartuplogs as 'com.oldboy.umeng.hive.udtf.ForkStartuplogsUDTF' ;
create TEMPORARY function forkeventlogs as 'com.oldboy.umeng.hive.udtf.ForkEventlogsUDTF' ;
create TEMPORARY function forkerrorlogs as 'com.oldboy.umeng.hive.udtf.ForkErrorlogsUDTF' ;
create TEMPORARY function forkpagelogs as 'com.oldboy.umeng.hive.udtf.ForkPagelogsUDTF' ;
create TEMPORARY function forkusagelogs as 'com.oldboy.umeng.hive.udtf.ForkUsagelogsUDTF' ;
create TEMPORARY function formatbyday as 'com.oldboy.umeng.hive.udf.FormatByDayUDF' ;
create TEMPORARY function formatbyweek as 'com.oldboy.umeng.hive.udf.FormatByWeekUDF' ;
create TEMPORARY function formatbymonth as 'com.oldboy.umeng.hive.udf.FormatByMonthUDF' ;