Java合併解壓多省話單gz大檔案,程式碼涉及解壓、合併、刪除、複製邏輯
附錄裡面會加上工程程式碼,到時候各位直接下載下來即可
1)先看專案的配置檔案(mergeFilleUtil.properties),log4j內容我就不貼了,一會兒自己下載看看就行:
#原始檔案
PROVINCE_DIR=E:\\test\\rootfile
#解壓後文件存放的路徑
UN_ZIP_PATH=E:\\test\\unZip
#合併後文件存放的路徑
OUT_PATH=E:\\test\\result
#已經處理過的檔案存放路徑
DONE_FILE_PATH=E:\\test\\doneFile
#任務執行結束後哪些檔案路徑下的需要刪除
DELETE_PATH=E:\\test\\rootfile,E:\\test\\unZip
#配置任務的時間間隔,以秒為單位,一天是:24*60*60=86400(程式內部會再乘以1000換算成毫秒)
TASK_PERIOD=86400
#任務開始的小時時間
TASK_BEGIN_HOUR=9
#任務開始的分鐘
TASK_BEGIN_MINUTE=26
2)主程式入口:
/**
 * Program entry point: schedules the unzip/merge task on a {@link Timer}.
 *
 * <p>Reads three settings from the configuration map:
 * {@code TASK_PERIOD} (interval between runs, in seconds),
 * {@code TASK_BEGIN_HOUR} and {@code TASK_BEGIN_MINUTE} (time of day of the first run, today).
 *
 * <p>The first run is pinned to second 0 of the configured minute. If that moment has
 * already passed when the program starts, {@code Timer.schedule} executes the task
 * immediately and spaces later runs {@code TASK_PERIOD} seconds after each actual start.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Map<?, ?> proMap = LoadProperty.readProperty();
    Timer timer = new Timer();
    MergeFileUtil mergeTask = new MergeFileUtil(); // the task to execute

    // Interval between runs, configured in seconds (converted to milliseconds below).
    long intevalPeriod = Long.parseLong(proMap.get("TASK_PERIOD").toString().trim());
    // Hour and minute at which the task starts, e.g. 19:32.
    int hour = Integer.parseInt(proMap.get("TASK_BEGIN_HOUR").toString().trim());
    int minute = Integer.parseInt(proMap.get("TASK_BEGIN_MINUTE").toString().trim());

    // Today's date with the configured time of day; seconds and milliseconds are
    // zeroed so the run does not inherit them from the moment the JVM was launched.
    Calendar calendar = Calendar.getInstance();
    calendar.set(Calendar.HOUR_OF_DAY, hour);
    calendar.set(Calendar.MINUTE, minute);
    calendar.set(Calendar.SECOND, 0);
    calendar.set(Calendar.MILLISECOND, 0);
    Date date = calendar.getTime();

    int year = calendar.get(Calendar.YEAR);
    // Calendar.MONTH is zero-based (January == 0); add 1 for a human-readable month.
    int month = calendar.get(Calendar.MONTH) + 1;
    int day = calendar.get(Calendar.DAY_OF_MONTH);
    System.out.println("執行時間:" + year + "-" + month + "-" + day + "-" + hour + "-" + minute);

    // Timer periods are expressed in milliseconds.
    timer.schedule(mergeTask, date, intevalPeriod * 1000);
}
}
3)讀取配置檔案工具類:
/**
 * Loads {@code mergeFilleUtil.properties} from the classpath once, when the class is
 * initialized, and exposes the loaded settings to the rest of the program.
 *
 * <p>A missing or unreadable file is logged and leaves the settings empty; it does not
 * abort class initialization.
 */
public class LoadProperty {
    private static final Logger log = Logger.getLogger(LoadProperty.class);
    private static final Properties prop = new Properties();

