1. 程式人生 > >TXT 資料檔案批量匯入DB

TXT 資料檔案批量匯入DB

應用背景:

現有100-200個TXT資料檔案,每個檔案中存貯的資料量不盡相同,例如有的儲存2~3萬條,有的存貯2~3百條資料,現需要將這些檔案匯入到Sql Server資料庫中;匯入過程需在前臺頁面中顯示出每個檔案匯入的基本情況(匯入數量,狀態碼,出錯的行數等)。

思路探尋:

(1)專案DAO層使用的Hibernate,經測試匯入一個2萬多條的檔案,加上TXT檔案逐行解析,和生成List;加上使用Hibernate的插入到資料庫中大約需要15-20秒的時間,用時太長;

(2)使用原生JDBC技術進行分批(隨機分組和按檔案大小分組)操作;最終匯入全部檔案大約需要3.5分鐘左右,

本文目的:

記錄本次匯入筆記,希望 猿友 在匯入過程中和多執行緒併發上給出寶貴的意見,在此先表謝意!


STEP 1 : 定義檔案匯入過程中每個檔案匯入的資訊 ENTITY

/**
 * @Description 記錄TXTGPS檔案匯入過程資訊;
 */
public class ReceiveGpsTO {
    // 匯入批TXT檔案所在目錄
    private String importDir;
    // 要匯入的檔名稱
    private String importfileName;
    // 匯入成功的總行數
    private int importRows;
    // 匯入狀態
    private String importState;
    // 匯入過程中備註資訊
    private String importRemark;
    // 匯入過程中出錯的行號
    private int errorRow;
    // 匯入一個檔案所需時間,預設毫秒
    private int importTime;
	
    public String getImportDir() {
        return importDir;
    }
    public String setImportDir(String importDir) {
       this.importDir = importDir
    }
    
    ......
}

STEP 2 : 接收頁面請求,開始進行匯入

public class ReceiveGpsDataFS implements IReceiveGpsDataFS {
	
    private final static Logger logger = LoggerFactory.getLogger(ReceiveGpsDataFS.class);

    @Override
    public GpsTableDataInfo receiveGpsData(ReceiveGpsTO gpsTO, int rangeStart,
		int fetchSize) {
	Long bTime = System.currentTimeMillis();
	GpsTableDataInfo tableData = new GpsTableDataInfo();
	tableData.setRltMsg(GPSConstants.IMPORT_SUCCESS);
	int sumNum = 0;
	List<ReceiveGpsTO> rltList = new ArrayList<ReceiveGpsTO>();
	if(StringUtils.hasText(gpsTO.getImportDir())){
		File dirFile = new File(gpsTO.getImportDir());
		if(dirFile.isDirectory()){
		    File[] importFiles = dirFile.listFiles();
		    //MARK: 對匯入的大量檔案進行兩種策略分組處理;
		    List<List<File>> groupList = (GPSConstants.RANDOM_OR_SIZE_DIDIVE_STRATEGY) ? 
						RandomGroupUtil.getGroupByRandom(importFiles, GPSConstants.FILE_NUM_4_GROUP) : 
							RandomGroupUtil.getGroupBySize(importFiles, GPSConstants.FILE_NUM_4_GROUP);
		    for(List<File> gl : groupList){
			//MARK: 阻塞執行;執行完一組,拿到結果後執行下一組;
			logger.info("開始匯入GPS組檔案:{}", gl);
			List<ReceiveGpsTO> eachGroupRltList = ImportGpsData2DB.importGpsData(gl);
			rltList.addAll(eachGroupRltList);
			logger.info("開始匯入GPS組檔案:{} 結束", gl);
		    }
		}else{
		    logger.error("匯入的檔案路徑{}不是正確的目錄", gpsTO.getImportDir());
		    tableData.setRltMsg(GPSConstants.IMPORT_DEFAULT);
		}
		for(ReceiveGpsTO t : rltList){
		    sumNum += t.getImportRows();
		}
		tableData.setTotalCount(sumNum);
		tableData.setData(rltList);
		Long eTime = System.currentTimeMillis();
		logger.info("匯入檔案數量:"+rltList.size()+" 總耗時分鐘: "+(eTime-bTime)/GPSConstants.SECOND_2_MS);
	}
	    return tableData;
    }
	
}

