1. 程式人生 > >Java合併解壓多省話單gz大檔案,程式碼涉及解壓、合併、刪除、複製邏輯

Java合併解壓多省話單gz大檔案,程式碼涉及解壓、合併、刪除、複製邏輯

專案要求是能定時解壓gz檔案,並按照省將檔案成一個大檔案,程式碼實現功能:按照gz壓縮包的路徑格式解壓檔案,最後將檔案合併成按照省份名稱排放的大檔案,專案用log4j記錄日誌,用Java定時器實現定時解壓合併,並且刪除解壓出來的小檔案,同時也刪除解壓目錄下的壓縮包,並將壓縮包移動到已處理資料夾下(這麼做是因為檔案是定時下載到一個目錄的,每次我們去這個目錄下去處理檔案,為了不重複解壓合併已經處理過的檔案,邏輯上我把處理過的檔案都移動到一個已處理資料夾下,並且刪除目錄下的壓縮包,這樣就免去了記錄跟判斷檔案是否是新檔案的開銷),下面直接上程式碼:

附錄裡面回加上工程程式碼,到時候各異直接下載下來

1)先看專案的配置檔案(mergeFilleUtil.properties),log4j內容我就不貼了,一會兒自己下載看看就行:

#原始檔案
PROVINCE_DIR=E:\\test\\rootfile
#解壓後文件存放的路徑
UN_ZIP_PATH=E:\\test\\unZip
#合併後文件存放的路徑
OUT_PATH=E:\\test\\result
#已經處理過的檔案存放路徑
DONE_FILE_PATH=E:\\test\\doneFile
#任務執行結束後哪些檔案路徑下的需要刪除
DELETE_PATH=E:\\test\\rootfile,E:\\test\\unZip
#配置任務的時間間隔,以秒為單位,一天是:24*60*60*1000
TASK_PERIOD=86400
#任務開始的小時時間
TASK_BEGIN_HOUR=9
#任務開始的分鐘
TASK_BEGIN_MINUTE=26

2)主程式入口:

public static void main(String[] args) {
        Map proMap =  LoadProperty.readProperty();
        Timer timer = new Timer();
        MergeFileUtil mergeTask = new MergeFileUtil();//要執行的任務
        //任務的時間間隔,1000表示1秒
        long intevalPeriod = Long.valueOf(proMap.get("TASK_PERIOD").toString());
        //Calendar類封裝了一系列操作date時間的方法
        Calendar calendar = Calendar.getInstance();
        int year = calendar.get(calendar.YEAR);
        int month = calendar.get(calendar.MONTH);
        int day = calendar.get(calendar.DAY_OF_MONTH);
        //指定每天執行任務的小時跟分鐘比如19點32分
        int hour = Integer.valueOf(proMap.get("TASK_BEGIN_HOUR").toString());
        int minute = Integer.valueOf(proMap.get("TASK_BEGIN_MINUTE").toString());
        calendar.set(year, month, day, hour, minute);
        //Calendar.getTime()方法返回Date型別的返回值
        Date date = calendar.getTime();
        System.out.println("執行時間:" +year+"-"+month+"-"+day+"-"+hour+"-"+minute);
        //指定定時任務的執行規則,呼叫定時任務。開始執行
        timer.schedule(mergeTask, date, intevalPeriod*1000);
      }
    }

3)讀取配置檔案工具類:

public class LoadProperty {
    private static Logger log = Logger.getLogger(LoadProperty.class);
    private static Properties prop;
    
    static{
          ClassLoader loader = LoadProperty.class.getClassLoader();
          InputStream in = loader.getResourceAsStream("mergeFilleUtil.properties");//此時仍然把資源裝載近記憶體中了。
          prop = new Properties();
          try {
            //utf-8編碼配置檔案,防止中文路徑出錯
            prop.load(new InputStreamReader(in, "UTF-8"));
         } catch (UnsupportedEncodingException e1) {
            log.error("載入配置檔案出錯,請檢查!");
         } catch (IOException e1) {
            log.error("載入配置檔案流出錯,請檢查!");
         } finally{
             if(in!=null){
                 try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
             }
         }
    }
    @SuppressWarnings("rawtypes")
    public static Map readProperty(){
        return prop;
    }
}