    static {
        ClassLoader loader = LoadProperty.class.getClassLoader();
        InputStream in = loader.getResourceAsStream("mergeFilleUtil.properties");
        if (in == null) {
            // getResourceAsStream returns null when the resource is not on the classpath;
            // wrapping null in a reader would throw inside the static initializer.
            log.error("載入配置檔案出錯,請檢查!");
        } else {
            try {
                // Read as UTF-8 so that non-ASCII (e.g. Chinese) paths are decoded correctly.
                prop.load(new InputStreamReader(in, "UTF-8"));
            } catch (UnsupportedEncodingException e1) {
                log.error("載入配置檔案出錯,請檢查!", e1);
            } catch (IOException e1) {
                log.error("載入配置檔案流出錯,請檢查!", e1);
            } finally {
                try {
                    in.close();
                } catch (IOException e) {
                    log.error("關閉配置檔案流出錯!", e);
                }
            }
        }
    }

    /**
     * Returns the loaded configuration.
     *
     * @return the shared {@link Properties} instance itself (not a copy); empty if the
     *         configuration file could not be loaded
     */
    @SuppressWarnings("rawtypes")
    public static Map readProperty() {
        return prop;
    }
}
4) 多執行緒檔案解壓合併工具類
/**
 * Timer task that performs one complete pass: unpack every entry found under
 * {@code PROVINCE_DIR}, merge every entry found under {@code UN_ZIP_PATH}, copy the
 * originals to {@code DONE_FILE_PATH}, then empty the directories listed in
 * {@code DELETE_PATH}.
 *
 * <p>Each phase uses a fixed thread pool with one thread per top-level entry and blocks
 * until the pool has finished before the next phase starts.
 */
public class MergeFileUtil extends TimerTask {
    private static final Logger log = Logger.getLogger(MergeFileUtil.class);

    /** Runs one pass; failures are logged so that the owning timer keeps running. */
    @Override
    public void run() {
        try {
            unZipFile();
        } catch (RuntimeException e) {
            // An exception escaping TimerTask.run() terminates the Timer's thread and
            // with it every future execution, so stop it here.
            log.error("解壓合併檔案程式執行出錯!", e);
        }
    }

    /**
     * Unpacks every entry under {@code PROVINCE_DIR} in parallel and, once all unpack
     * workers have finished, starts the merge phase.
     */
    @SuppressWarnings("rawtypes")
    public void unZipFile() {
        // Read the paths from the configuration.
        Map proMap = LoadProperty.readProperty();
        String rootdir = proMap.get("PROVINCE_DIR").toString();
        String unZipPath = proMap.get("UN_ZIP_PATH").toString();
        String outDir = proMap.get("OUT_PATH").toString();

        File rootfile = new File(rootdir);
        if (!rootfile.exists() || !rootfile.isDirectory()) {
            log.error("檔案路徑不存在,請檢查欲解壓的檔案所在目錄是否填寫正確!");
            return;
        }
        // listFiles() returns null on an I/O error; treat that like an empty directory.
        File[] files = rootfile.listFiles();
        if (files == null || files.length == 0) {
            log.error("警告:沒有要解壓的檔案!請檢查:" + rootfile + "下是否存在需解壓的檔案!");
            return;
        }

        // One unpack thread per top-level entry.
        ExecutorService pool = Executors.newFixedThreadPool(files.length);
        for (File provinceDir : files) {
            excuteThreads(pool, provinceDir, outDir, unZipPath);
        }
        // Merge only after every unpack worker has finished.
        if (awaitCompletion(pool)) {
            mergerFile();
        }
    }

    /** Submits one unpack worker for {@code provinceDir} to {@code pool}. */
    private void excuteThreads(ExecutorService pool, File provinceDir, String outdir, String unZipPath) {
        pool.execute(new UnZipFile(provinceDir.getAbsolutePath(), outdir, unZipPath));
    }