對STEP 2 中用到的常量和檔案分組方式進行簡單說明:

對需要匯入的100多個TXT檔案進行分組匯入,分組時提供兩種分組方式: 一種,隨機進行分組,也就是在同一個組內,可能有的檔案比較大,有的檔案比較小,例如:(File1:3M, File2 : 1M, File3: 4M, File4: 20KB);這種分組的好處:每組檔案總的大小相對穩定,不會導致一組檔案特別到,因為後期匯入時按組開執行緒,使用Callable介面 拿CONNECTION操作,可以降低OOM概率;第二種:按檔案大小分組,也就是說同一組內,每個檔案的大小不相上下,好處,後期插入時,使用Future.get方法時是阻塞方法,可以有效的減下彼此間的等待時間,速度快,同時也增加了OOM概率。

常量:

public class GPSConstants {
    //匯入STATE
    public static final String IMPORT_DEFAULT = "FAILURE";
    public static final String IMPORT_SUCCESS = "SUCCESS";
    public static final int SECOND_2_MS = 1000;
	
    //匯入檔案進行隨機分組:好處:減小OOM概率 
    //匯入檔案按大小分組:好處:匯入速度相當較快;壞處:當每組檔案數量比較多時,增加OOM概率
    //true : 隨機分組 false: 按大小分組;
    public static final boolean RANDOM_OR_SIZE_DIDIVE_STRATEGY = true;
    //public static final boolean SIZE_DIDIVE_STRATEGY = true;

    //匯入檔案分組:每組中檔案個數;建議當JVM XMx<300不要超過5個;
    public static final int FILE_NUM_4_GROUP = 4;
	
}

兩種分組方式:

/**
 * @Description 隨機分組工具類, 提供按檔案大小分組和隨機分組方式
 */
public class RandomGroupUtil {

    /**
     * @Dscription 隨機分組模式
     * @param origData 要分組的資料;
     * @param numOfEachGroup 每組中成員數量;
     * @return
     */
    public static <T> List<List<T>> getGroupByRandom(T[] origData, int numOfEachGroup) {
	int len = origData.length;
    	List<T> origList = new ArrayList<T>();
    	List<List<T>> groupsList = new ArrayList<List<T>>();
    	for(T f : origData)
    	    origList.add(f);
    	if(len < numOfEachGroup) {//只分一組
    	    groupsList.add(origList);
    	    return groupsList;
    	}
        // 計算可以分多少組
        int groupNum = ((len + numOfEachGroup) - 1) / numOfEachGroup;
        for (int i = 0; i < groupNum-1; i++) {
            List<T> group = new ArrayList<T>();
            for (int j = 0; j < numOfEachGroup; j++) {
                int random = getRandom(origList.size());
                group.add(origList.get(random));
                origList.remove(random);
            }
            groupsList.add(group);
        }
        // 最後剩下的人分成一組;
        groupsList.add(origList);
        return groupsList;
    }
	
    /**
     * @Decription 按檔案大小進行分組,每組成員中所含的檔案大小相近
     * @param files
     * @param numOfEachGroup 每組中要包含的成員數量;
     * @return 分好的組
     */
    public static List<List<File>> getGroupBySize(File[] files, int numOfEachGroup) {
	int len = files.length;
    	List<File> orgList = new ArrayList<File>();
    	List<List<File>> groupsList = new ArrayList<List<File>>();
    	for(File f : files)
    	    orgList.add(f);
    	if(len < numOfEachGroup) {//只分一組
    	    groupsList.add(orgList);
    	    return groupsList;
    	}
    	//按檔案大小排序
    	Collections.sort(orgList, new Comparator<File>(){
	    public int compare(File o1, File o2) {
		if( o1.length() == o2.length())
		    return 0;
		return (o1.length() - o2.length() > 0) ? 1 : -1;
	    }
    	});
        // 計算可以分多少組
        int groupNum = ((len + numOfEachGroup) - 1) / numOfEachGroup;
        for (int i = 0; i < groupNum-1; i++) {
            List<File> group = new ArrayList<File>();
            for (int j = 0; j < numOfEachGroup; j++) {
                group.add(orgList.get(orgList.size()-1));
                orgList.remove(orgList.size()-1);
            }
            groupsList.add(group);
        }
        // 最後剩下的人分成一組;
        groupsList.add(orgList);
        return groupsList;
    }
	