4) 多執行緒檔案解壓合併工具類

public class MergeFileUtil  extends TimerTask{
     private static Logger log =  Logger.getLogger(MergeFileUtil.class);
    
     @Override
        public void run() {
         unZipFile();
        }
     //解壓
    @SuppressWarnings("rawtypes")
    public void unZipFile(){
          //讀取配置檔案中的路徑
          Map proMap =  LoadProperty.readProperty();
          String rootdir = proMap.get("PROVINCE_DIR").toString();
          String unZipPath = proMap.get("UN_ZIP_PATH").toString();
          String outDir = proMap.get("OUT_PATH").toString();
          File rootfile=new File(rootdir);
            if(!rootfile.exists()||!rootfile.isDirectory()){
                log.error("檔案路徑不存在,請檢查欲解壓的檔案所在目錄是否填寫正確!");
                return;
            }else{
                File[] files=rootfile.listFiles();
                if(files.length==0){
                    log.error("警告:沒有要解壓的檔案!請檢查:"+rootfile+"下是否存在需解壓的檔案!");
                }else{
                    //分配解壓執行緒池
                    ExecutorService pool=Executors.newFixedThreadPool(files.length);
                    for (File provinceDir:files){
                        excuteThreads(pool,provinceDir,outDir,unZipPath);
                    }
                    pool.shutdown();
                    while (true) {    
                        if (pool.isTerminated()) {
                            //解壓結束後啟動合併執行緒池
                            mergerFile();
                            break;  
                        }  
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }  
                    }  
                }
            }
        }
    //解壓類
     private  void excuteThreads(ExecutorService pool,File provinceDir,String outdir,String unZipPath){
            pool.execute(
                    new UnZipFile(provinceDir.getAbsolutePath(),
                            outdir,unZipPath));
        }
    
    
    @SuppressWarnings("rawtypes")
    private void mergerFile(){
          //讀取配置檔案中的路徑
          Map proMap =  LoadProperty.readProperty();
          String rootdir = proMap.get("PROVINCE_DIR").toString();
          String unZipPath = proMap.get("UN_ZIP_PATH").toString();
          String outDir = proMap.get("OUT_PATH").toString();
          String doneFilePath = proMap.get("DONE_FILE_PATH").toString();
          String deletePath = proMap.get("DELETE_PATH").toString();
          String []deleteArray = deletePath.split(",");
          File unZipFile=new File(unZipPath);
          if(!unZipFile.exists()||!unZipFile.isDirectory()){
              log.error("檔案路徑不存在,請檢查欲合併的檔案所在目錄是否填寫正確!");
              return;
          }else{
              File[] files=unZipFile.listFiles();
              if(files.length==0){
                  log.error("警告:沒有要合併的檔案!請檢查:"+unZipFile.getPath()+"下是否存在需合併的檔案!");
              }else{
                  //分配執行緒池
                  ExecutorService pool=Executors.newFixedThreadPool(files.length);
                  for (File provinceDir:files){
                      excuteMergerThreads(pool,provinceDir,outDir,unZipPath);
                  }
                  pool.shutdown();
              while(true){
                  if(pool.isTerminated()){
                      //合併結束後啟動移動跟刪除檔案程式
                      MoveAndDeleteFile  moveAndDeleteFile = new MoveAndDeleteFile();
                      log.info("開始移動檔案到已處理資料夾!");
                      moveAndDeleteFile.moveFile(new File(rootdir),new File(doneFilePath));
                      log.info("移動檔案到已處理資料夾成功!");
                      try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                      log.info("開始刪除檔案!");
                      for(int i=0;i<deleteArray.length;i++){
                          File file = new File(deleteArray[i]);
                          if(!file.exists()){
                              log.error("欲刪除的目錄不存在,請檢查目錄:"+file.getPath()+"是否正確存在!");
                          }else{
                              moveAndDeleteFile.deleteFileAndDir(new File(deleteArray[i]),true);
                          }
                      }
                      log.info("刪除檔案結束!");
                      log.info("解壓合併檔案程式執行結束!");
                      break;
                  }
              }
              }
          }
    }
    //檔案合併類
     private void excuteMergerThreads(ExecutorService pool,File file,String outDir,String unZipPath){
         pool.execute(new MergeFile(file.getAbsolutePath(),
                            outDir,unZipPath));
     }
}

