乾貨-java HDFS系統上解壓多檔案zip壓縮包
阿新 • • 發佈:2018-11-08
package com.zx.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.apache.commons.compress.archivers.zip.*;

/**
 * Utilities for extracting multi-file ZIP archives stored on HDFS.
 *
 * <p>Two entry points are provided:
 * <ul>
 *   <li>{@link #zipHdfsFile(String)} — streams a plain ZIP file from HDFS and
 *       extracts its entries under {@code <base>/process/}.</li>
 *   <li>{@link #loadZipFileData(String)} — handles archives that carry extra
 *       non-ZIP header lines (lines starting with {@code "XX-"}); the header is
 *       stripped in memory via {@link #offXXData(byte[])} before extraction, and
 *       entries are written directly under the HDFS base path.</li>
 * </ul>
 */
public class HdfsUnZip {

    /** Base URI of the target HDFS cluster; extracted entries are created under it. */
    private static final String BASE_PATH = "hdfs://master:9000/";

    /** Copy-buffer size used when streaming entry contents to HDFS (10 MB, as in the original). */
    private static final int COPY_BUFFER_SIZE = 10 * 1024 * 1024;

    /**
     * Opens a {@link FileSystem} handle for {@link #BASE_PATH}.
     *
     * <p>Note: Hadoop caches {@code FileSystem} instances per URI/conf, so the
     * returned handle is deliberately not closed by callers here.
     *
     * @return a (possibly cached) {@code FileSystem} for the configured cluster
     * @throws IOException if the filesystem cannot be reached
     */
    private static FileSystem openFileSystem() throws IOException {
        // Without this property the client authenticates as the local OS user
        // and may be denied access to HDFS.
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        return new Path(BASE_PATH).getFileSystem(conf);
    }

    /**
     * Extracts every entry of {@code zipIn} into HDFS under {@code targetBase}.
     * Directory entries are skipped (only logged); file entries are streamed
     * into newly created HDFS files.
     *
     * @param fs         open HDFS filesystem handle
     * @param zipIn      ZIP stream positioned at the start of the archive
     * @param targetBase HDFS path prefix the entry names are appended to
     * @throws IOException on any read/write failure
     */
    private static void extractEntries(FileSystem fs, ZipArchiveInputStream zipIn,
                                       String targetBase) throws IOException {
        // One buffer for the whole archive; the original allocated 10 MB per entry.
        byte[] buffer = new byte[COPY_BUFFER_SIZE];
        ZipArchiveEntry entry;
        while ((entry = zipIn.getNextZipEntry()) != null) {
            String entryName = entry.getName();
            System.out.println("fileName:" + entryName);
            System.out.println(targetBase + entryName);
            if (entry.isDirectory()) {
                System.out.println("is Directory");
                continue;
            }
            System.out.println("is file");
            // try-with-resources closes each per-entry output stream; the
            // original reassigned one variable in the loop and closed only the
            // last stream, leaking all the others.
            try (FSDataOutputStream out = fs.create(new Path(targetBase + entryName))) {
                int n;
                while ((n = zipIn.read(buffer, 0, buffer.length)) != -1) {
                    out.write(buffer, 0, n);
                }
                out.flush();
            }
        }
    }