    private static int getRandom(int num) {
        Random r = new Random();
        return r.nextInt(num);
    }
}

STEP 3 : 使用 Executors 和 DataSource 資料來源連線池按組進行批量匯入


/**
 * @Description : 使用executor呼叫call, 匯入狀態資訊封入ReceiveGpsTO
 *
 */
public class ImportGpsData2DB {
    private static final Logger logger = LoggerFactory.getLogger(ImportGpsData2DB.class);
    
    private static ExecutorService executor = null;
    private static WrapDataSource dataSource = null;

    static{
	executor = Executors.newFixedThreadPool(GPSConstants.FILE_NUM_4_GROUP);
	dataSource = (WrapDataSource) ZBus.findCommonService("dataSource");
    }
	
    public static List<ReceiveGpsTO> importGpsData(List<File> gl) {
	List<ReceiveGpsTO> rList = new ArrayList<ReceiveGpsTO>();
	List<Future<ReceiveGpsTO>> fList = new ArrayList<Future<ReceiveGpsTO>>();
	for(File file : gl){
	    Connection conn = null;
	    try {
		conn = dataSource.getConnection();
	    } catch (SQLException e) {
                logger.error("{}從連線池中獲取資料庫連線失敗, 本組檔案 {} 匯入失敗", ImportGpsData2DB.class.getName(), gl);
		e.printStackTrace();
                return rList;
	    }
            // 開啟執行緒進行匯入
	    Future<ReceiveGpsTO> future = executor.submit(new ImportGpsCallable(file, conn));
	    fList.add(future);
	}
	for(Future<ReceiveGpsTO> f : fList){
	    try {
		ReceiveGpsTO rTO = f.get();
		rList.add(rTO);
	    } catch (Exception e) {
                logger.error("{} 獲取本組檔案{} 匯入狀態失敗", ImportGpsData2DB.class.getName(), f);
		e.printStackTrace();
	    } 		
	}
	return rList;
    }

}

資料來源配置:

<beans>
    <bean id="dbproperties"
	class="com.ssb.***.config.PropertyPlaceholderConfigurer">
	<property name="locations">
	    <value>classpath:*.db.properties</value>
	</property>
    </bean>
    <bean id="dataSource"
	class="com.zte.WrapDataSource"
	destroy-method="close" init-method="initSource">
	<property name="driverClassName"
		value="${ssb.driverClassName}" />
	<property name="url"
		value="${ssb.url}${ssb.dbname}" />
	<property name="username" value="${ssb.username}" />
	<property name="password" value="${ssb.password}" />
	<property name="initialSize" value="5" />
	<property name="maxActive" value="30" />
	<property name="maxWait" value="-1" />
	<property name="maxIdle" value="10" />
	<property name="removeAbandoned" value="true" />
	<property name="removeAbandonedTimeout" value="30000" />
	<property name="logAbandoned" value="true" />
	<property name="encrypt" value="${ssb.isEncrypt}" />
    </bean>
</beans>

STEP 4 : 實現 CALLABLE 執行緒進行匯入,每組中得每一個檔案對應一個匯入執行緒;

/**
 * @Description GPS資料匯入執行緒,主要完成txt檔案解析和資料庫匯入;
 * @author ***
 *
 */
public class ImportGpsCallable implements Callable<ReceiveGpsTO> {
	
    private static final String SQL_STR = "INSERT INTO [ZXX_MXX].[dbo].[GPS_XXX_PASS_RECORD]( ORDER_NUM, HPHM, HPZL, PASS_TIME, LONGITUDE, "
			+ "LATITUDE, SPEED, DIRECTION, POSITION_STATE, MILEAGE, RELATIVE_MILEAGE, VEHICLE_STATE, LOCATION) VALUES("
			+ "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
	