    /**
     * Merges every entry under {@code UN_ZIP_PATH} in parallel and, once all merge
     * workers have finished, archives the originals and clears the working directories.
     */
    @SuppressWarnings("rawtypes")
    private void mergerFile() {
        // Read the paths from the configuration.
        Map proMap = LoadProperty.readProperty();
        String rootdir = proMap.get("PROVINCE_DIR").toString();
        String unZipPath = proMap.get("UN_ZIP_PATH").toString();
        String outDir = proMap.get("OUT_PATH").toString();
        String doneFilePath = proMap.get("DONE_FILE_PATH").toString();
        String deletePath = proMap.get("DELETE_PATH").toString();
        String[] deleteArray = deletePath.split(",");

        File unZipFile = new File(unZipPath);
        if (!unZipFile.exists() || !unZipFile.isDirectory()) {
            log.error("檔案路徑不存在,請檢查欲合併的檔案所在目錄是否填寫正確!");
            return;
        }
        File[] files = unZipFile.listFiles();
        if (files == null || files.length == 0) {
            log.error("警告:沒有要合併的檔案!請檢查:" + unZipFile.getPath() + "下是否存在需合併的檔案!");
            return;
        }

        // One merge thread per top-level entry.
        ExecutorService pool = Executors.newFixedThreadPool(files.length);
        for (File provinceDir : files) {
            excuteMergerThreads(pool, provinceDir, outDir, unZipPath);
        }
        if (!awaitCompletion(pool)) {
            return;
        }

        // Merging is done: archive the originals, then clear the working directories.
        MoveAndDeleteFile moveAndDeleteFile = new MoveAndDeleteFile();
        log.info("開始移動檔案到已處理資料夾!");
        moveAndDeleteFile.moveFile(new File(rootdir), new File(doneFilePath));
        log.info("移動檔案到已處理資料夾成功!");
        try {
            // NOTE(review): the reason for this pause is not evident from the code;
            // kept as in the original.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.info("開始刪除檔案!");
        for (String configuredPath : deleteArray) {
            // Tolerate blanks around the commas of the DELETE_PATH list.
            File file = new File(configuredPath.trim());
            if (!file.exists()) {
                log.error("欲刪除的目錄不存在,請檢查目錄:" + file.getPath() + "是否正確存在!");
            } else {
                moveAndDeleteFile.deleteFileAndDir(file, true);
            }
        }
        log.info("刪除檔案結束!");
        log.info("解壓合併檔案程式執行結束!");
    }

    /** Submits one merge worker for {@code file} to {@code pool}. */
    private void excuteMergerThreads(ExecutorService pool, File file, String outDir, String unZipPath) {
        pool.execute(new MergeFile(file.getAbsolutePath(), outDir, unZipPath));
    }

    /**
     * Stops {@code pool} from accepting new work and blocks until every submitted task
     * has finished, without polling.
     *
     * @return {@code true} if all tasks finished; {@code false} if the wait was
     *         interrupted, in which case the remaining tasks are cancelled and the
     *         thread's interrupt flag is restored
     */
    private boolean awaitCompletion(ExecutorService pool) {
        pool.shutdown();
        try {
            while (!pool.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES)) {
                // Still running; keep waiting.
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            log.error("等待執行緒池結束時被中斷,本次任務終止!", e);
            return false;
        }
    }
}
5)解壓程式碼:
/**
 * Worker that walks one source directory tree and unpacks every {@code *.gz} file in it
 * (each is read as a gzip-compressed tar archive) into the unpack directory, preserving
 * the sub-directory structure below the source directory's parent.
 */
public class UnZipFile implements Runnable {
    private static final Logger log = Logger.getLogger(UnZipFile.class);
    private String outFilePath;  // output directory for merged files
    private String mergeFileDir; // directory holding the original archives
    private String unZipFileDir; // output directory for unpacked files
    public static final int BUFSIZE = 1024 * 8;
    // Number of archives processed so far; shared by all worker threads, hence atomic.
    private static final java.util.concurrent.atomic.AtomicInteger count =
            new java.util.concurrent.atomic.AtomicInteger();

    public UnZipFile() {
    }

