(Java實現)資料庫表結構掃描,壞行壞列掃描實現
阿新 • • 發佈:2018-12-19
(Java實現)Cache資料庫表結構掃描,壞行壞列掃描實現
- 最近接觸了Cache資料庫,需要將海量資料通過sqoop抽取到hadoop平臺。但是發現一個問題,Cache資料庫是一個後關係型資料,底層是按照global進行存存的,表結構的定義通過president持久class檔案來實現。
- class檔案實現的表結構有很多附加功能,可以新增展示資料和儲存資料,新增compute列而不用儲存compute值,面向物件開發才會展現。如果新增的compute計算公式相對複雜,引用了多個數據列,而在迭代公式中沒有做非法資料判斷的話就會導致表結構展現的異常,sqoop或者面向表結構讀取資訊的工具就會丟擲資料庫內部異常的錯誤。Cache這種獨門資料對於傳統行業的人來說簡直是惡夢,根本不清楚如何進入global的世界,global的大門如此神祕。基於這個問題,在研究了Cache資料庫這麼多年的前提下,和大家分享一個java的掃描方式來批量獲取表中的壞行壞列問題。並提供處理方式。大資料的到來,我們沒法不懂任何一個數據庫,也可能會接觸各種各樣的奇葩架構,所以要用傳統技術來分析偏冷知識。
基本思路就是 :
獲取主鍵–>儲存主鍵(記憶體or文字)–>多執行緒訪問主鍵–>根據資料庫的行數和分片規則生成統一的排程計劃–>監控執行緒執行情況 -->合併資訊 -->生成掃描log
- 獲取主鍵:由於cache資料庫的主鍵並不是都為數字Integer型別,很多varchar型別主鍵,因此需要指定。而傳統的select max(RowId)和select min(RowId)的方式並不適用~,此處為大坑!
- 儲存主鍵:資料行數在1000W以下建議用ArryList的形勢儲存到記憶體,加快執行緒呼叫速度;大於1000W建議就落盤到文字中。1000W行有時候佔用記憶體就1G了。
- 多執行緒訪問主鍵:加快資料掃描速度
- 分片規則:建議為50W一個執行緒,最多多少個根據資料庫的License情況來定,Licence佔用過多就導致其他執行緒無法訪問了
- 監控的執行緒執行情況:定義一個ArrayList ThreadMon 的集合,每個執行緒執行完畢會新增一個Interger的元素,而分片之前已經知道匯流排程數量,通過size和count的比對來監控是否完成。當然還有很多執行緒通訊的方式,感覺不如這樣簡單。
- 合併資訊:每個執行緒掃描到的錯誤都儲存到ArrayList SQLTableScanInfo集合中。最後統一把集合資訊遍歷輸出即可。
- 最終生成log,資訊如下:
開始時間為:2018-10-30 09:19:58 執行結束,總共分了 13 片 每片數量為:500000 監控執行緒的ArryList長度為:13 總資料量:6138360 結束時間為:2018-10-30 09:55:16 發現錯誤數量4 ====================================ErrorInfo=================================== PKName:id PKValue:555555 java.sql.SQLException: [SQLCODE: <-400>:<出現致命錯誤 >] [Cache Error: <<ILLEGAL VALUE>%0Afirst+8^%sqlcq.pSYS.cls18.1>] [Location: <ServerLoop - Query Fetch>] [%msg: <Unexpected error occurred: <ILLEGAL VALUE>%0Afirst+8^%sqlcq.pSYS.cls18.1>] PKName:id PKValue:30 java.sql.SQLException: [SQLCODE: <-400>:<出現致命錯誤 >] [Cache Error: <<ILLEGAL VALUE>%0Afirst+8^%sqlcq.pSYS.cls18.1>] [Location: <ServerLoop - Query Fetch>] [%msg: <Unexpected error occurred: <ILLEGAL VALUE>%0Afirst+8^%sqlcq.pSYS.cls18.1>] PKName:id PKValue:90 java.sql.SQLException: [SQLCODE: <-400>:<出現致命錯誤 >] [Cache Error: <<ILLEGAL VALUE>%0Afirst+8^%sqlcq.pSYS.cls18.1>] [Location: <ServerLoop - Query Fetch>] [%msg: <Unexpected error occurred: <ILLEGAL VALUE>%0Afirst+8^%sqlcq.pSYS.cls18.1>] PKName:id PKValue:100 java.sql.SQLException: [SQLCODE: <-400>:<出現致命錯誤 >] [Cache Error: <<ILLEGAL VALUE>%0Afirst+8^%sqlcq.pSYS.cls18.1>] [Location: <ServerLoop - Query Fetch>] [%msg: <Unexpected error occurred: <ILLEGAL VALUE>%0Afirst+8^%sqlcq.pSYS.cls18.1>] ====================================ErrorInfo===================================
拿著主鍵和名和主鍵值就可完成對原庫的比對,修復損壞資料。
具體程式碼如下:
package com.cache.repairtable;
import java.io.*;
import java.sql.*;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.util.*;
/*
*created by qun.liu;
*For Details Connect to QQ:471832953
*/
@SuppressWarnings({ "unused" })
public class RepairTable4 {
//final static int SplitNum =5000;
final static ArrayList<Integer> ThreadMon = new ArrayList<>();
final static ArrayList<String> SQLTableScanInfo = new ArrayList<>();
@SuppressWarnings("resource")
public static void main(String[] args) throws ClassNotFoundException, SQLException, IOException {
System.out.println("輸入ip:");
Scanner ipsc = new Scanner(System.in);
String ipstr = ipsc.nextLine();
System.out.println("輸入port:");
Scanner portsc = new Scanner(System.in);
String portstr = portsc.nextLine();
System.out.println("輸入namespace:");
Scanner nssc = new Scanner(System.in);
String nsstr = nssc.nextLine();
String DB_URL = "jdbc:Cache://" + ipstr + ":" + portstr + "/" + nsstr;
String JDBC_DRIVER = "com.intersys.jdbc.CacheDriver";
//String DB_URL = "jdbc:Cache://192.168.31.187:1972/%sys";
String UserName = "_system";
String Password = "sys";
Class.forName(JDBC_DRIVER);
Connection dbconnection = DriverManager.getConnection(DB_URL, UserName, Password);
dbconnection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
System.out.println(DB_URL);
SQLSelectRun(dbconnection, DB_URL, UserName, Password);
}
@SuppressWarnings("resource")
private static void SQLSelectRun(Connection dbconnection, String url, String user, String password)
throws IOException {
Scanner s = new Scanner(System.in);
System.out.println("開始獲取表的原始資訊:");
System.out.println("please input schema:");
String schema = s.nextLine();
Scanner t = new Scanner(System.in);
System.out.println("please input table:");
String tablename = t.nextLine();
String sqlstr = "select * from " + schema + "." + tablename;
// String sqlstr="select * from DHCCSYS.DatabaseInfo";
System.out.println(sqlstr);
try {
PreparedStatement sqlrst = dbconnection.prepareStatement(sqlstr, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
java.sql.ResultSet sqlRST = sqlrst.executeQuery();
int count = 0;
ResultSetMetaData rsmd = sqlRST.getMetaData();
System.out.println("TableColumns CountNum:" + rsmd.getColumnCount());
String[] arry = new String[rsmd.getColumnCount()];
for (int i = 1; i < rsmd.getColumnCount(); i++) {
String ColumnName = rsmd.getColumnName(i);
String ColumnType = rsmd.getColumnTypeName(i);
System.out.println(ColumnName + " --> " + ColumnType);
}
sqlRST.close();
System.out.println("Which Column is PrimaryKey:");
Scanner sc = new Scanner(System.in);
String PkName = sc.nextLine();
System.out.println("正在獲取" + schema + "." + tablename + "的" + PkName + "主鍵......");
String sqlstr2 = "select " + PkName + " from " + schema + "." + tablename;
System.out.println(sqlstr2);
String FilePKInfo = schema + tablename + "pkinfo.txt";
PreparedStatement sqlrst2 = dbconnection.prepareStatement(sqlstr2, ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
ResultSet sqlRST2 = sqlrst2.executeQuery();
// 執行分片方法
ExecuteSplit(PkName, sqlRST2, dbconnection, schema, tablename, PkName, url, user, password);
} catch (SQLException e) {
e.printStackTrace();
}
}
private static void ExecuteSplit(String PkName, ResultSet sqlRST2, Connection dbconnection, String Schema,
String Table, String PKName, String url, String user, String password) throws IOException, SQLException {
ArrayList<String> ALPrimaryKey = new ArrayList<>();
// ArrayList<Integer> ThreadMon=new ArrayList<>();
String PKFileName=Schema+"_"+Table+"_"+PkName+".txt";
BufferedWriter bw=new BufferedWriter(new FileWriter(PKFileName));
int count=0;
while (sqlRST2.next()) {
//ALPrimaryKey.add(sqlRST2.getString(PkName));
bw.write(sqlRST2.getString(PkName));
bw.newLine();
bw.flush();
count++;
}
bw.close();
System.out.println("主鍵獲取完畢,開始掃描行資訊有誤報錯!");
//for (String PK : ALPrimaryKey) {
// System.out.println(PK);
//}
// BufferedReader br=new BufferedReader(new FileReader(PKFileName));
// int count=0;
// while((br.readLine())!=null) {
// count++;
// }
System.out.println("總資料量:" + count);
System.out.print("請輸入資料庫允許的最大程序數:");
Scanner sc=new Scanner(System.in);
int ProcessLimitNum=Integer.valueOf(sc.nextLine());
int SplitNum;
SplitNum=count/ProcessLimitNum;
if(SplitNum<=500000) {
SplitNum=500000;
System.out.println("資料量不大,調整分片行數限制為"+SplitNum);
}else {
System.out.println("資料量較大,調整分片行數限制為"+SplitNum);
}
SimpleDateFormat Btime = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
String BeginTime = Btime.format(new java.util.Date());
int tempNum1 = count % SplitNum; // 餘數
int tempNum2 = count / SplitNum; // 商
// System.out.println(tempNum2);
int splitcount = tempNum2; // 分了多少片呢?
// 行數大於預定的分片數,且有餘數
if ((tempNum1 != 0) & (count > SplitNum)) {
for (int i = 1; i <= tempNum2; i++) {
System.out.println("第" + i + "片");
int beginnum = (i - 1) * SplitNum+1;
int endnum = i * SplitNum;
System.out.println(beginnum);
System.out.println(endnum);
new Thread(new Runnable() {
public void run() {
// ThreadFindEachRowInfo(ALPrimaryKey, beginnum, endnum,
// Schema, Table, PkName,url,user,password);
try {
Connection dbconnection = DriverManager.getConnection(url, user, password);
dbconnection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
BufferedReader br=new BufferedReader(new FileReader(PKFileName));
String pkname;
int cc=0;
while((pkname=br.readLine())!=null) {
cc++;
if((cc>=beginnum)&(cc<=endnum)){
String sqlstr = "select * from " + Schema + "." + Table + " where " + PKName + "=\'"
+ pkname + "\'";
//System.out.println(sqlstr);
try {
// System.out.println(sqlstr);
Statement stmt = dbconnection.createStatement();
ResultSet sqlRST1 = stmt.executeQuery(sqlstr);
while (sqlRST1.next()) {
}
} catch (SQLException e) {
SQLTableScanInfo.add("PKName:" + PKName + " PKValue:" + pkname);
SQLTableScanInfo.add(e.toString());
}
}
}
br.close();
ThreadMon.add(1);
dbconnection.close();
} catch (SQLException e1) {
e1.printStackTrace();
} catch (FileNotFoundException e1) {
e1.printStackTrace();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}).start();
}
}
System.out.println("正在後臺執行" + splitcount);
File file = new File(Schema + Table + "_Result.log");
FileWriter fw = new FileWriter(file);
try {
while (true) {
Thread.sleep(1000);
/// 每個執行緒都會往ArryList裡add數值1.通過判斷ArryList的長度和總子程序數是否相等來判斷是不是所有的執行緒執行完畢
if (ThreadMon.size() == splitcount) {
System.out.println("開始時間為:" + BeginTime);
fw.write("開始時間為:" + BeginTime + "\r\n");
System.out.println("執行結束,總共分了 " + splitcount + " 片\r\n");
fw.write("執行結束,總共分了 " + splitcount + " 片\r\n");
System.out.println("每片數量為:"+SplitNum);
fw.write("每片數量為:"+SplitNum+"\r\n");
System.out.println("監控執行緒的ArryList長度為:" + ThreadMon.size());
fw.write("監控執行緒的ArryList長度為:" + ThreadMon.size() + "\r\n");
System.out.println("總資料量:" + count);
fw.write("總資料量:" + count + "\r\n");
SimpleDateFormat Etime = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
String EndTime = Etime.format(new java.util.Date());
System.out.println("結束時間為:" + EndTime);
fw.write("結束時間為:" + EndTime + "\r\n");
fw.write("發現錯誤數量"+SQLTableScanInfo.size()/2+"\r\n");
fw.write("====================================ErrorInfo===================================\r\n");
if (SQLTableScanInfo.size() != 0) {
for (String s : SQLTableScanInfo) {
System.out.println(s);
fw.write(s + "\r\n");
//fw.flush();
}
} else {
System.out.println("恭喜,未發現錯誤!");
fw.write("恭喜,未發現錯誤!\r\n");
}
System.out.println("log路徑為:" + file.getAbsolutePath());
try {
dbconnection.close();
} catch (SQLException e) {
// TODO 自動生成的 catch 塊
e.printStackTrace();
}
fw.write("====================================ErrorInfo===================================\r\n");
fw.close();
break;
}
}
} catch (InterruptedException e) {
// TODO 自動生成的 catch 塊
e.printStackTrace();
}
}
private static void ThreadFindEachRowInfo(ArrayList<String> pkarry, int Begin, int End, String Schema, String Table,
String PKName, String url, String user, String password) {
// System.out.println("執行緒內部執行");
try {
Connection dbconnection = DriverManager.getConnection(url, user, password);
dbconnection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
for (int i = Begin; i <= End; i++) {
// System.out.println(pkarry.get(i));
String sqlstr = "select * from " + Schema + "." + Table + " where " + PKName + "=\'" + pkarry.get(i)
+ "\'";
try {
// System.out.println(sqlstr);
Statement stmt = dbconnection.createStatement();
ResultSet sqlRST1 = stmt.executeQuery(sqlstr);
while (sqlRST1.next()) {
continue;
}
} catch (SQLException e) {
SQLTableScanInfo.add("PKName:" + PKName + " PKValue:" + pkarry.