    private File file;
    private Connection conn;
    //通過構造方法傳入所需的資源
    public ImportGpsCallable(File file, Connection conn) {
	this.file = file;
	this.conn = conn;
    }

    @Override
    public ReceiveGpsTO call() throws Exception {
	ReceiveGpsTO gpsRlt = new ReceiveGpsTO();
	gpsRlt.setImportfileName(file.getName());
	Long beginImpTime = System.currentTimeMillis();
        // 通過工具類DataClearTools對TXT檔案進行格式驗證和資料解析;
	List<GpsVehiclePassRecord> rList = DataClearTools.getGpsRecords(file, gpsRlt);
	PreparedStatement ps = null;
	try{
            if(null != conn){
		conn.setAutoCommit(false);
		ps = conn.prepareStatement(SQL_STR);
		int[] rows = null;
		if( rList.size() > 0 ){
		    for(GpsVehiclePassRecord g : rList){
			ps.setLong(1, g.getOrderNum());
			ps.setString(2, g.getHphm());
			ps.setString(3, g.getHpzl());
			ps.setTimestamp(4, new Timestamp(g.getPassTime().getTime()));
			ps.setString(5, g.getLongitude());
			ps.setString(6, g.getLatitude());
			ps.setInt(7, g.getSpeed());
			ps.setString(8, g.getDirection());
			ps.setString(9, g.getPositionState());
			ps.setString(10, g.getMileage());
			ps.setString(11, g.getRelativeMileage());
			ps.setString(12, g.getVehicleState());
			ps.setString(13, g.getLocation());
			ps.addBatch();
		    }
		    rows = ps.executeBatch();
		    conn.commit();
		}else{
		    gpsRlt.setImportRemark("資料匯入失敗");
		    gpsRlt.setImportState(GPSConstants.IMPORT_DEFAULT);
		}
		gpsRlt.setImportRows(rows.length);
		Long endImpTime = System.currentTimeMillis();
		gpsRlt.setImportTime((int) (endImpTime-beginImpTime));
		System.out.println(Thread.currentThread().getName()+" "+gpsRlt.getImportTime());
		gpsRlt.setImportRemark("資料匯入成功");
		gpsRlt.setImportState(GPSConstants.IMPORT_SUCCESS);
	    }
	}catch(Exception e){
            conn.rollback();
	    e.printStackTrace();
	    gpsRlt.setImportRemark("資料匯入失敗");
	    gpsRlt.setImportState(GPSConstants.IMPORT_DEFAULT);
	}finally{
	    if(null != ps){
		ps.close();
	    }
	    if(null != conn){
		conn.close();
	    }
	}
	return gpsRlt;
    }
}

對TXT檔案進行格式驗證和解析的工具類 DataClearTools 如下


/**
 * @Description 對TXT檔案進行解析;
 * @author ***
 *
 */
public class DataClearTools {
	
    private static final Logger logger = LoggerFactory.getLogger(DataClearTools.class);
	
    private static final String DEF_HPZL = "01";
    private static final String ENCODING = "UTF-8";
	