    /**
     * @param mergeFileDir directory holding the archives this worker unpacks
     * @param outFilePath  output directory for merged files (created here if missing)
     * @param unZipFileDir root directory the archives are unpacked into
     */
    public UnZipFile(String mergeFileDir, String outFilePath, String unZipFileDir) {
        this.outFilePath = outFilePath;
        this.mergeFileDir = mergeFileDir;
        this.unZipFileDir = unZipFileDir;
    }

    /** Ensures the merge output directory exists, then unpacks the source tree. */
    @Override
    public void run() {
        File f = new File(outFilePath);
        File mergerDir = new File(mergeFileDir);
        if (!f.exists()) {
            // Create the directory and carry on; returning here would skip the whole
            // unpack step on the first run against a fresh output location.
            f.mkdirs();
        }
        log.info("開始解壓路徑:" + mergerDir + "下檔案!");
        unzipFile(mergerDir, unZipFileDir);
        log.info("解壓檔案:" + mergerDir + "結束!");
    }

    /** Recursively unpacks every {@code *.gz} file found at or below {@code file}. */
    private void unzipFile(File file, String OutFileDir) {
        if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children == null) {
                // listFiles() returns null when the directory cannot be read.
                log.error("無法讀取目錄:" + file.getPath());
                return;
            }
            for (File child : children) {
                unzipFile(child, OutFileDir);
            }
        } else if (file.getAbsolutePath().endsWith(".gz")) {
            int processed = count.incrementAndGet();
            doUnZipFile(file, OutFileDir);
            // Progress message every 1000 archives.
            if (processed % 1000 == 0) {
                log.info("已經解壓" + processed + "個檔案!");
            }
        }
    }

    /**
     * Unpacks one gzip-compressed tar archive.
     *
     * <p>Entries are written below {@code outputDirectory}, inside the sub-path that
     * {@code zipFile}'s directory has relative to the parent of the source directory.
     * Failures are logged; the remaining entries of a failing archive are skipped.
     *
     * @param zipFile         the archive to unpack
     * @param outputDirectory root directory to unpack into
     */
    public void doUnZipFile(File zipFile, String outputDirectory) {
        String separator = File.separator;

        // Work out the directory structure to reproduce at the destination.
        String zipPath = zipFile.getPath();
        int lastIndex = zipPath.lastIndexOf(separator);
        int index = mergeFileDir.lastIndexOf(separator);
        String dirStructurePath = lastIndex <= index ? "" : zipPath.substring(index + 1, lastIndex);
        // isEmpty() rather than == "": substring can return a distinct empty String.
        String baseDir = dirStructurePath.isEmpty()
                ? outputDirectory
                : outputDirectory + separator + dirStructurePath;

        FileInputStream fis = null;
        GZIPInputStream is = null;
        ArchiveInputStream in = null;
        try {
            fis = new FileInputStream(zipFile);
            is = new GZIPInputStream(new BufferedInputStream(fis));
            in = new ArchiveStreamFactory().createArchiveInputStream("tar", is);
            byte[] buffer = new byte[BUFSIZE];
            TarArchiveEntry entry = (TarArchiveEntry) in.getNextEntry();
            while (entry != null) {
                // Entry name inside the archive: "folder/file".
                String name = entry.getName();
                String dirRootPath = baseDir;
                for (String part : name.split("/")) {
                    dirRootPath = dirRootPath + separator + part;
                }
                File parentDir = new File(dirRootPath.substring(0, dirRootPath.lastIndexOf(separator)));
                if (!parentDir.exists()) {
                    parentDir.mkdirs();
                }
                if (name.endsWith("/")) {
                    mkFolder(dirRootPath);
                } else {
                    copyEntry(in, mkFile(dirRootPath), buffer);
                }
                entry = (TarArchiveEntry) in.getNextEntry();
            }
        } catch (FileNotFoundException e) {
            log.error("找不到要解壓的檔案所在路徑!", e);
        } catch (IOException e) {
            log.error("解壓檔案:" + zipFile.getPath() + "失敗!", e);
        } catch (ArchiveException e) {
            log.error("解壓檔案:" + zipFile.getPath() + "失敗!", e);
        } finally {
            // Outermost first; each close is independent so one failure cannot skip the rest.
            closeQuietly(in);
            closeQuietly(is);
            closeQuietly(fis);
        }
    }

    /**
     * Copies the remaining bytes of the current archive entry into {@code target},
     * always closing the output stream.
     */
    private void copyEntry(ArchiveInputStream source, File target, byte[] buffer) throws IOException {
        FileOutputStream out = new FileOutputStream(target);
        try {
            int read;
            // read() reports -1 at the end of the current entry.
            while ((read = source.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } finally {
            out.close();
        }
    }

    /** Closes {@code resource} if it is non-null, logging instead of propagating failures. */
    private void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            log.error("關閉檔案流出錯!", e);
        }
    }

    /** Creates the directory {@code fileName} if it does not exist yet. */
    private void mkFolder(String fileName) {
        File f = new File(fileName);
        if (!f.exists()) {
            f.mkdir();
        }
    }

    /** Creates the empty file {@code fileName} if it does not exist yet and returns it. */
    private File mkFile(String fileName) {
        File f = new File(fileName);
        try {
            f.createNewFile();
        } catch (IOException e) {
            log.error("檔案" + fileName + "建立失敗", e);
        }
        return f;
    }
}
6)合併程式碼:
/**
 * Worker that concatenates every non-{@code .gz} file below one unpacked directory into
 * a single file named {@code result}, located under the merge output directory in a
 * sub-directory named after the unpacked directory. An existing {@code result} file is
 * appended to.
 */