5)解壓程式碼:

public class UnZipFile implements Runnable {
    private static Logger log = Logger.getLogger(UnZipFile.class);
    private String outFilePath;        //合併檔案輸出目錄       
    private String mergeFileDir;       //原始檔案存放目錄
    private String unZipFileDir;       //解壓檔案輸出目錄
    private BufferedOutputStream bufferedOutputStream;
    public static final int BUFSIZE = 1024 * 8;
    private static int count;//計算器,記錄檔案解壓個數
    public UnZipFile(){
    }
    public UnZipFile(String mergeFileDir, String outFilePath,String unZipFileDir) {
        this.outFilePath = outFilePath;
        this.mergeFileDir = mergeFileDir;
        this.unZipFileDir = unZipFileDir;
    }
    public void run() {
        File f=new File(outFilePath);
        File mergerDir=new File(mergeFileDir);
        if(!f.exists()){
           f.mkdirs();
            return;
            }
        try {
            log.info("開始解壓路徑:"+mergerDir+"下檔案!");
            unzipFile(mergerDir,unZipFileDir);
            log.info("解壓檔案:"+mergerDir+"結束!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    private void unzipFile(File file,String OutFileDir) throws IOException{
        if(file.isDirectory()){    
            for(File children:file.listFiles()){
                unzipFile(children,OutFileDir);
            }
        }else if(file.getAbsolutePath().endsWith(".gz")){
            count++;
            doUnZipFile(file, OutFileDir);
            //每五千個列印一次,用作程式提示
            if(count>0&&count%1000==0){
                log.info("已經解壓"+count+"個檔案!");
            }
        }
    }

    /*
     * 檔案解壓
     * zipFile:傳入解壓檔案
     * outputDirectory:解壓目的地路徑
    */
    public void doUnZipFile(File zipFile,String outputDirectory) {
        FileInputStream fis = null;
        GZIPInputStream is = null;
        ArchiveInputStream in = null;
        BufferedInputStream bufferedInputStream = null;
        String separator = File.separator;
        //確定目的地解壓的目錄結構begin
        int lastIndex = zipFile.getPath().lastIndexOf(separator);
        int index = mergeFileDir.lastIndexOf(separator);
        String dirStructurePath=null;
        if(lastIndex<=index){
            dirStructurePath ="";
        }else{
            dirStructurePath = zipFile.getPath().substring(index+1, zipFile.getPath().lastIndexOf(separator));
        }
        //確定好目的地解壓的目錄結構end
        String dirPath=null;
        try {
            fis = new FileInputStream(zipFile);
            is = new GZIPInputStream(new BufferedInputStream(fis));
            in = new ArchiveStreamFactory().createArchiveInputStream("tar", is);
            bufferedInputStream = new BufferedInputStream(in);
            TarArchiveEntry entry = (TarArchiveEntry) in.getNextEntry();
            while (entry != null) {
                //要解壓的檔案以及其壓縮包內的檔案:包名稱/包下檔名稱
                String name = entry.getName();  
                String[] names = name.split("/");
                //解壓目的地目錄
                String  dirRootPath;
                if(dirStructurePath==""){
                    dirRootPath = outputDirectory;
                }else{
                    dirRootPath = outputDirectory+separator+dirStructurePath;
                }
                for (int i = 0; i < names.length; i++)  
                {  
                    String str = names[i];  
                    dirRootPath = dirRootPath + separator + str;  
                }  
                 dirPath = dirRootPath.substring(0,dirRootPath.lastIndexOf(separator));
                   File dirFilePath = new File(dirPath);
                   if(!dirFilePath.exists()){
                       dirFilePath.mkdirs();
                   }
               if (name.endsWith("/")) {
                   mkFolder(dirRootPath);
               } else {
                   File file = mkFile(dirRootPath);
                   bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(file));
                   int b;
                   while ((b = bufferedInputStream.read()) != -1) {
                      bufferedOutputStream.write(b);
                   }
                   bufferedOutputStream.flush();
                   bufferedOutputStream.close();
               }
               entry = (TarArchiveEntry) in.getNextEntry();
            }
            //log.info("解壓檔案:"+zipFile.getName()+"到: "+dirPath+" "+"完成!");
        } catch (FileNotFoundException e) {
            log.info("找不到要解壓的檔案所在路徑!");
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ArchiveException e) {
            e.printStackTrace();
        } finally {
            try {
               if (bufferedInputStream != null) {
                   bufferedInputStream.close();
               }
               if(in!=null){
                   in.close();
               }
               if(is!=null){
                 is.close();  
               }
               if(fis!=null){
                   fis.close();
               }
            } catch (IOException e) {
               e.printStackTrace();
            }
        }
     }
        
        
         private void mkFolder(String fileName) {
            File f = new File(fileName);
            if (!f.exists()) {
                f.mkdir();
            }
         }
        
         private File mkFile(String fileName) {
            File f = new File(fileName);
            try {
                f.createNewFile();
            } catch (IOException e) {
                log.error("檔案"+fileName+"建立失敗");
                e.printStackTrace();
            }
            return f;
         }
}

6)合併程式碼:

