Using the Java API to call Sqoop2 for MySQL --> Hive data synchronization
1. Required jar packages
2. The required parameters are defined in message.properties:
# Hive JDBC URL
jdbcHiveUrl=jdbc:hive2://10.1.9.91:10000
# Hive JDBC driver
jdbcHiveDriver=org.apache.hive.jdbc.HiveDriver
# MySQL JDBC driver
jdbc_mysql_driver=com.mysql.jdbc.Driver
# Hive user name
hiveUser=hive
# Hive password
hivePwd=123456
hiveType=star
hiveDbName=default
mapred.reduce.tasks=1
# Sqoop2 server address
sqoopServerUrl=http://10.1.9.91:12000/sqoop/
## polling interval time init (ms) -- the scheduling interval, see the previous post
polling_interval_time = 86400000
# Project start day hour:min:second -- the time of day the scheduled job starts
polling_start_time = 10:54:10
outputFormat=TEXT_FILE
storageType=HDFS
sqoopOutput = /user/outputHive/
HDFSUrl =
# Target MySQL database URL
export_target_database_url=jdbc:mysql://10.1.35.13:3306/tsinghuadaxue
# MySQL user name
export_target_database_username=root
# MySQL password
export_target_database_password=root

3. Create and implement three services: DataSynMysqlAndHiveService, HiveService, and JDBCService. Their implementations are shown below.
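The implementations read these settings through a Messages helper that the post does not show. A minimal sketch, assuming Messages is simply a thin wrapper around a ResourceBundle loaded from message.properties, might look like this (only the class name, the getString signature, and the property keys come from the post; everything else is an assumption):

package com.scheduler.util;

import java.util.MissingResourceException;
import java.util.ResourceBundle;

// Hypothetical sketch of the Messages helper assumed by the services below:
// it looks up keys in message.properties on the classpath and returns the raw value.
public final class Messages {
    private static final ResourceBundle BUNDLE = ResourceBundle.getBundle("message");

    private Messages() {
    }

    public static String getString(String key) {
        try {
            return BUNDLE.getString(key);
        } catch (MissingResourceException e) {
            return "!" + key + "!"; // marker for a missing key, purely illustrative
        }
    }
}

The DataSynMysqlAndHive implementation follows.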
package com.scheduler.service.impl;

import java.io.IOException;
import java.sql.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;

import com.scheduler.service.JDBCService;
import com.scheduler.util.*;
import org.apache.sqoop.submission.SubmissionStatus;
import com.scheduler.service.HiveService;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.client.SubmissionCallback;
import org.apache.sqoop.model.*;
import org.apache.sqoop.validation.Status;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.mysql.jdbc.Statement;
import com.scheduler.service.DataSynMysqlAndHive;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;

@Service("DataSynMysqlAndHiveImpl")
public class DataSynMysqlAndHiveImpl implements DataSynMysqlAndHive {

    protected Logger log = Logger.getLogger(DataSynMysqlAndHiveImpl.class);

    private static String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
    private static String jdbcHiveUrl = Messages.getString("jdbcHiveUrl");
    private static String hiveUser = Messages.getString("hiveUser");
    private static String hivePwd = Messages.getString("hivePwd");
    private static String exportDatabase = Messages.getString("export_target_database_url");
    private static String exportUsername = Messages.getString("export_target_database_username");
    private static String exportPassword = Messages.getString("export_target_database_password");
    private static String jdbcMysqlDriver = Messages.getString("jdbc_mysql_driver");
    private static String pollingStartTime = Messages.getString("polling_start_time");
    private static SimpleDateFormat yMd = new SimpleDateFormat("yyyy-MM-dd");
    private static SimpleDateFormat yMdHms = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static Date polling_start_time = null; // polling start time
    private static String sqoopServerUrl = Messages.getString("sqoopServerUrl");

    private SqoopClient sqoopClient; // Sqoop client instance

    @Autowired
    private JDBCService jdbcService; // JDBC service
    @Autowired
    private HiveService hfs;

    @Override
    public String exportHiveData(String tableName) {
        String flag = "success";
        try {
            Class.forName(jdbcHiveDriver);
        } catch (ClassNotFoundException e) {
            flag = "error";
            e.printStackTrace();
            log.error("hive連結出錯", e);
        }
        // get today's date and yesterday's date
        Date nowDate = new Date();
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(nowDate);
        calendar.add(Calendar.DAY_OF_MONTH, -1);
        Date predate = calendar.getTime();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        String predateString = dateFormat.format(predate) + " " + pollingStartTime;
        String nowdateString = dateFormat.format(nowDate) + " " + pollingStartTime;
        String sql = "select * from " + tableName + " where resource_flag = 1 and create_time <= '"
                + nowdateString + "' and create_time > '" + predateString + "'";
        log.info("sql:" + sql);
        System.out.println("sql:" + sql);
        try {
            Connection con = DriverManager.getConnection(jdbcHiveUrl, hiveUser, hivePwd);
            java.sql.Statement stmt = con.createStatement();
            ResultSet resultSet = stmt.executeQuery(sql);
            if (resultSet.next()) { // only export when the Hive query returns data; otherwise do nothing
                String exportSql = generateExportSql(sql, tableName);
                ResultSet set = stmt.executeQuery(exportSql);
                System.out.println("匯出sql為:" + exportSql);
                if (set.next()) {
                    int result = set.getInt(1);
                    if (result == 1) {
                        flag = "error";
                    }
                }
            }
            closeConnection(con, stmt, resultSet);
        } catch (SQLException e) {
            e.printStackTrace();
            flag = "error";
        }
        return flag;
    }

    /**
     * @param selectSql
     * @param tableName
     * @return the export SQL built around the dboutput UDF
     */
    private String generateExportSql(String selectSql, String tableName) {
        // assemble the SQL; the dboutput UDF performs the export back to MySQL
        StringBuffer buffer = new StringBuffer();
        buffer.append("select dboutput('");
        buffer.append(exportDatabase);
        buffer.append("','");
        buffer.append(exportUsername);
        buffer.append("','");
        buffer.append(exportPassword);
        buffer.append("','");
        // database connection
        Connection conn = null;
        // query result set
        ResultSet rs = null;
        try {
            // set the encoding
            /*if (exportDatabase.contains("jdbc:mysql") && !exportDatabase.contains("characterEncoding")) {
                exportDatabase = exportDatabase + "?characterEncoding=UTF-8"; // use UTF-8
            }*/
            // open the MySQL connection
            conn = getConnection(jdbcMysqlDriver, exportDatabase, exportUsername, exportPassword);
            // read the column metadata of the target table
            rs = conn.getMetaData().getColumns(null, null, tableName, null);
            // collect all column names except the id column
            String columnNames = "";
            String value = "";
            while (rs.next()) {
                if (!StringUtils.equals("id", rs.getString("COLUMN_NAME"))) {
                    columnNames = columnNames + rs.getString("COLUMN_NAME") + ",";
                    value = value + "?,";
                }
            }
            columnNames = columnNames.substring(0, columnNames.length() - 1);
            value = value.substring(0, value.length() - 1);
            String insertSql = "insert into " + tableName + "(" + columnNames + ") values(" + value + ")";
            buffer.append(insertSql + "',");
            buffer.append(columnNames);
            buffer.append(") from ");
            buffer.append("(" + selectSql.replace("*", columnNames) + ")");
        } catch (Exception e) {
            e.printStackTrace();
        }
        closeConnection(conn, null, rs);
        System.out.println("匯出的sql為:" + buffer.toString());
        return buffer.toString();
    }

    public void closeConnection(Connection connection, java.sql.Statement pStatement, ResultSet resultSet) {
        try {
            if (resultSet != null) {
                resultSet.close();
            }
            if (pStatement != null) {
                pStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public Connection getConnection(String driver, String url, String userName, String password) {
        Connection connection = null;
        // load the JDBC driver
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.out.println("The Driver loaded error,please contact to your Software Designer!");
        }
        // open the connection
        try {
            Properties props = new Properties();
            props.put("remarksReporting", "true");
            props.put("user", userName);
            props.put("password", password);
            connection = DriverManager.getConnection(url, props);
            //connection = DriverManager.getConnection(url, userName, password);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return connection;
    }

    /**
     * <p>
     * Description: [import from MySQL into Hive]
     */
    @Override
    public String importHiveData(String sourceTableName) {
        // check whether there is any new data
        try {
            Date nowTime = yMdHms.parse(yMdHms.format(new Date()));
            // yesterday at the same time
            String preDate = yMdHms.format(TimeHelper.dateAddDay(nowTime, -1));
            Timestamp aftTimestamp = getAfterMaxTimestamp(sourceTableName, preDate, "create_time");
            if (null == aftTimestamp) {
                return "檢測沒有新資料";
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
        // flag that tracks whether any step of the extraction failed
        boolean hasError = false;
//1.初始化sqoop客戶端並且得到sqoop連線 MConnection con =initSqoop(); //如果得到的連線為空,列印日誌,結束該任務 if (con == null) { System.out.print("連線為空"); return "error"; } //2.建立sqoop任務,任務型別為匯入任務 MJob newjob = sqoopClient.newJob(con.getPersistenceId(),org.apache.sqoop.model.MJob.Type.IMPORT); CallBack callback = new CallBack(sourceTableName); //獲取該表的表資訊 List<TableVO> tableVOs = jdbcService.getTables(exportDatabase, exportUsername, exportPassword, null, null, sourceTableName, null); //獲取該表的列資訊 List<ColumnVO> columnVOs = jdbcService.getColumns(exportDatabase, exportUsername, exportPassword, sourceTableName); boolean isFirst = true; String primaryKey = jdbcService.getPrimaryKey(exportDatabase,exportUsername,exportPassword,null,null,sourceTableName); String hdfsFilePath= ""; hdfsFilePath=updateIncrementSqoopJob(newjob,sourceTableName,columnVOs); //啟用執行緒監控sqoop採集時長 Thread thread = monitorRuntime(sqoopClient,3*60*60,newjob); //定義任務開始時間變數 long startTime = System.currentTimeMillis(); //開始sqoop任務採集,並返回sqoop任務採集狀態 MSubmission submission = startSqoopTask(0,newjob,thread,callback); //將sqoop匯入時間欄位新增到column中 columnVOs=addSqoopTimeColumn(columnVOs); if (submission.getStatus().compareTo(SubmissionStatus.SUCCEEDED) == 0) {// 任務執行成功,則把資料寫入到hive中 hasError=createOrcHiveAfterSqoop(sourceTableName,columnVOs, hdfsFilePath, startTime, startTime, false); } if (submission.getStatus().compareTo(SubmissionStatus.FAILED) == 0|| submission.getExceptionInfo() != null) {// 任務執行出錯,打印出錯資訊,並記錄到任務日誌中 System.out.println(submission.getExceptionInfo()); //出現錯誤,記錄日誌,刪除hdfs檔案 addLogCaseSqoopFail(submission,newjob,hdfsFilePath,thread); //標記發生錯誤 hasError = true; return "error"; } //afterFinishTask(hasError); return "success"; } /** * <p> * Description:[初始化sqoop客戶端,得到sqoop連結] * </p> * @return MConnection sqoop連線 */ public MConnection initSqoop(){ //初始化客戶端 this.sqoopClient = new SqoopClient(sqoopServerUrl); //獲取該資料來源的sqoop連結id Long conId = createSqoopConnection("zheda",exportDatabase,exportUsername,exportPassword,jdbcMysqlDriver); //根據sqoop xid 獲得連結 MConnection con =sqoopClient.getConnection(conId); //將該連結返回 return con; } public long createSqoopConnection(String resourceName, String jdbcUrl, String name, String passwd, String driver) { SqoopClient sqoopClient = new SqoopClient(Messages.getString("sqoopServerUrl")); MConnection newCon = sqoopClient.newConnection(1); MConnectionForms conForms = newCon.getConnectorPart(); MConnectionForms frameworkForms = newCon.getFrameworkPart(); newCon.setName(resourceName); conForms.getStringInput("connection.connectionString").setValue(jdbcUrl);// 資料庫連線url字串 conForms.getStringInput("connection.jdbcDriver").setValue(driver);// 資料庫驅動 conForms.getStringInput("connection.username").setValue(name);// 資料庫使用者名稱 conForms.getStringInput("connection.password").setValue(passwd);// 資料庫密碼 frameworkForms.getIntegerInput("security.maxConnections").setValue(0);// sqoop的最大連線數 try { Status status = sqoopClient.createConnection(newCon); if (status.canProceed()) { return newCon.getPersistenceId(); } else { log.info("Check for status and forms error "); System.out.println("Check for status and forms error "); return -1; } } catch (Exception e) { log.error("建立連接出錯!:"+e.getMessage()); System.out.println(e.getMessage()); return -1; } } /** * <p> * Description:[初始化sqoop客戶端,得到sqoop連結] * </p> * */ // sqoop任務執行回撥內部類 class CallBack implements SubmissionCallback { private String tableName; public String getTableName() { return tableName; } public void setTableName(String tableName) { this.tableName = tableName; } public CallBack() { super(); } public 
CallBack(String tableName){ super(); this.tableName= tableName; } @Override public void submitted(MSubmission mSubmission) { } @Override public void updated(MSubmission mSubmission) { } // sqoop任務完成回撥函式 @Override public void finished(MSubmission arg0) { } } /** * <p> * Description:[啟用執行緒監控sqoop任務執行時長,如果超過執行時長,停止執行該任務] * </p> * * @param SqoopClient sqoop客戶端 * @param int 任務執行時長 * @param final long sqoop任務Id * @return Thread 當前的監控執行緒 */ public Thread monitorRuntime(SqoopClient sqc,int taskTime,final MJob sJob){ //獲取監聽時間,如果沒有指定監聽時間,預設為24小時 final int job_timeout_time = taskTime != 0 ? taskTime :20; // 啟用一個執行緒,用於監聽sqoop執行任務的時間,如果時間超過最大執行時間,則停止掉該任務 Thread thread = new Thread(new Runnable() { @Override public void run() { try { //監聽任務執行時長,如果超過最大時間,停掉sqoop任務 Thread.sleep(job_timeout_time * 60 * 60 * 1000); sqoopClient.stopSubmission(sJob.getPersistenceId()); } catch (InterruptedException e) { log.error("sqoop全量任務發生異常!",e); } } }); thread.start(); //將該執行緒返回 return thread; } /** * <p> * Description:[任務採集後,根據原表中的欄位資訊以及hdfs檔案地址建立hive表] * </p> * * @param tableName 表名稱 * @param columnVOs 表字段 * @param hdfsPath hdfs檔案地址 * @return boolean 是否建立成功 */ public boolean createHiveTable(String tableName,List<ColumnVO> columnVOs,String hdfsPath){ boolean hasError = false; //組裝sql StringBuffer createSql = new StringBuffer("create table " + tableName + "("); for (int i = 0; i < columnVOs.size(); i++) { if (i == 0) { createSql.append("`" + columnVOs.get(i).getColumnName()+ "` string"); } else { createSql.append(",`"+ columnVOs.get(i).getColumnName()+ "` string"); } } createSql.append(") ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LOCATION "); createSql.append(" '" + hdfsPath + "'"); log.info("createSql:" + createSql); String sql = createSql.toString().trim(); //建立表 try { boolean success = hfs.createHiveTable(tableName, sql); //如果返回的結果有錯誤,則標記hive創建出現錯誤 if(!success){ hasError = true; } } catch (Exception e) { e.printStackTrace(); hasError =true; } //返回結果 return hasError; } /** * <p> * Description:[hive表建立失敗後,記錄日誌並且刪除對應的hdfs檔案] * </p> * * @param tableName 表名稱 * @param hdfsPath hdfs檔案地址 * @param jobId sqoopJobid */ public void addLogCaseCreatehiveTableError(String tableName,String hdfsPath,long jobId){ //記錄日誌, //addTaskLog("create hiveTable "+tableName+" failed!", jobId); //刪除hdfs檔案 deleteHdfsHiveTable(hdfsPath,tableName); } /** * <p> * Description:[啟動sqoop採集任務] * </p> * @param loopTime 任務執行次數標識,用於判斷建立或者更新任務 * @param newjob sqoopJob實體 * @param Thread 監控任務執行時長的執行緒 * @param callback sqoop回撥類 * @return MSubmission Sqoop提交結果 */ public MSubmission startSqoopTask(int loopTime,MJob newjob,Thread thread,CallBack callback){ MSubmission submission= null; //第一次執行,則建立新的任務,否則,更新任務 if (loopTime == 0) { sqoopClient.createJob(newjob); } else { sqoopClient.updateJob(newjob); } //執行sqoop任務 try { submission = sqoopClient.startSubmission(newjob.getPersistenceId(), callback, 100); } catch (InterruptedException e1) { // 發生異常停止掉 if (thread.isAlive()) { thread.interrupt(); } log.error("sqoop提交全量任務出錯!:",e1); } //返回結果 return submission; } /** * <p> * Description:[sqoop任務失敗時,新增日誌,刪除hdfs檔案等] * </p> * @param MSubmission Sqoop提交結果 * @param MJob sqoopJob實體 * @param String hdfs檔案地址 * @param Thread 監控任務執行時長的執行緒 * @return void */ public void addLogCaseSqoopFail(MSubmission submission,MJob sJob,String hdfsUrl,Thread thread){ //後臺打印出錯誤資訊 System.out.println(submission.getExceptionInfo()); // 刪除hdfs檔案 deleteHdfsFiles(hdfsUrl); //如果監控執行緒還在繼續,則停止執行緒 if (thread.isAlive()) { thread.interrupt();// 發生異常停止掉 } } /** * <p> * Description:[根據傳入的表名和列資訊,組裝成建立表的sql] * 
    /**
     * <p>
     * Description: [assemble the create-table SQL from the given table name and column info]
     * </p>
     * @param tableName table name
     * @param columnVOs table columns
     * @return String the generated SQL
     */
    public String getCreateTableSQL(String tableName, List<ColumnVO> columnVOs, boolean isText) {
        // assemble the SQL
        StringBuffer createSql = new StringBuffer("create table " + tableName + "(");
        for (int i = 0; i < columnVOs.size(); i++) {
            if (i == 0) {
                createSql.append("`" + columnVOs.get(i).getColumnName() + "` string");
            } else {
                createSql.append(",`" + columnVOs.get(i).getColumnName() + "` string");
            }
        }
        createSql.append(")");
        if (isText) {
            createSql.append(" ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' ");
        }
        log.info("createSql:" + createSql);
        String sql = createSql.toString().trim();
        // return the result
        return sql;
    }

    /**
     * <p>
     * Description: [assemble the column definition list from the given column objects]
     * </p>
     *
     * @param columnVOs table columns
     * @return String the generated column list
     */
    public String getColumns(List<ColumnVO> columnVOs) {
        // assemble the column list
        StringBuffer columns = new StringBuffer("");
        for (int i = 0; i < columnVOs.size(); i++) {
            if (i == 0) {
                columns.append("`" + columnVOs.get(i).getColumnName() + "` string");
            } else {
                columns.append(",`" + columnVOs.get(i).getColumnName() + "` string");
            }
        }
        log.info("createSql:" + columns);
        String column = columns.toString().trim();
        // return the result
        return column;
    }

    /**
     * <p>
     * Description: [after the incremental Sqoop import, create the Hive ORC table and insert the ORC data to apply the increment]
     * </p>
     *
     * @param table source table name
     * @param columnVOs source table columns
     * @param hdfsFilePath HDFS file path written by the successful Sqoop import
     * @param jobId Sqoop job id, used for the task log
     * @param startTime task start time, used to record the total task time
     * @return boolean whether any error occurred: true on error, false when everything ran normally
     */
    public boolean createOrcHiveAfterSqoop(String table, List<ColumnVO> columnVOs, String hdfsFilePath, long jobId, long startTime, boolean isFirst) {
        boolean hasError = false;
        // table names
        String orcTableName = table;
        String sourceTableName = table;
        String primaryKey = jdbcService.getPrimaryKey(exportDatabase, exportUsername, exportPassword, null, null, sourceTableName);
        try {
            if (primaryKey == null || primaryKey.trim().equals("")) {
                primaryKey = columnVOs.get(0).getColumnName();
            }
            // textfileTable is the temporary table that holds the incremental data before it is moved into the target table
            String textfileTable = orcTableName + "_temp";
            // build the create-table SQL
            String sql = getCreateTableSQL(textfileTable, columnVOs, true);
            // create the Hive table and load the incremental data into it
            hfs.createHiveTempTable(textfileTable, sql, hdfsFilePath);
            // not the first import: delete the matching rows from Hive first, then insert the new data
            long incrementInsertTime = System.currentTimeMillis();
            hfs.deleteIncrementDataExistInOrcTable(textfileTable, orcTableName, primaryKey, jdbcHiveUrl);
            hfs.insertIntoHiveOrcTable(textfileTable, orcTableName, jdbcHiveUrl);
            long incrementInsertTimeEnd = System.currentTimeMillis();
            System.out.println("orc增量新增和更新資料到orc表所用時間:" + (incrementInsertTimeEnd - incrementInsertTime));
            log.info("orc增量新增和更新資料到orc表所用時間:" + (incrementInsertTimeEnd - incrementInsertTime));
        } catch (Exception e) {
            hasError = true;
            log.error("全量任務建立hive表出錯!", e);
        }
        return hasError;
    }

    /**
     * <p>
     * Description: [add the Sqoop load-time column "load_bigdata_time" to the source column list]
     * </p>
     * @param cVos source table columns
     * @return List<ColumnVO>
     */
    public List<ColumnVO> addSqoopTimeColumn(List<ColumnVO> cVos) {
        ColumnVO cVo = new ColumnVO();
        cVo.setColumnName("load_bigdata_time");
        cVo.setComment("Sqoop匯入時間");
        cVo.setType("datetime");
        cVos.add(cVo);
        return cVos;
    }

    /**
     * If the Sqoop import fails, delete the HDFS files that were already written;
     * if creating the Hive table fails, delete the table and the HDFS files.
     *
     * @param HDFSPath
     * @param HiveTableName
     */
    private void deleteHdfsHiveTable(String HDFSPath, String HiveTableName) {
        String HDFSUrl = Messages.getString("HDFSUrl");
        String HDFSFilePath = HDFSUrl + HDFSPath;
        System.setProperty("HADOOP_USER_NAME", Messages.getString("hiveUser"));
        try {
            try {
                hfs.deleteFdfsByHiveTable(HiveTableName);
                hfs.deleteHiveTrueTable(HiveTableName);
            } catch (ClassNotFoundException e1) {
                e1.printStackTrace();
            }
            // the table has been dropped above if it existed; now delete the HDFS files
            Path p = new Path(HDFSFilePath);
            Configuration conf = new Configuration();
            try {
                FileSystem fs = p.getFileSystem(conf);
                boolean isHad = fs.exists(p);
                if (isHad) {
                    fs.delete(p, true);
                }
                // boolean b = fs.createNewFile(p);
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public void deleteHdfsFiles(String hdfsPath) {
        String HDFSFilePath = jdbcHiveUrl + hdfsPath;
        System.setProperty("HADOOP_USER_NAME", hiveUser);
        try {
            // delete the HDFS files
            Path p = new Path(HDFSFilePath);
            Configuration conf = new Configuration();
            FileSystem fs = p.getFileSystem(conf);
            boolean isHad = fs.exists(p);
            if (isHad) {
                fs.delete(p, true);
            }
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // find the latest timestamp of the data added since the last update
    public Timestamp getAfterMaxTimestamp(String tableName, String preTimestamp, String columnName) {
        Timestamp timestamp = null;
        Connection connection = JdbcConnection.getConnection(jdbcMysqlDriver, exportDatabase, exportUsername, exportPassword);
        PreparedStatement pStatement = null;
        ResultSet resultSet = null;
        String sql = "select max(date_format(" + columnName + ",'%Y-%m-%d %H:%i:%S')) from "
                + "(select * from " + tableName + " where date_format(" + columnName
                + ",'%Y-%m-%d %H:%i:%S') > '" + preTimestamp + "') as increment";
        /* For Oracle:
            sql = "select max(to_char(" + columnName + ",'yyyy-MM-dd hh24:mi:ss')) from ("
                    + "select * from " + tableName + " where to_char(" + columnName
                    + ",'yyyy-MM-dd hh24:mi:ss') > '" + preTimestamp + "')";
           For Sybase:
            sql = "select * from " + tableName;
           For SQL Server:
            sql = "select max(Convert(varchar," + columnName + ",120)) from ("
                    + "select * from " + tableName + " where Convert(varchar," + columnName
                    + ",120) > '" + preTimestamp + "') as increment";
        */
        try {
            pStatement = connection.prepareStatement(sql);
            resultSet = pStatement.executeQuery();
            if (resultSet.next()) {
                //timestamp = changeToTimestamp(resultSet.getString(1));
                if (resultSet.getString(1) == null) {
                    return timestamp;
                }
                timestamp = Timestamp.valueOf(resultSet.getString(1));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            JdbcConnection.closeConnection(connection, pStatement, resultSet);
        }
        return timestamp;
    }

    /**
     * Update the configuration of the incremental Sqoop job.
     */
    private String updateIncrementSqoopJob(MJob newjob, String tableName, List<ColumnVO> columns) {
        MJobForms connectorForm = newjob.getConnectorPart();
        MJobForms frameworkForm = newjob.getFrameworkPart();
        newjob.setName("ImportJob_zheda");
        // primary key of the source table
        String primaryKey = jdbcService.getPrimaryKey(exportDatabase, exportUsername, exportPassword, null, null, tableName);
        // if there is a primary key, use it as the "partitionColumn" and run the job with 10 mappers
        if (primaryKey != null && !primaryKey.trim().equals("")) {
            frameworkForm.getIntegerInput("throttling.extractors").setValue(10); // number of mappers
            connectorForm.getStringInput("table.partitionColumn").setValue(primaryKey);
        // if there is no primary key, pick a non date/time column as the "partitionColumn" and run with 1 mapper
        } else {
            // pick a column that is not a date/time type
            for (int i = 0; i < columns.size(); i++) {
                if (!columns.get(i).getType().toUpperCase().contains("TIME") && !columns.get(i).getType().toUpperCase().contains("DATE")) {
                    primaryKey = columns.get(i).getColumnName();
                    break;
                }
            }
            // set the "partitionColumn"
            connectorForm.getStringInput("table.partitionColumn").setValue(primaryKey);
            // number of mappers
            frameworkForm.getIntegerInput("throttling.extractors").setValue(1);
        }
        // control the incremental import: get today's date and yesterday's date
        Date nowDate = new Date();
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(nowDate);
        calendar.add(Calendar.DAY_OF_MONTH, -1);
        Date predate = calendar.getTime();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        String predateString = dateFormat.format(predate) + " " + pollingStartTime;
        String nowdateString = dateFormat.format(nowDate) + " " + pollingStartTime;
        String sql = "select * FROM " + tableName + " where ";
        String charStr = " Convert(varchar," + "create_time" + ",120) ";
        charStr = "date_format(" + "create_time" + ",'%Y-%m-%d %H:%i:%S') ";
        sql += charStr + " > '" + predateString + "' and " + charStr + " <= '" + nowdateString + "' and ${CONDITIONS}";
        System.out.println("SQL ::" + sql);
        connectorForm.getStringInput("table.sql").setValue(sql);
        String hdfdFilePath = Messages.getString("sqoopOutput") + new Date().getTime() + tableName;
        frameworkForm.getEnumInput("output.storageType").setValue(Messages.getString("storageType"));
        frameworkForm.getEnumInput("output.outputFormat").setValue(Messages.getString("outputFormat"));
        frameworkForm.getStringInput("output.outputDirectory").setValue(hdfdFilePath);
        frameworkForm.getIntegerInput("throttling.extractors").setValue(1); // number of mappers
        return hdfdFilePath;
    }
}
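A hypothetical caller for the service above might look like the following. The scheduling itself (polling_interval_time / polling_start_time) is handled elsewhere in the original project, so this is only a sketch of how importHiveData and exportHiveData could be wired into a timer; the table name "student" and the fixed one-day period are chosen purely for illustration.

package com.scheduler.task;

import java.util.Timer;
import java.util.TimerTask;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.scheduler.service.DataSynMysqlAndHive;

// Hypothetical scheduled runner: first pulls incremental MySQL data into Hive,
// then pushes the flagged Hive rows back to MySQL once a day.
@Component
public class DataSynTask {

    @Autowired
    private DataSynMysqlAndHive dataSynService;

    public void start() {
        Timer timer = new Timer("data-syn", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                String importResult = dataSynService.importHiveData("student");
                System.out.println("import result: " + importResult);
                String exportResult = dataSynService.exportHiveData("student");
                System.out.println("export result: " + exportResult);
            }
        }, 0L, 24L * 60 * 60 * 1000); // once a day, matching polling_interval_time
    }
}

The HiveService implementation used above (the hfs field) follows.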
package com.scheduler.service.impl;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.scheduler.util.Constant;
import com.scheduler.util.Messages;
import com.scheduler.util.ColumnVO;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.stereotype.Service;
import com.scheduler.service.HiveService;
import org.apache.sqoop.client.SubmissionCallback;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.Logger;
import javax.annotation.Resource;
/**
* <p>
* Title: manageplatform_[Hive]
* </p>
* <p>
* Description: [HiveService implementation layer]
* </p>
*
* @author GLJ
* @author (latest modification by $Author$)
* @version $Revision$ 2015-03-18
* @since 20130601
*/
@Service("hiveServiceImpl")
public class HiveServiceImpl implements HiveService {
protected Logger log = Logger.getLogger(HiveServiceImpl.class);
private static String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
private static String jdbcHiveUrl = Messages.getString("jdbcHiveUrl");
private static String hiveUser = Messages.getString("hiveUser");
private static String hivePwd = Messages.getString("hivePwd");
private static String exportDatabase = Messages.getString("export_target_database_url");
private static String exportUsername = Messages.getString("export_target_database_username");
private static String exportPassword = Messages.getString("export_target_database_password");
private static String jdbcMysqlDriver = Messages.getString("jdbc_mysql_driver");
public HiveServiceImpl() {
}
@Override
public boolean existTable(String table) throws SQLException {
boolean flag = false;
try {
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
e.printStackTrace();
log.error("hive連結出錯", e);
}
Connection con = DriverManager.getConnection(jdbcHiveUrl, hiveUser,
hivePwd);
java.sql.Statement stmt = con.createStatement();
String sql = "show tables '" + table + "'";
log.info("sql:" + sql);
ResultSet set = stmt.executeQuery(sql);
while (set.next()) {
String reTableName = set.getString(1);
if ((table.toLowerCase()).equals(reTableName.toLowerCase())) {
flag = true;
break;
}
}
return flag;
}
@Override
public boolean createTableAsSelect(String targetTableName, String select)
throws SQLException {
String create = "CREATE TABLE " + targetTableName;
String option = " row format delimited fields terminated by '\001' "; // you
// can
// change
// it
String as = " AS " + select; // here you can decide which column, table
// to select, join table or more
// comprehension clause
String sql = create + option + as;
log.info("建立資料表sql:" + sql);
System.out.println("Running: " + sql);
try {
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連結出錯", e);
e.printStackTrace();
}
Connection con = DriverManager.getConnection(jdbcHiveUrl, hiveUser,
hivePwd);
java.sql.Statement stmt = con.createStatement();
stmt.execute(sql);
stmt.close();
con.close();
return true;
}
@Override
public void deleteHiveTrueTable(String tableName) throws SQLException {
String deleteSql = "drop table if exists " + tableName;
System.out.println("Running: " + deleteSql);
log.info("刪除資料表sql:" + deleteSql);
try {
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連結出錯", e);
e.printStackTrace();
}
Connection con = DriverManager.getConnection(jdbcHiveUrl, hiveUser,
hivePwd);
java.sql.Statement stmt = con.createStatement();
stmt.execute(deleteSql);
stmt.close();
con.close();
}
@Override
public List<Map<String, String>> getHiveColunmsByTableName(String hiveurl,
String userName, String password, String tableName) {
List<Map<String, String>> colsAndType = new ArrayList<Map<String, String>>();
try {
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連結出錯", e);
e.printStackTrace();
}
Connection con;
try {
con = DriverManager.getConnection(hiveurl, userName, password);
Statement stmt = con.createStatement();
String sql = "desc " + tableName;
log.info("獲取表字段sql" + sql);
ResultSet resultSet = stmt.executeQuery(sql);
while (resultSet.next()) {
Map<String, String> map = new HashMap<String, String>();
String colunm = resultSet.getString(1);
String type = resultSet.getString(2);
map.put("column", colunm);
map.put("type", type);
colsAndType.add(map);
}
stmt.close();
con.close();
} catch (SQLException e) {
e.printStackTrace();
log.error("sql執行出錯", e);
}
return colsAndType;
}
@Override
public List<String> getColumnValues(String tableName, String colName) {
String jdbcHiveUrl = Messages.getString("jdbcHiveUrl");
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
String sql = "select distinct " + colName + " from " + tableName;
try {
final String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
Connection con;
con = DriverManager.getConnection(jdbcHiveUrl, hiveUser, hivePwd);
final Statement stmt = con.createStatement();
log.info("sql:" + sql);
final ResultSet datSet = stmt.executeQuery(sql);
List<String> values = new ArrayList<String>();
while (datSet.next()) {
values.add(datSet.getString(1));
}
return values;
} catch (final ClassNotFoundException e) {
log.error("hive連結出錯", e);
e.printStackTrace();
return null;
} catch (SQLException e) {
log.error("sql執行出錯", e);
e.printStackTrace();
return null;
}
}
/*
* Get all tables
*/
/*private ArrayList<String> getTables() throws SQLException {
try {
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
e.printStackTrace();
log.error("hive連結出錯",e);
}
Connection con = DriverManager.getConnection(jdbcHiveUrl, hiveUser,
hivePwd);
java.sql.Statement stmt = con.createStatement();
if (stmt == null)
return null;
String sql = "show tables";
ArrayList<String> result = new ArrayList<String>();
log.info("sql:"+sql);
ResultSet res = stmt.executeQuery(sql);
while (res.next()) {
result.add(res.getString(1));
}
stmt.close();
con.close();
return result;
}*/
@Override
public List<String> getTablesColName(String url, long resourceId,
String userName, String password, String goOnTableName) {
List<String> tableList = new LinkedList<String>();
if (url.contains("jdbc:sybase:Tds")) {
tableList = this.getColNameOfSybase(url, resourceId, userName,
password, goOnTableName);
return tableList;
}
try {
String jdbcMysqlDriver = Messages.getString("jdbc_mysql_driver");
if (url.contains("jdbc:oracle")) {
jdbcMysqlDriver = Messages.getString("jdbc_oracle_driver");
} else if (url.contains("jdbc:sqlserver")) {
jdbcMysqlDriver = Messages.getString("jdbc_sqlserver_driver");
}
Class.forName(jdbcMysqlDriver);
} catch (ClassNotFoundException e) {
log.error("hive連結異常", e);
// TODO Auto-generated catch block
e.printStackTrace();
}
Connection con;
try {
con = DriverManager.getConnection(url, userName, password);
Statement stmt = con.createStatement();
ResultSet tableSet = null;
PreparedStatement pStatement = null;
if (url.contains("jdbc:oracle")) {
String sql1 = Messages.getString("oracle_show_tables");
log.info("sql:" + sql1);
pStatement = con.prepareStatement(sql1);
tableSet = pStatement.executeQuery();
} else if (url.contains("jdbc:sqlserver")) {
String sql2 = Messages.getString("sqlserver_show_tables");
log.info("sql:" + sql2);
pStatement = con.prepareStatement(sql2);
tableSet = pStatement.executeQuery();
} else {
String[] type = {"TABLE"};
tableSet = con.getMetaData().getTables("", "", "", type);
}
Boolean id = false;
while (tableSet.next()) {
String tableName = null;
if (url.contains("jdbc:oracle")) {
tableName = tableSet.getString(1);
} else if (url.contains("jdbc:sqlserver")) {
tableName = tableSet.getString(1);
} else {
tableName = tableSet.getString("TABLE_NAME");
}
if (goOnTableName == null || goOnTableName.equals("")
|| goOnTableName.equals(" ")) {
id = true;
} else {
if (tableName.equals(goOnTableName))
id = true;
}
if (id) {
tableList.add(tableName);
}
}
stmt.close();
con.close();
} catch (SQLException e) {
log.error("SQL執行異常", e);
e.printStackTrace();
}
return tableList;
}
private List<String> getColNameOfSybase(String url, long resourceId,
String userName, String password, String goOnTableName) {
List<String> tableList = new LinkedList<String>();
String jdbcMysqlDriver = Messages.getString("jdbc_sybase_driver");
try {
Class.forName(jdbcMysqlDriver);
String sql = Messages.getString("sybase_show_tables");
Connection con = DriverManager.getConnection(url, userName,
password);
Statement stmt = con.createStatement(
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
log.info("sql:" + sql);
PreparedStatement pStatement = con.prepareStatement(sql);
ResultSet tableSet = pStatement.executeQuery();
Boolean id = false;
while (tableSet.next()) {
String tableName = tableSet.getString("TABLE_NAME");
if (goOnTableName == null || goOnTableName.equals("")
|| goOnTableName.equals(" ")) {
id = true;
} else {
if (tableName.equals(goOnTableName))
id = true;
}
if (id) {
tableList.add(tableName);
}
}
stmt.close();
con.close();
} catch (ClassNotFoundException e) {
e.printStackTrace();
log.error("hive連結出錯", e);
} catch (SQLException e) {
e.printStackTrace();
log.error("SQL執行異常", e);
}
return tableList;
}
@Override
public List<String> getViewsColName(String url, long resourceId,
String userName, String password, String schemaName,
String goOnViewName) {
List<String> viewList = new LinkedList<String>();
if (url.contains("jdbc:sybase:Tds")) {
viewList = getSybaseView(url, resourceId, userName, password,
goOnViewName);
return viewList;
}
try {
String jdbcMysqlDriver = Messages.getString("jdbc_mysql_driver");
if (url.contains("jdbc:oracle")) {
jdbcMysqlDriver = Messages.getString("jdbc_oracle_driver");
} else if (url.contains("jdbc:sqlserver")) {
jdbcMysqlDriver = Messages.getString("jdbc_sqlserver_driver");
}
Class.forName(jdbcMysqlDriver);
} catch (ClassNotFoundException e) {
log.error("jdbc連結異常", e);
e.printStackTrace();
}
Connection con;
try {
con = DriverManager.getConnection(url, userName, password);
Statement stmt = con.createStatement();
ResultSet viewSet = null;
PreparedStatement pStatement = null;
String vn = "name";
if (url.contains("jdbc:oracle")) {
String sql1 = Messages.getString("oracle_show_views");
log.info("sql:" + sql1);
pStatement = con.prepareStatement(sql1);
viewSet = pStatement.executeQuery();
vn = "VIEW_NAME";
} else if (url.contains("jdbc:sqlserver")) {
String sql2 = Messages.getString("sqlserver_show_views");
log.info("sql:" + sql2);
pStatement = con.prepareStatement(sql2);
viewSet = pStatement.executeQuery();
} else {
String sql3 = Messages.getString("mysql_show_views") + "'"
+ schemaName + "'";
log.info("sql:" + sql3);
pStatement = con.prepareStatement(sql3);
viewSet = pStatement.executeQuery();
vn = "table_name";
}
Boolean id = false;
while (viewSet.next()) {
String tableName = viewSet.getString(vn);
if (goOnViewName == null || goOnViewName.equals("")
|| goOnViewName.equals(" ")) {
id = true;
} else {
if (tableName.equals(goOnViewName))
id = true;
}
if (id) {
viewList.add(tableName);
}
}
stmt.close();
con.close();
} catch (SQLException e) {
log.error("SQL執行異常", e);
e.printStackTrace();
}
return viewList;
}
private List<String> getSybaseView(String url, long resourceId,
String userName, String password, String goOnTableName) {
List<String> viewList = new LinkedList<String>();
String jdbcMysqlDriver = Messages.getString("jdbc_sybase_driver");
try {
Class.forName(jdbcMysqlDriver);
String sql = Messages.getString("sybase_show_views")
+ "'sysquerymetrics'";
Connection con = DriverManager.getConnection(url, userName,
password);
Statement stmt = con.createStatement(
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
log.info("sql:" + sql);
PreparedStatement pStatement = con.prepareStatement(sql);
ResultSet tableSet = pStatement.executeQuery();
Boolean id = false;
while (tableSet.next()) {
String tableName = tableSet.getString("name");
if (goOnTableName == null || goOnTableName.equals("")
|| goOnTableName.equals(" ")) {
id = true;
} else {
if (tableName.equals(goOnTableName))
id = true;
}
if (id) {
viewList.add(tableName);
}
}
stmt.close();
con.close();
} catch (ClassNotFoundException e) {
log.error("hive連線異常", e);
e.printStackTrace();
} catch (SQLException e) {
log.error("SQL執行異常", e);
e.printStackTrace();
}
return viewList;
}
@Override
public boolean createHiveTable(String tableName,String sql) throws SQLException {
boolean success= true;
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
String hiveUrl=Messages.getString("jdbcHiveUrl");
System.setProperty("HADOOP_USER_NAME", hiveUser);
try {
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連線異常", e);
e.printStackTrace();
success = false;
}
Connection con;
con = DriverManager.getConnection(hiveUrl, hiveUser, hivePwd);
Statement stmt = con.createStatement();
try {
deleteFdfsByHiveTable(tableName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
log.error("hive連線異常", e);
} // also delete the corresponding HDFS files, because the table is created as an external table
long startTime = System.currentTimeMillis();
String dropIfExistsTable = "drop table if exists " + tableName;
stmt.execute(dropIfExistsTable);
long endTime = System.currentTimeMillis();
System.out.println("刪除已存在的表所花時間(針對全量匯入):" + (endTime - startTime));
log.info("createSql:" + sql);
stmt.execute(sql);
stmt.close();
con.close();
return success;
}
/**
* Delete the HDFS files that back the given Hive table, mainly for external tables in Hive
*
* @param tableName
* @return
* @throws ClassNotFoundException
* @throws SQLException
*/
public boolean deleteFdfsByHiveTable(String tableName)
throws ClassNotFoundException, SQLException {
boolean b = false;
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
String jdbcHiveUrl = Messages.getString("jdbcHiveUrl");
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
String sqoopOutput = Messages.getString("sqoopOutput");
String HDFSpath = Messages.getString("HDFSpath");
System.setProperty("HADOOP_USER_NAME", Messages.getString("hiveUser"));
String rootPath = Messages.getString("HDFSUrl");
Class.forName(jdbcHiveDriver);
String path = null;
Connection con = DriverManager.getConnection(jdbcHiveUrl, hiveUser,
hivePwd);
java.sql.Statement stmt = con.createStatement();
// check whether the table exists
String sqlHad = "show tables '" + tableName + "'";
ResultSet had = stmt.executeQuery(sqlHad);
if (!had.next()) {
return true;
}
String sql = "describe formatted " + tableName;
log.info("sql:" + sql);
ResultSet set = stmt.executeQuery(sql);
while (set.next()) {
String location = set.getString(1);
if (location != null
&& "Location:".equals(location.replace(" ", "")))
path = set.getString(2);
}
set.close();
stmt.close();
con.close();
if (path != null) {
String[] paths = null;
if (path.contains(sqoopOutput)) {
paths = path.split(sqoopOutput);
} else if (path.contains(HDFSpath)) {
paths = path.split(HDFSpath);
}
if (paths != null && paths.length > 0) {
String dfs = paths[0];
path = path.replace(dfs, rootPath);
Path p = new Path(path);
Configuration conf = new Configuration();
try {
FileSystem fs = p.getFileSystem(conf);
boolean isHad = fs.exists(p);
if (isHad) {
b = fs.delete(p, true);
} else {
b = true;
}
// boolean b = fs.createNewFile(p);
fs.close();
} catch (IOException e) {
log.error("HDFS檔案讀取異常", e);
e.printStackTrace();
}
}
}
return b;
}
@Override
public boolean isExistHiveTable(String tableName) throws SQLException {
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
String hiveUrl = Messages.getString("jdbcHiveUrl");
System.setProperty("HADOOP_USER_NAME", hiveUser);
boolean exist = false;
if (tableName == null || tableName.trim().equals(""))
return false;
try {
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連結異常", e);
e.printStackTrace();
}
Connection con;
con = DriverManager.getConnection(hiveUrl, hiveUser, hivePwd);
Statement stmt = con.createStatement();
String showTablesql = "show tables '" + tableName + "'";
log.info("showTablesql:" + showTablesql);
ResultSet tableSet = stmt.executeQuery(showTablesql);
if (tableSet.next()) {
exist = true;
}
return exist;
}
/**
* Create the Hive textfile table
*/
public String createHiveTempTable(String tableName,String sql, String HDFSPAth) throws SQLException {
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
String hiveUrl = Messages.getString("jdbcHiveUrl");
System.setProperty("HADOOP_USER_NAME", hiveUser);
try {
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連結異常", e);
e.printStackTrace();
}
Connection con;
con = DriverManager.getConnection(hiveUrl, hiveUser, hivePwd);
Statement stmt = con.createStatement();
String dropIfExistsTable = "drop table if exists " + tableName;
log.info("dropIfExistsTable:" + dropIfExistsTable);
stmt.execute(dropIfExistsTable);
log.info("createSql:" + sql);
stmt.execute(sql);
String loadData = "LOAD DATA INPATH '" + HDFSPAth + "' INTO TABLE " + tableName;
log.info("loadData:" + loadData);
stmt.execute(loadData);
stmt.close();
con.close();
return tableName;
}
/**
* Create the Hive table (added by yangqi 2015/10/10)
*/
@Override
public String createHiveORCTable(String tableName,String primaryKey, String sql) throws SQLException {
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
String hiveUrl = Messages.getString("jdbcHiveUrl");
System.setProperty("HADOOP_USER_NAME", hiveUser);
try {
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連線異常", e);
e.printStackTrace();
}
Connection con;
con = DriverManager.getConnection(hiveUrl, hiveUser, hivePwd);
Statement stmt = con.createStatement();
try {
deleteFdfsByHiveTable(tableName);
} catch (ClassNotFoundException e) {
log.error("hive連線異常", e);
e.printStackTrace();
} // also delete the corresponding HDFS files, because the table is created as an external table
String dropIfExistsTable = "drop table if exists " + tableName;
log.info("dropIfExistsTable:" + dropIfExistsTable);
stmt.execute(dropIfExistsTable);
stmt.execute("set ngmr.partition.automerge = true");
String createSql= sql+" CLUSTERED BY ("
+ primaryKey
+ ") INTO "
+ "100"
+ " BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' STORED AS ORC TBLPROPERTIES "
+ "('transactional'='true')";
System.out.println(createSql);
log.info("createSql:" + createSql);
stmt.execute(createSql.toString().trim());
stmt.close();
con.close();
//
return tableName;
}
/**
* Create the Hive ORC table (added by yangqi 2015/10/10)
*/
// import data from the Hive textfile table into the ORC table
@Override
public void insertIntoHiveOrcTable(String textfileTableName,
String orcTableName, String hiveUrl) throws SQLException {
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
System.setProperty("HADOOP_USER_NAME", hiveUser);
try {
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連線異常", e);
e.printStackTrace();
}
Connection con;
con = DriverManager.getConnection(hiveUrl, hiveUser, hivePwd);
Statement stmt = con.createStatement();
// get the size of the textfile table and use it to decide the number of reduce tasks
Map<String,String> map = getCountAndSize(textfileTableName, Messages.getString("jdbcHiveUrl"));
stmt.execute("set ngmr.partition.automerge = true");
long count = Long.parseLong(map.get("count"));
if(count>=50000000){
stmt.execute("set mapred.reduce.tasks=100");
}else if(10000000<=count&&count<=50000000){
stmt.execute("set mapred.reduce.tasks=20");
}else{
stmt.execute("set mapred.reduce.tasks=10");
}
String insertSql = "insert into table " + orcTableName
+ " select * from " + textfileTableName + " where resource_flag = 0 distribute by rand()";
log.info("insertSql:" + insertSql);
stmt.execute(insertSql);
stmt.close();
con.close();
}
/**
* Count the number of rows and the file size of a table by table name
*
* @author ZYY
* @since 2015/1/14
*/
@Override
public Map<String, String> getCountAndSize(String tableName, String hiveUrl)
throws SQLException {
Map<String, String> map = new HashMap<String, String>(); // result map
String[] pathAndSize = new String[2]; // holds the HDFS path and the data size
String count = ""; // row count of the table
/*
* Get the user name and password and open the Hive JDBC connection
* */
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
System.setProperty("HADOOP_USER_NAME", Messages.getString("hiveUser"));
String rootPath = Messages.getString("HDFSUrl");
try {
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連結異常", e);
e.printStackTrace();
}
Connection con = DriverManager
.getConnection(hiveUrl, hiveUser, hivePwd);
Statement stmt = con.createStatement();
// SQL that counts the total number of rows in the table
String countSql = "select count(*) from " + tableName;
log.info("獲取資料表記錄總量的sql" + countSql);
try {
ResultSet rs = stmt.executeQuery(countSql);
if (rs.next()) {
count = rs.getString(1);
}
} catch (Exception e) {
log.error("SQL執行異常", e);
e.printStackTrace();
}
// SQL that retrieves the data size and location from Hive
String sizesql = "describe formatted " + tableName;
ResultSet set = stmt.executeQuery(sizesql);
while (set.next()) {
String location = set.getString(1);
if (location != null
&& "Location:".equals(location.replace(" ", "")))
pathAndSize[0] = set.getString(2);
String totalSize = set.getString(2);
if (totalSize != null
&& "totalSize".equals(totalSize.replace(" ", "")))
pathAndSize[1] = set.getString(3);
}
// because the Hive table is an external table, post-process the path and size
// replace the namenode part of the path with the configured HDFS root
if (pathAndSize[0] != null && !pathAndSize[0].contains(rootPath)) {
String path = pathAndSize[0];
String[] paths = path.split("://");
if (paths.length > 1) {
String dfs = paths[1];
String[] filPaths = dfs.split("/");
if (filPaths.length > 0) {
String f = filPaths[0];
path = dfs.replace(f, rootPath);
pathAndSize[0] = path;
}
}
}
// handle the case where the size cannot be read for a Hive external table
if (pathAndSize[1] == null || pathAndSize[1].equals("") || "0".equals(pathAndSize[1].trim())) {
if (pathAndSize[0] != null) {
String path = pathAndSize[0];
Path p = new Path(path);
long total = 0;
Configuration conf = new Configuration();
try {
FileSystem fs = p.getFileSystem(conf);
boolean isHad = fs.exists(p);
if (isHad) {
RemoteIterator<LocatedFileStatus> fd = fs.listFiles(p,
true); // list all files under the directory
while (fd.hasNext()) {
LocatedFileStatus lf = fd.next(); // next file
System.out.println(lf.getLen());
total = total + lf.getLen(); // add the file size
}
}
// convert the unit from bytes to KB
total = total / 1024;
pathAndSize[1] = total + "";
fs.close();
} catch (IOException e) {
log.error("Hive檔案讀取出錯", e);
e.printStackTrace();
}
}
}
// close the result set, statement and connection
set.close();
stmt.close();
con.close();
// store the results in the result map
map.put("count", count);
map.put("size", pathAndSize[1]);
return map;
}
/**
* Delete from the Hive ORC table all rows that also appear in the incremental import
*/
public void deleteIncrementDataExistInOrcTable(String textfileTable,
String orcTableName, String primaryKey, String hiveUrl)
throws SQLException {
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
System.setProperty("HADOOP_USER_NAME", hiveUser);
try {
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連線異常", e);
e.printStackTrace();
}
Connection con;
con = DriverManager.getConnection(hiveUrl, hiveUser, hivePwd);
Statement stmt = con.createStatement();
String deleteSql = "delete from " + orcTableName + " where "
+ primaryKey + " in (select " + primaryKey + " from "
+ textfileTable + ")";
log.info("deleteSql:" + deleteSql);
stmt.execute(deleteSql);
stmt.close();
con.close();
}
/**
* Merge the temporary table into the ORC table (added by yangqi 2015/10/14)
*/
@Override
public void mergeIntoHiveOrcTable(Map<String, String[]> map,
String hiveUrl, String primaryKey) throws SQLException {
String hiveUser = Messages.getString("hiveUser");
String hivePwd = Messages.getString("hivePwd");
System.setProperty("HADOOP_USER_NAME", hiveUser);
try {
String jdbcHiveDriver = Messages.getString("jdbcHiveDriver");
Class.forName(jdbcHiveDriver);
} catch (ClassNotFoundException e) {
log.error("hive連線異常", e);
e.printStackTrace();
}
Connection con;
con = DriverManager.getConnection(hiveUrl, hiveUser, hivePwd);
Statement stmt = con.createStatement();
String resourceId = map.get("resourceId")[0];
String tableName = map.get("tableName")[0];
String orcTableName = resourceId + "_orc_" + tableName;
String tempOrcTable = resourceId + "_" + tableName;
StringBuffer mergeSql = new StringBuffer("MERGE INTO " + orcTableName + " a USING "
+ tempOrcTable + " b ON (a." + primaryKey + " = b."
+ primaryKey + ") WHEN MATCHED THEN UPDATE SET ");
String[] cols = map.get(tableName);
if (cols != null && cols.length > 0) {
for (int i = 0; i < cols.length; i++) {
if (0 == i) {
mergeSql.append(cols[i].split(" ")[0] + " = b."
+ cols[i].split(" ")[0]);
} else {
mergeSql.append(", " + cols[i].split(" ")[0]
+ " = b." + cols[i].split(" ")[0]);
}
}
}
mergeSql.append(" WHEN NOT MATCHED THEN INSERT (");
if (cols != null && cols.length > 0) {
for (int i = 0; i < cols.length; i++) {
if (0 == i) {
mergeSql.append(cols[i].split(" ")[0]);
} else {
mergeSql.append(", " + cols[i].split(" ")[0]);
}
}
}
mergeSql.append(") VALUES(");
if (cols != null