public class MergeFile implements Runnable {
    private static final Logger log = Logger.getLogger(MergeFile.class);
    private String outFilePath;  // output directory for merged files
    private String mergeFileDir; // directory holding the unpacked files
    // Number of files merged so far; shared by all worker threads, hence atomic.
    private static final java.util.concurrent.atomic.AtomicInteger count =
            new java.util.concurrent.atomic.AtomicInteger();
    public static final int BUFSIZE = 1024 * 8;

    /**
     * @param mergeFileDir directory whose files are merged
     * @param outFilePath  root output directory for merged files
     * @param unZipFileDir unused; kept for signature compatibility
     */
    public MergeFile(String mergeFileDir, String outFilePath, String unZipFileDir) {
        this.outFilePath = outFilePath;
        this.mergeFileDir = mergeFileDir;
    }

    @Override
    public void run() {
        mergerUnZipFile();
    }

    /** Opens (or creates) the result file and appends every file of the source tree to it. */
    private void mergerUnZipFile() {
        File f = new File(mergeFileDir);
        String path = f.getPath();
        // Output sub-directory: last path component of the source directory.
        String outPath = outFilePath + path.substring(path.lastIndexOf(File.separator));
        File outDirectory = new File(outPath);
        if (!outDirectory.exists()) {
            outDirectory.mkdirs();
        }

        File f2 = new File(outPath + File.separator + "result");
        FileOutputStream resultStream;
        try {
            // Append mode creates the file when it is missing and appends when it exists.
            resultStream = new FileOutputStream(f2, true);
        } catch (IOException e) {
            log.error("檔案:" + outPath + File.separator + "result" + "不存在,請檢查!", e);
            // Without an output channel there is nothing to merge into.
            return;
        }

        FileChannel outChannel = resultStream.getChannel();
        try {
            log.info("開始合併路徑:" + f.getPath() + "下檔案");
            mergeFile(f, outChannel);
            log.info("路徑:" + f.getPath() + "下檔案合併完成");
        } finally {
            try {
                outChannel.close();
            } catch (IOException e) {
                log.error("關閉管道流出錯,請檢查檔案合併類下run方法的檔案管道流!", e);
            }
        }
    }