public class MergeFile implements Runnable{
    private static Logger log = Logger.getLogger(UnZipFile.class);
    private String outFilePath;        //合併檔案輸出目錄       
    private String mergeFileDir;       //解壓檔案存放目錄
    private static volatile  int count;//計算器,記錄檔案解壓個數
    public static final int BUFSIZE = 1024 * 8;
    public MergeFile(String mergeFileDir, String outFilePath,String unZipFileDir) {
        this.outFilePath = outFilePath;
        this.mergeFileDir = mergeFileDir;
    }
    
    @Override
    public void run() {
        mergerUnZipFile();
    }
    
    private void mergerUnZipFile(){
        FileChannel outChannel = null;  
        //合併解壓目錄下的檔案
        File f = new File(mergeFileDir);
        String path = f.getPath();
        String outPath = outFilePath+path.substring(path.lastIndexOf(File.separator));
        if(!new File(outPath).exists()){
            new File(outPath).mkdirs();
        }
        try {
            //構造合併檔案路徑以及檔名
            File f2=new File(outPath+File.separator+"result");
            if(!f2.exists()){
                //若檔案不存在,則新建
                f2.createNewFile();
                outChannel = new FileOutputStream(f2).getChannel();
            }else{
                //若檔案存在,則在原始檔後面追加新的內容
                outChannel = new FileOutputStream(f2,true).getChannel();
            }
        } catch (IOException e) {
            log.error("檔案:"+outPath+File.separator+"result"+"不存在,請檢查!");
            e.printStackTrace();
        }
        log.info("開始合併路徑:"+f.getPath()+"下檔案");
        mergeFile(f,outChannel);
        log.info("路徑:"+f.getPath()+"下檔案合併完成");
        if(outChannel!=null){
            try {
                outChannel.close();
            } catch (IOException e) {
                log.error("關閉管道流出錯,請檢查檔案合併類下run方法的檔案管道流!");
                e.printStackTrace();
            }
        }
     }
    /*
       * 用途:合併檔案
       * file:存放檔案的根目錄
       * outFileChannel:輸出檔案管道流
       */
    public  void mergeFile(File file,FileChannel outFileChannel){
        if(file.isDirectory()){
            for(File children:file.listFiles()){
             mergeFile(children,outFileChannel);
            }
        }else if(!file.getAbsolutePath().endsWith(".gz")){
             mergeFiles(file,outFileChannel);
             //每五千個列印一次,用作程式提示
             count++;
            if(count>0&&count%50000==0){
                log.info("已經合併檔案"+count+"個!");
            }
        }
    }
    public static void mergeFiles(File file,FileChannel outChannel) {
        try {    
            FileChannel fc = new FileInputStream(file).getChannel();   
            ByteBuffer bb = ByteBuffer.allocate(BUFSIZE);  
            while(fc.read(bb) != -1){  
                bb.flip();  
                outChannel.write(bb);  
                bb.clear();  
            }  
            fc.close();  
            //log.info("合併檔案"+file.getPath()+"完成!");  
        } catch (IOException ioe) {
            log.error("合併檔案:"+file.getPath()+"失敗!");
            ioe.printStackTrace();  
        }
    }  
}

