TXT 資料檔案批量匯入DB
應用背景:
現有100-200個TXT資料檔案,每個檔案中存貯的資料量不盡相同,例如有的儲存2~3萬條,有的存貯2~3百條資料,現需要將這些檔案匯入到Sql Server資料庫中;匯入過程需在前臺頁面中顯示出每個檔案匯入的基本情況(匯入數量,狀態碼,出錯的行數等)。
思路探尋:
(1)專案DAO層使用的Hibernate,經測試匯入一個2萬多條的檔案,加上TXT檔案逐行解析,和生成List;加上使用Hibernate的插入到資料庫中大約需要15-20秒的時間,用時太長;
(2)使用原生JDBC技術進行分批(隨機分組和按檔案大小分組)操作;最終匯入全部檔案大約需要3.5分鐘左右,
本文目的:
記錄本次匯入筆記,希望 猿友 在匯入過程中和多執行緒併發上給出寶貴的意見,在此先表謝意!
STEP 1 : 定義檔案匯入過程中每個檔案匯入的資訊 ENTITY
/** * @Description 記錄TXTGPS檔案匯入過程資訊; */ public class ReceiveGpsTO { // 匯入批TXT檔案所在目錄 private String importDir; // 要匯入的檔名稱 private String importfileName; // 匯入成功的總行數 private int importRows; // 匯入狀態 private String importState; // 匯入過程中備註資訊 private String importRemark; // 匯入過程中出錯的行號 private int errorRow; // 匯入一個檔案所需時間,預設毫秒 private int importTime; public String getImportDir() { return importDir; } public String setImportDir(String importDir) { this.importDir = importDir } ...... }
STEP 2 : 接收頁面請求,開始進行匯入
public class ReceiveGpsDataFS implements IReceiveGpsDataFS { private final static Logger logger = LoggerFactory.getLogger(ReceiveGpsDataFS.class); @Override public GpsTableDataInfo receiveGpsData(ReceiveGpsTO gpsTO, int rangeStart, int fetchSize) { Long bTime = System.currentTimeMillis(); GpsTableDataInfo tableData = new GpsTableDataInfo(); tableData.setRltMsg(GPSConstants.IMPORT_SUCCESS); int sumNum = 0; List<ReceiveGpsTO> rltList = new ArrayList<ReceiveGpsTO>(); if(StringUtils.hasText(gpsTO.getImportDir())){ File dirFile = new File(gpsTO.getImportDir()); if(dirFile.isDirectory()){ File[] importFiles = dirFile.listFiles(); //MARK: 對匯入的大量檔案進行兩種策略分組處理; List<List<File>> groupList = (GPSConstants.RANDOM_OR_SIZE_DIDIVE_STRATEGY) ? RandomGroupUtil.getGroupByRandom(importFiles, GPSConstants.FILE_NUM_4_GROUP) : RandomGroupUtil.getGroupBySize(importFiles, GPSConstants.FILE_NUM_4_GROUP); for(List<File> gl : groupList){ //MARK: 阻塞執行;執行完一組,拿到結果後執行下一組; logger.info("開始匯入GPS組檔案:{}", gl); List<ReceiveGpsTO> eachGroupRltList = ImportGpsData2DB.importGpsData(gl); rltList.addAll(eachGroupRltList); logger.info("開始匯入GPS組檔案:{} 結束", gl); } }else{ logger.error("匯入的檔案路徑{}不是正確的目錄", gpsTO.getImportDir()); tableData.setRltMsg(GPSConstants.IMPORT_DEFAULT); } for(ReceiveGpsTO t : rltList){ sumNum += t.getImportRows(); } tableData.setTotalCount(sumNum); tableData.setData(rltList); Long eTime = System.currentTimeMillis(); logger.info("匯入檔案數量:"+rltList.size()+" 總耗時分鐘: "+(eTime-bTime)/GPSConstants.SECOND_2_MS); } return tableData; } }
對STEP 2 中用到的常量和檔案分組方式進行簡單說明:
對需要匯入的100多個TXT檔案進行分組匯入,分組時提供兩種分組方式: 一種,隨機進行分組,也就是在同一個組內,可能有的檔案比較大,有的檔案比較小,例如:(File1:3M, File2 : 1M, File3: 4M, File4: 20KB);這種分組的好處:每組檔案總的大小相對穩定,不會導致一組檔案特別到,因為後期匯入時按組開執行緒,使用Callable介面 拿CONNECTION操作,可以降低OOM概率;第二種:按檔案大小分組,也就是說同一組內,每個檔案的大小不相上下,好處,後期插入時,使用Future.get方法時是阻塞方法,可以有效的減下彼此間的等待時間,速度快,同時也增加了OOM概率。
常量:
public class GPSConstants {
//匯入STATE
public static final String IMPORT_DEFAULT = "FAILURE";
public static final String IMPORT_SUCCESS = "SUCCESS";
public static final int SECOND_2_MS = 1000;
//匯入檔案進行隨機分組:好處:減小OOM概率
//匯入檔案按大小分組:好處:匯入速度相當較快;壞處:當每組檔案數量比較多時,增加OOM概率
//true : 隨機分組 false: 按大小分組;
public static final boolean RANDOM_OR_SIZE_DIDIVE_STRATEGY = true;
//public static final boolean SIZE_DIDIVE_STRATEGY = true;
//匯入檔案分組:每組中檔案個數;建議當JVM XMx<300不要超過5個;
public static final int FILE_NUM_4_GROUP = 4;
}
兩種分組方式:
/**
* @Description 隨機分組工具類, 提供按檔案大小分組和隨機分組方式
*/
public class RandomGroupUtil {
/**
* @Dscription 隨機分組模式
* @param origData 要分組的資料;
* @param numOfEachGroup 每組中成員數量;
* @return
*/
public static <T> List<List<T>> getGroupByRandom(T[] origData, int numOfEachGroup) {
int len = origData.length;
List<T> origList = new ArrayList<T>();
List<List<T>> groupsList = new ArrayList<List<T>>();
for(T f : origData)
origList.add(f);
if(len < numOfEachGroup) {//只分一組
groupsList.add(origList);
return groupsList;
}
// 計算可以分多少組
int groupNum = ((len + numOfEachGroup) - 1) / numOfEachGroup;
for (int i = 0; i < groupNum-1; i++) {
List<T> group = new ArrayList<T>();
for (int j = 0; j < numOfEachGroup; j++) {
int random = getRandom(origList.size());
group.add(origList.get(random));
origList.remove(random);
}
groupsList.add(group);
}
// 最後剩下的人分成一組;
groupsList.add(origList);
return groupsList;
}
/**
* @Decription 按檔案大小進行分組,每組成員中所含的檔案大小相近
* @param files
* @param numOfEachGroup 每組中要包含的成員數量;
* @return 分好的組
*/
public static List<List<File>> getGroupBySize(File[] files, int numOfEachGroup) {
int len = files.length;
List<File> orgList = new ArrayList<File>();
List<List<File>> groupsList = new ArrayList<List<File>>();
for(File f : files)
orgList.add(f);
if(len < numOfEachGroup) {//只分一組
groupsList.add(orgList);
return groupsList;
}
//按檔案大小排序
Collections.sort(orgList, new Comparator<File>(){
public int compare(File o1, File o2) {
if( o1.length() == o2.length())
return 0;
return (o1.length() - o2.length() > 0) ? 1 : -1;
}
});
// 計算可以分多少組
int groupNum = ((len + numOfEachGroup) - 1) / numOfEachGroup;
for (int i = 0; i < groupNum-1; i++) {
List<File> group = new ArrayList<File>();
for (int j = 0; j < numOfEachGroup; j++) {
group.add(orgList.get(orgList.size()-1));
orgList.remove(orgList.size()-1);
}
groupsList.add(group);
}
// 最後剩下的人分成一組;
groupsList.add(orgList);
return groupsList;
}
private static int getRandom(int num) {
Random r = new Random();
return r.nextInt(num);
}
}
STEP 3 : 使用 Executors 和 DataSource 資料來源連線池按組進行批量匯入
/**
* @Description : 使用executor呼叫call, 匯入狀態資訊封入ReceiveGpsTO
*
*/
public class ImportGpsData2DB {
private static final Logger logger = LoggerFactory.getLogger(ImportGpsData2DB.class);
private static ExecutorService executor = null;
private static WrapDataSource dataSource = null;
static{
executor = Executors.newFixedThreadPool(GPSConstants.FILE_NUM_4_GROUP);
dataSource = (WrapDataSource) ZBus.findCommonService("dataSource");
}
public static List<ReceiveGpsTO> importGpsData(List<File> gl) {
List<ReceiveGpsTO> rList = new ArrayList<ReceiveGpsTO>();
List<Future<ReceiveGpsTO>> fList = new ArrayList<Future<ReceiveGpsTO>>();
for(File file : gl){
Connection conn = null;
try {
conn = dataSource.getConnection();
} catch (SQLException e) {
logger.error("{}從連線池中獲取資料庫連線失敗, 本組檔案 {} 匯入失敗", ImportGpsData2DB.class.getName(), gl);
e.printStackTrace();
return rList;
}
// 開啟執行緒進行匯入
Future<ReceiveGpsTO> future = executor.submit(new ImportGpsCallable(file, conn));
fList.add(future);
}
for(Future<ReceiveGpsTO> f : fList){
try {
ReceiveGpsTO rTO = f.get();
rList.add(rTO);
} catch (Exception e) {
logger.error("{} 獲取本組檔案{} 匯入狀態失敗", ImportGpsData2DB.class.getName(), f);
e.printStackTrace();
}
}
return rList;
}
}
資料來源配置:
<beans>
<bean id="dbproperties"
class="com.ssb.***.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>classpath:*.db.properties</value>
</property>
</bean>
<bean id="dataSource"
class="com.zte.WrapDataSource"
destroy-method="close" init-method="initSource">
<property name="driverClassName"
value="${ssb.driverClassName}" />
<property name="url"
value="${ssb.url}${ssb.dbname}" />
<property name="username" value="${ssb.username}" />
<property name="password" value="${ssb.password}" />
<property name="initialSize" value="5" />
<property name="maxActive" value="30" />
<property name="maxWait" value="-1" />
<property name="maxIdle" value="10" />
<property name="removeAbandoned" value="true" />
<property name="removeAbandonedTimeout" value="30000" />
<property name="logAbandoned" value="true" />
<property name="encrypt" value="${ssb.isEncrypt}" />
</bean>
</beans>
STEP 4 : 實現 CALLABLE 執行緒進行匯入,每組中得每一個檔案對應一個匯入執行緒;
/**
* @Description GPS資料匯入執行緒,主要完成txt檔案解析和資料庫匯入;
* @author ***
*
*/
public class ImportGpsCallable implements Callable<ReceiveGpsTO> {
private static final String SQL_STR = "INSERT INTO [ZXX_MXX].[dbo].[GPS_XXX_PASS_RECORD]( ORDER_NUM, HPHM, HPZL, PASS_TIME, LONGITUDE, "
+ "LATITUDE, SPEED, DIRECTION, POSITION_STATE, MILEAGE, RELATIVE_MILEAGE, VEHICLE_STATE, LOCATION) VALUES("
+ "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
private File file;
private Connection conn;
//通過構造方法傳入所需的資源
public ImportGpsCallable(File file, Connection conn) {
this.file = file;
this.conn = conn;
}
@Override
public ReceiveGpsTO call() throws Exception {
ReceiveGpsTO gpsRlt = new ReceiveGpsTO();
gpsRlt.setImportfileName(file.getName());
Long beginImpTime = System.currentTimeMillis();
// 通過工具類DataClearTools對TXT檔案進行格式驗證和資料解析;
List<GpsVehiclePassRecord> rList = DataClearTools.getGpsRecords(file, gpsRlt);
PreparedStatement ps = null;
try{
if(null != conn){
conn.setAutoCommit(false);
ps = conn.prepareStatement(SQL_STR);
int[] rows = null;
if( rList.size() > 0 ){
for(GpsVehiclePassRecord g : rList){
ps.setLong(1, g.getOrderNum());
ps.setString(2, g.getHphm());
ps.setString(3, g.getHpzl());
ps.setTimestamp(4, new Timestamp(g.getPassTime().getTime()));
ps.setString(5, g.getLongitude());
ps.setString(6, g.getLatitude());
ps.setInt(7, g.getSpeed());
ps.setString(8, g.getDirection());
ps.setString(9, g.getPositionState());
ps.setString(10, g.getMileage());
ps.setString(11, g.getRelativeMileage());
ps.setString(12, g.getVehicleState());
ps.setString(13, g.getLocation());
ps.addBatch();
}
rows = ps.executeBatch();
conn.commit();
}else{
gpsRlt.setImportRemark("資料匯入失敗");
gpsRlt.setImportState(GPSConstants.IMPORT_DEFAULT);
}
gpsRlt.setImportRows(rows.length);
Long endImpTime = System.currentTimeMillis();
gpsRlt.setImportTime((int) (endImpTime-beginImpTime));
System.out.println(Thread.currentThread().getName()+" "+gpsRlt.getImportTime());
gpsRlt.setImportRemark("資料匯入成功");
gpsRlt.setImportState(GPSConstants.IMPORT_SUCCESS);
}
}catch(Exception e){
conn.rollback();
e.printStackTrace();
gpsRlt.setImportRemark("資料匯入失敗");
gpsRlt.setImportState(GPSConstants.IMPORT_DEFAULT);
}finally{
if(null != ps){
ps.close();
}
if(null != conn){
conn.close();
}
}
return gpsRlt;
}
}
對TXT檔案進行格式驗證和解析的工具類 DataClearTools 如下
/**
* @Description 對TXT檔案進行解析;
* @author ***
*
*/
public class DataClearTools {
private static final Logger logger = LoggerFactory.getLogger(DataClearTools.class);
private static final String DEF_HPZL = "01";
private static final String ENCODING = "UTF-8";
/**
* @Description 解析TXT -> List<GpsVehiclePassRecord>
* @param file 被解析的TXT檔案
* @param to 解析過程中的狀態MODEL
* @return 解析完成的List
*/
public static List<GpsVehiclePassRecord> getGpsRecords(File file, ReceiveGpsTO to) {
List<GpsVehiclePassRecord> gpsList = new ArrayList<GpsVehiclePassRecord>();
List<GpsVehiclePassRecord> duplicateList = new ArrayList<GpsVehiclePassRecord>();
InputStreamReader read = null;
BufferedReader bufferedReader = null;
int row=0;
try {
if (file.isFile() && file.exists()) {
read = new InputStreamReader(new FileInputStream(file), ENCODING);
bufferedReader = new BufferedReader(read);
String lineTxt = null;
while ((lineTxt = bufferedReader.readLine()) != null) {
row++;
if(1 == row){
continue;
}
if (StringUtils.isNotBlank(lineTxt)) {// 不等於空才執行
GpsVehiclePassRecord pass = null;
try{
pass = lineTxt2GpsPass(lineTxt);
}catch(Exception e){
throw e;
}
if(null != pass)
gpsList.add(pass);
}else{
row++;
}
}
//根據hphm和過車時間去掉重複行
if(gpsList.size()>0){
duplicateList = DuplicateGpsRecord(gpsList);
}
}
}
} catch (IOException e) {
logger.error("解析GPS檔案{}出錯! ",file.getName());
to.setImportRemark("解析檔案過程中出錯");
to.setImportRows(0);
duplicateList.clear();
e.printStackTrace();
} catch (Exception e){
logger.error("解析GPS檔案{}出錯! ",file.getName());
to.setImportRemark("解析檔案行資訊過程中出錯");
to.setImportRows(0);
to.setErrorRow(row);
duplicateList.clear();
e.printStackTrace();
}finally{
if(bufferedReader!=null){
try {
bufferedReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(null != read){
try {
read.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return duplicateList;
}
/**
* @Description 根據hphm和過車時間去重;
* @param gpsList
* @return
*/
private static List<GpsVehiclePassRecord> DuplicateGpsRecord(List<GpsVehiclePassRecord> gpsList) {
List<GpsVehiclePassRecord> duplicateList = new ArrayList<GpsVehiclePassRecord>();
for(GpsVehiclePassRecord g : gpsList){
if(duplicateList.contains(g)){
continue;
}else{
duplicateList.add(g);
}
}
return duplicateList;
}
/**
* @Description 根據讀到的每行資訊解析成Gps Model
* @param lineTxt
* @return
* @throws Exception
*/
private static GpsVehiclePassRecord lineTxt2GpsPass(String lineTxt) throws Exception {
GpsVehiclePassRecord record = new GpsVehiclePassRecord();
try{
String[] strArr = lineTxt.split("\\s+");
String hphmStr;
//序號
record.setOrderNum(Long.parseLong(strArr[0].trim()));
//XXXX 轉換過程
return record;
}catch(Exception allE){
logger.error("解析行內容到GpsVehicelPassRecord出錯");
throw allE;
}
}
}
結果展示:
CASE 1 : 對 120個TXT 檔案,總共 97萬5千 條資料進行匯入:採用檔案 隨機分組策略,每組 4 個檔案 ;進行匯入:總共用時:576秒;
所需的Heap Size 最大不超過200M (PS: 自然包括工程其它東西佔用); 在此需說明,由於 匯入執行緒使用的是Callable,其Future.get() 方法是阻塞方法,所以每次匯入的時間可能不一樣,不過總體相差不會太多;
CASE 2 : 對 120個TXT 檔案,總共 97萬5千 條資料進行匯入:採用檔案 按檔案大小分組策略,每組 4 個檔案 ;進行匯入:總共用時:366秒;所需的Heap Size 最大不超過300M (PS: 自然包括工程其它東西佔用); 不過可以明顯看出,時間縮短了但是需要的最大Heap Size 有所增加,
需要的HEAP SIZE 比隨機匯入時大的原因,按檔案大小進行排序,當一組檔案都比較大時,自然H S也就比較大,
所需的匯入時間 366 < 576的原因是,在匯入過程中使用了Callable介面,Future.get() 方法是阻塞方法,相同大小的檔案在一起匯入,批次不會等待太多時間;
CASE 3 : 採用 檔案按大小分組,每組成員數量為5個,匯入時間大概3分多點,此處不再羅列,注意並不是 每組的成員越多越好,當成員過高時需要的資料庫連結也越多,到達一定量時,會方式IO阻塞問題;
尾聲:
希望 朋友可以在 異常處理; 資料迴歸;和併發問題上給出寶貴的意見和批評,感謝;