    /**
     * Recursively appends every non-{@code .gz} file at or below {@code file} to the
     * output channel, in the order {@link File#listFiles()} reports them.
     *
     * @param file           root directory (or single file) to merge
     * @param outFileChannel channel of the output file
     */
    public void mergeFile(File file, FileChannel outFileChannel) {
        if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children == null) {
                // listFiles() returns null when the directory cannot be read.
                log.error("無法讀取目錄:" + file.getPath());
                return;
            }
            for (File child : children) {
                mergeFile(child, outFileChannel);
            }
        } else if (!file.getAbsolutePath().endsWith(".gz")) {
            mergeFiles(file, outFileChannel);
            // Progress message every 50000 files.
            int merged = count.incrementAndGet();
            if (merged % 50000 == 0) {
                log.info("已經合併檔案" + merged + "個!");
            }
        }
    }

    /**
     * Appends the whole content of {@code file} to {@code outChannel}. Failures are
     * logged and not propagated; the input file is always closed.
     *
     * @param file       file to read
     * @param outChannel channel to append to
     */
    public static void mergeFiles(File file, FileChannel outChannel) {
        FileInputStream input = null;
        try {
            input = new FileInputStream(file);
            FileChannel fc = input.getChannel();
            ByteBuffer bb = ByteBuffer.allocate(BUFSIZE);
            while (fc.read(bb) != -1) {
                bb.flip();
                // A single write() is not required to consume the whole buffer.
                while (bb.hasRemaining()) {
                    outChannel.write(bb);
                }
                bb.clear();
            }
        } catch (IOException ioe) {
            log.error("合併檔案:" + file.getPath() + "失敗!", ioe);
        } finally {
            if (input != null) {
                try {
                    // Closing the stream also closes its channel.
                    input.close();
                } catch (IOException e) {
                    log.error("關閉檔案:" + file.getPath() + "失敗!", e);
                }
            }
        }
    }
}
7)移動跟刪除檔案程式碼:
/**
 * File-system helpers used after merging: copy a directory tree into the "done"
 * directory, and delete the contents of a directory tree.
 *
 * <p>Despite the method names, "moving" only copies; the source files are left in place.
 */
public class MoveAndDeleteFile {
    private static final Logger log = Logger.getLogger(MoveAndDeleteFile.class);
    // Separate progress counters so the delete log does not continue the move count.
    private int moveCount;
    private int deleteCount;

    /**
     * Copies every file at or below {@code rootdir} into {@code doneFilePath}, keeping
     * each file's path relative to {@code rootdir}.
     *
     * @param rootdir      root directory (or single file) to copy
     * @param doneFilePath destination root directory
     */
    public void moveFile(File rootdir, File doneFilePath) {
        moveTree(rootdir, rootdir, doneFilePath);
    }

    /** Recursive worker for {@link #moveFile}; {@code sourceRoot} stays fixed. */
    private void moveTree(File sourceRoot, File current, File doneFilePath) {
        if (current.isDirectory()) {
            File[] children = current.listFiles();
            if (children == null) {
                // listFiles() returns null when the directory cannot be read.
                log.error("無法讀取目錄:" + current.getPath());
                return;
            }
            for (File child : children) {
                moveTree(sourceRoot, child, doneFilePath);
            }
        } else {
            copyInto(sourceRoot, current, doneFilePath);
            // Progress message every 5000 files.
            moveCount++;
            if (moveCount % 5000 == 0) {
                log.info("已經移動檔案" + moveCount + "個!");
            }
        }
    }

    /**
     * Copies one file into {@code doneFilePath}, keeping its path relative to the
     * configured {@code PROVINCE_DIR} (the file's own name is used when the file is not
     * located below that directory or the setting is absent).
     *
     * @param rootdir      the file to copy
     * @param doneFilePath destination root directory
     */
    public void moveTheFile(File rootdir, File doneFilePath) {
        // The source root used to be hard-coded as the default PROVINCE_DIR value;
        // read the configured value instead.
        Object configuredRoot = LoadProperty.readProperty().get("PROVINCE_DIR");
        File sourceRoot = configuredRoot == null ? null : new File(configuredRoot.toString());
        copyInto(sourceRoot, rootdir, doneFilePath);
    }