    /**
     * @Description 解析TXT -> List<GpsVehiclePassRecord>
     * @param file 被解析的TXT檔案
     * @param to 解析過程中的狀態MODEL
     * @return 解析完成的List
     */
    public static List<GpsVehiclePassRecord> getGpsRecords(File file, ReceiveGpsTO to) {
        List<GpsVehiclePassRecord> gpsList = new ArrayList<GpsVehiclePassRecord>();
	List<GpsVehiclePassRecord> duplicateList = new ArrayList<GpsVehiclePassRecord>();
	InputStreamReader read = null;
	BufferedReader bufferedReader = null;
	int row=0;
	try {
	    if (file.isFile() && file.exists()) {
		read = new InputStreamReader(new FileInputStream(file), ENCODING);
		bufferedReader = new BufferedReader(read);
		String lineTxt = null;
		while ((lineTxt = bufferedReader.readLine()) != null) {
		    row++;
		    if(1 == row){
			continue;
		    }
		    if (StringUtils.isNotBlank(lineTxt)) {// 不等於空才執行
                        GpsVehiclePassRecord pass = null;
			try{
			    pass = lineTxt2GpsPass(lineTxt);	
			}catch(Exception e){
			    throw e;
			}
			if(null != pass)
			    gpsList.add(pass);
			}else{
			    row++;
			}
		    }
		    //根據hphm和過車時間去掉重複行
		    if(gpsList.size()>0){
			duplicateList = DuplicateGpsRecord(gpsList);
		    }
		}
             }
	  } catch (IOException e) {
	    logger.error("解析GPS檔案{}出錯! ",file.getName());
	    to.setImportRemark("解析檔案過程中出錯");
	    to.setImportRows(0);
            duplicateList.clear();
	    e.printStackTrace();
	} catch (Exception e){
	    logger.error("解析GPS檔案{}出錯! ",file.getName());
	    to.setImportRemark("解析檔案行資訊過程中出錯");
	    to.setImportRows(0);
	    to.setErrorRow(row);
	    duplicateList.clear();
	    e.printStackTrace();
	}finally{
	    if(bufferedReader!=null){
		try {
	            bufferedReader.close();
		} catch (IOException e) {
		    e.printStackTrace();
		}
	    }
	    if(null != read){
		try {
		    read.close();
		} catch (IOException e) {
		    e.printStackTrace();
		}
	    }
	}
	return duplicateList;
    }
	
    /**
     * @Description 根據hphm和過車時間去重;
     * @param gpsList
     * @return
     */
    private static List<GpsVehiclePassRecord> DuplicateGpsRecord(List<GpsVehiclePassRecord> gpsList) {
        List<GpsVehiclePassRecord> duplicateList = new ArrayList<GpsVehiclePassRecord>();
	for(GpsVehiclePassRecord g : gpsList){
	    if(duplicateList.contains(g)){
		continue;
	    }else{
		duplicateList.add(g);
	    }
	}
	    return duplicateList;
    }

    /**
     * @Description 根據讀到的每行資訊解析成Gps Model
     * @param lineTxt
     * @return
     * @throws Exception 
     */
    private static GpsVehiclePassRecord lineTxt2GpsPass(String lineTxt) throws Exception {
        GpsVehiclePassRecord record = new GpsVehiclePassRecord();
	try{
	    String[] strArr = lineTxt.split("\\s+");
	    String hphmStr;
	    //序號
	    record.setOrderNum(Long.parseLong(strArr[0].trim()));
	    //XXXX 轉換過程
	    return record;
	}catch(Exception allE){
	    logger.error("解析行內容到GpsVehicelPassRecord出錯");
	    throw allE;
	}
    }
}

結果展示:

CASE 1 : 對 120個TXT 檔案,總共 97萬5千 條資料進行匯入:採用檔案 隨機分組策略,每組 4 個檔案 ;進行匯入:總共用時:576秒;

所需的Heap Size 最大不超過200M (PS: 自然包括工程其它東西佔用); 在此需說明,由於 匯入執行緒使用的是Callable,其Future.get() 方法是阻塞方法,所以每次匯入的時間可能不一樣,不過總體相差不會太多;

CASE 2 : 對 120個TXT 檔案,總共 97萬5千 條資料進行匯入:採用檔案 按檔案大小分組策略,每組 4 個檔案 ;進行匯入:總共用時:366秒;所需的Heap Size 最大不超過300M (PS: 自然包括工程其它東西佔用); 不過可以明顯看出,時間縮短了但是需要的最大Heap Size 有所增加,

需要的HEAP SIZE 比隨機匯入時大的原因,按檔案大小進行排序,當一組檔案都比較大時,自然H S也就比較大,

所需的匯入時間 366 < 576的原因是,在匯入過程中使用了Callable介面,Future.get() 方法是阻塞方法,相同大小的檔案在一起匯入,批次不會等待太多時間;

CASE 3 : 採用 檔案按大小分組,每組成員數量為5個,匯入時間大概3分多點,此處不再羅列,注意並不是 每組的成員越多越好,當成員過高時需要的資料庫連結也越多,到達一定量時,會方式IO阻塞問題;


尾聲:

希望 朋友可以在 異常處理; 資料迴歸;和併發問題上給出寶貴的意見和批評,感謝;