7)移動跟刪除檔案程式碼:

public class MoveAndDeleteFile {

    private static int count;//計數器
    private Logger log = Logger.getLogger(MoveAndDeleteFile.class);
      /*
       * 用途:將檔案從一個目錄移動到另外一個目錄
       * rootdir:存放原檔案的根目錄
       * rootBoo:為true時不刪除根目錄,只刪除根目錄下檔案跟資料夾
       */
     public void moveFile(File rootdir,File doneFilePath){
             //開始移動檔案  
         if(rootdir.isDirectory()){
           for(File file:rootdir.listFiles()){
               moveFile(file,doneFilePath);
               }
          }else{
              moveTheFile(rootdir,doneFilePath);
            //每五千個列印一次,用作程式提示
             count++;
            if(count>0&&count%5000==0){
                log.info("已經移動檔案"+count+"個!");
            }
            }
         }
            
    public void moveTheFile(File rootdir,File doneFilePath){
            FileInputStream fi = null;
            FileOutputStream fo = null;
            FileChannel in = null;
            FileChannel out = null;
            try {
                String currentFilePath = rootdir.getPath();     
                //確定好目的地解壓的目錄結構begin
                int index = "E:\\test\\rootfile\\".lastIndexOf(File.separator);
                String dirStructurePath = doneFilePath+currentFilePath.substring(index);
                File dir = new File(dirStructurePath.substring(0, dirStructurePath.lastIndexOf(File.separator)));
                if(!dir.exists()){
                    dir.mkdirs();
                }
                File dirFile = new File(dirStructurePath);
                if(!dirFile.exists()){
                    dir.createNewFile();
                }
                fi = new FileInputStream(rootdir);
                fo = new FileOutputStream(dirStructurePath);
                in = fi.getChannel();//得到對應的檔案通道
                out = fo.getChannel();//得到對應的檔案通道
                in.transferTo(0, in.size(), out);//連線兩個通道,並且從in通道讀取,然後寫入out通道
                //System.out.println("移動檔案:"+rootdir.getPath()+"完成!");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    fi.close();
                    in.close();
                    fo.close();
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }    
           
    /*
     * 先刪除檔案再刪除資料夾
     * file:存放檔案的根目錄
     * rootBoo:為true時不刪除根目錄,只刪除根目錄下檔案跟資料夾
     */
        public  void deleteFileAndDir(File  file,boolean rootBoo){
            String rootDir = null;
            if(rootBoo){
                rootDir = file.getPath();
            }
            File []files = file.listFiles();
            for(int i=0;i<files.length;i++){
                if(!files[i].isDirectory()){
                    files[i].delete();
                    //每五千個列印一次,用作程式提示
                     count++;
                    if(count>0&&count%50000==0){
                       log.info("已經刪除檔案"+count+"個!");
                    }
                }else{
                    deleteFileAndDir(files[i],false);
                }
            }
            //不刪除存放檔案的根目錄
            if(rootDir!=file.getPath()){
                file.delete();
            }
        }
}

8)程式碼執行分析結果:

程式碼測試了兩個省共15G的壓縮包,解壓後三十萬個小檔案,從解壓、合併、移動、刪除整個流程耗時共3個小時,合併後的檔案大小共40G,效率上看還可以

9)注意事項

解壓跟合併必須做成多執行緒啟動的,每個省的解壓跟合併都分配一個執行緒,程式碼裡面用了Java執行緒池,在分配執行緒池數量前線計算一共有幾個,按照省數量分配,程式碼裡面在

上面第四點:多執行緒檔案解壓合併工具類裡面有體現。

10)專案的配置檔案我放在程式碼裡面一同放在了附件,程式碼有不當之處歡迎指點,大家共同學習進步。用到的jar包我也在專案裡面直接上傳了,下載下來就能看到。

專案的百度網盤路徑:http://pan.baidu.com/s/1c0vYUJI,複製直接下載即可