    /** Copies {@code source} to its mirrored location below {@code doneFilePath}; failures are logged. */
    private void copyInto(File sourceRoot, File source, File doneFilePath) {
        File target = new File(doneFilePath, relativePath(sourceRoot, source));
        File targetDir = target.getParentFile();
        if (targetDir != null && !targetDir.exists()) {
            targetDir.mkdirs();
        }

        FileInputStream fi = null;
        FileOutputStream fo = null;
        try {
            fi = new FileInputStream(source);
            fo = new FileOutputStream(target);
            FileChannel in = fi.getChannel();
            FileChannel out = fo.getChannel();
            // transferTo may move fewer bytes than requested, so loop until done.
            long size = in.size();
            long position = 0;
            while (position < size) {
                long transferred = in.transferTo(position, size - position, out);
                if (transferred <= 0) {
                    // No progress (e.g. the source shrank); stop rather than spin.
                    break;
                }
                position += transferred;
            }
        } catch (IOException e) {
            log.error("移動檔案:" + source.getPath() + "失敗!", e);
        } finally {
            // Closing a stream also closes its channel; either stream may be null if
            // opening it failed.
            closeQuietly(fo);
            closeQuietly(fi);
        }
    }

    /**
     * Returns {@code file}'s path relative to {@code sourceRoot}, or just its name when
     * it is not located below {@code sourceRoot} (or {@code sourceRoot} is null).
     */
    private static String relativePath(File sourceRoot, File file) {
        if (sourceRoot != null) {
            String rootPath = sourceRoot.getPath();
            String prefix = rootPath.endsWith(File.separator) ? rootPath : rootPath + File.separator;
            String filePath = file.getPath();
            if (filePath.length() > prefix.length() && filePath.startsWith(prefix)) {
                return filePath.substring(prefix.length());
            }
        }
        return file.getName();
    }

    /** Closes {@code resource} if it is non-null, logging instead of propagating failures. */
    private void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            log.error("關閉檔案流出錯!", e);
        }
    }

    /**
     * Deletes everything below {@code file}: files first, then their directories.
     *
     * @param file    root directory to clear
     * @param rootBoo when {@code true}, {@code file} itself is kept and only its
     *                contents are deleted; when {@code false}, {@code file} is deleted too
     */
    public void deleteFileAndDir(File file, boolean rootBoo) {
        // listFiles() returns null for a plain file or an unreadable directory.
        File[] files = file.listFiles();
        if (files != null) {
            for (File child : files) {
                if (child.isDirectory()) {
                    deleteFileAndDir(child, false);
                } else {
                    child.delete();
                    // Progress message every 50000 files.
                    deleteCount++;
                    if (deleteCount % 50000 == 0) {
                        log.info("已經刪除檔案" + deleteCount + "個!");
                    }
                }
            }
        }
        // Keep the root directory itself when asked to.
        if (!rootBoo) {
            file.delete();
        }
    }
}
8)程式碼執行分析結果:
程式碼測試了兩個省共15G的壓縮包,解壓後三十萬個小檔案,從解壓、合併、移動、刪除整個流程耗時共3個小時,合併後的檔案大小共40G,效率上看還可以
9)注意事項
解壓跟合併必須做成多執行緒啟動的,每個省的解壓跟合併都分配一個執行緒,程式碼裡面用了Java執行緒池,在分配執行緒池數量前先計算一共有幾個省,按照省數量分配,這在
上面第四點:多執行緒檔案解壓合併工具類裡面有體現。
10)專案的配置檔案我放在程式碼裡面一同放在了附件,程式碼有不當之處歡迎指點,大家共同學習進步。用到的jar包我也在專案裡面直接上傳了,下載下來就能看到。
專案的百度網盤路徑:http://pan.baidu.com/s/1c0vYUJI,複製直接下載即可