    /**
     * Extracts a ZIP archive on HDFS whose payload is preceded by non-ZIP
     * {@code "XX-"} header lines. The file is fully read into memory, the
     * header is stripped with {@link #offXXData(byte[])}, and the resulting
     * entries are written under {@link #BASE_PATH}.
     *
     * @param hdfsFilePath full HDFS path of the archive to extract
     * @throws IOException on any read/write failure (no longer swallowed —
     *                     the original caught and only printed exceptions)
     */
    public static void loadZipFileData(String hdfsFilePath) throws IOException {
        System.out.println(hdfsFilePath);
        FileSystem fs = openFileSystem();
        byte[] data;
        try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))) {
            data = toByteArray(in);
        }
        // Strip the leading "XX-" metadata lines that would otherwise corrupt
        // the ZIP stream.
        data = offXXData(data);
        try (ZipArchiveInputStream zipIn =
                 new ZipArchiveInputStream(new ByteArrayInputStream(data))) {
            extractEntries(fs, zipIn, BASE_PATH);
        }
    }

    /**
     * Extracts a plain ZIP archive on HDFS, streaming it directly (no in-memory
     * copy, no header stripping). Entries are written under
     * {@code BASE_PATH + "process/"}.
     *
     * @param hdfsFilePath full HDFS path of the archive to extract
     * @throws IOException on any read/write failure
     */
    public static void zipHdfsFile(String hdfsFilePath) throws IOException {
        System.out.println(hdfsFilePath);
        FileSystem fs = openFileSystem();
        try (FSDataInputStream in = fs.open(new Path(hdfsFilePath));
             ZipArchiveInputStream zipIn = new ZipArchiveInputStream(in)) {
            extractEntries(fs, zipIn, BASE_PATH + "process/");
        }
    }

    public static void main(String[] args) throws IOException {
        zipHdfsFile("hdfs://master:9000/input.zip");
        loadZipFileData("hdfs://master:9000/input/2_74_1517985979_002_00_805690.zip");
    }

    /**
     * Removes the leading {@code "XX-"} header lines from {@code data} and
     * returns the remaining bytes (the actual ZIP payload).
     *
     * <p>Behavior, preserved from the original:
     * <ul>
     *   <li>If the very first line does not start with {@code "XX-"}, the input
     *       is returned unchanged (no header present).</li>
     *   <li>Header lines are consumed until the first non-{@code "XX-"} line;
     *       that line's raw bytes, minus any leading CR/LF characters, begin
     *       the returned payload, followed by the rest of the stream.</li>
     * </ul>
     *
     * @param data raw file contents, possibly prefixed with "XX-" lines
     * @return the payload with the header removed, or {@code data} itself when
     *         no header is detected
     * @throws IOException propagated from the underlying stream
     */
    public static byte[] offXXData(byte[] data) throws IOException {
        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
             CustomLineInputStream lis =
                 new CustomLineInputStream(new ByteArrayInputStream(data))) {
            List<Byte> pending = new ArrayList<Byte>();
            String line;
            boolean firstLine = true;
            while ((line = lis.readLineBytes(pending)) != null) {
                if (firstLine) {
                    if (!line.startsWith("XX-")) {
                        return data; // no header at all — input is already a plain payload
                    }
                    firstLine = false;
                }
                if (!line.startsWith("XX-")) {
                    // First payload line; its raw bytes are still in 'pending'.
                    break;
                }
                pending.clear();
            }
            // Emit the buffered payload line, skipping only its LEADING CR/LF
            // bytes (newlines after the first content byte are kept).
            boolean seenContent = false;
            for (int i = 0; i < pending.size(); i++) {
                byte b = pending.get(i);
                if ((b == '\n' || b == '\r') && !seenContent) {
                    continue;
                }
                seenContent = true;
                out.write(b);
            }
            // Copy the remainder of the stream untouched.
            byte[] buf = new byte[1024];
            int n;
            while ((n = lis.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            out.flush();
            return out.toByteArray();
        }
    }

    /**
     * Reads a local file fully into a byte array.
     *
     * @param file local file to read
     * @return the file's contents, or {@code null} on I/O failure
     *         (original contract preserved; callers must null-check)
     */
    public static byte[] File2byte(File file) {
        // try-with-resources fixes the original leak: the stream was only
        // closed on the success path.
        try (FileInputStream fis = new FileInputStream(file);
             ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = fis.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            // FileNotFoundException is an IOException; one handler covers both.
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Drains an input stream into a byte array. The stream is NOT closed here;
     * the caller owns it.
     *
     * @param input stream to drain
     * @return all remaining bytes of the stream
     * @throws IOException propagated from {@code input.read}
     */
    public static byte[] toByteArray(InputStream input) throws IOException {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        int n;
        while (-1 != (n = input.read(buffer))) {
            output.write(buffer, 0, n);
        }
        return output.toByteArray();
    }
}