1. 程式人生 > >讀取hdfs目錄下的gz文件內容

讀取hdfs目錄下的gz文件內容

txt文件 edi stat java 創建 reader === ati lean

讀取 HDFS 指定目錄下的 gz 文件,並逐行讀取 gz 文件裏面的文本信息。

package com.bw.credit.hdfs;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.zip.GZIPInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import com.google.common.base.Strings; public class HdfsReader { private static Configuration conf = new Configuration(); private
static String hdfspath = "hdfs://cluster-01:8020"; public static void main(String[] args) throws IOException { HdfsReader ope = new HdfsReader(); String path = "hdfs://cluster-01:8020/ads/cdrlogs_s/dt=20190320"; List<String> arr = ope.getFileList(path); int lineNum =0; int num19 = 0; int num00 = 0; for (String filepath : arr) { System.out.println(filepath); InputStream inputStream = null; GZIPInputStream gzInputStream = null; InputStreamReader reader = null; BufferedReader buff = null; try { FileSystem system = FileSystem.get(conf); inputStream = system.open(new Path(hdfspath+filepath)); gzInputStream = new GZIPInputStream(inputStream); reader = new InputStreamReader(gzInputStream); buff = new BufferedReader(reader); String line = null; while((line = buff.readLine())!=null){ if(line.contains("2019-03-19")) { num19++; } if(line.contains(",,0")) { num00++; } lineNum++; } } catch (IOException e) { e.printStackTrace(); } } System.out.println(lineNum+","+num19+","+num00); System.out.println(lineNum-num19-num00); } /** * 獲取hdfs路徑下的文件列表 * * @param srcpath * @return */ public List<String> getFileList(String srcpath) { List<String> files = new ArrayList<String>(); try { Path path = new Path(srcpath); FileSystem fs = path.getFileSystem(conf); if (fs.isDirectory(path)) { for (FileStatus status : fs.listStatus(path)) { List<String> arr = getFileList(status.getPath().toUri().getPath()); files.addAll(arr); } } else { files.add(path.toUri().getPath()); } return files; } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return null; } /** * 給定文件名和文件內容,創建hdfs文件 * * @param dst * @param contents * @throws IOException */ public void createFile(String dst, byte[] contents) throws IOException { Configuration conf = new Configuration(); Path dstPath = new Path(dst); FileSystem fs = dstPath.getFileSystem(conf); FSDataOutputStream outputStream = fs.create(dstPath); outputStream.write(contents); outputStream.close(); System.out.println("create file 
" + dst + " success!"); // fs.close(); } /** * 刪除hdfs文件 * * @param filePath * @throws IOException */ public void delete(String filePath) throws IOException { Configuration conf = new Configuration(); Path path = new Path(filePath); FileSystem fs = path.getFileSystem(conf); boolean isok = fs.deleteOnExit(path); if (isok) { System.out.println("delete file " + filePath + " success!"); } else { System.out.println("delete file " + filePath + " failure"); } // fs.close(); } /** * 創建hdfs目錄 * * @param path * @throws IOException */ public void mkdir(String path) throws IOException { Configuration conf = new Configuration(); Path srcPath = new Path(path); FileSystem fs = srcPath.getFileSystem(conf); boolean isok = fs.mkdirs(srcPath); if (isok) { System.out.println("create dir ok!"); } else { System.out.println("create dir failure"); } // fs.close(); } /** * 讀取hdfs文件內容,並在控制臺打印出來 * * @param filePath * @throws IOException */ public void readFile(String filePath) throws IOException { Configuration conf = new Configuration(); Path srcPath = new Path(filePath); FileSystem fs = null; URI uri; try { uri = new URI(filePath); fs = FileSystem.get(uri, conf); } catch (URISyntaxException e) { e.printStackTrace(); } InputStream in = null; try { in = fs.open(srcPath); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } /** * 下載hdfs文件到本地目錄 * * @param dstPath * @param srcPath * @throws Exception */ public void downloadFile(String dstPath, String srcPath) throws Exception { Path path = new Path(srcPath); Configuration conf = new Configuration(); FileSystem hdfs = path.getFileSystem(conf); File rootfile = new File(dstPath); if (!rootfile.exists()) { rootfile.mkdirs(); } if (hdfs.isFile(path)) { // 只下載非txt文件 String fileName = path.getName(); if (!fileName.toLowerCase().endsWith("txt")) { FSDataInputStream in = null; FileOutputStream out = null; try { in = hdfs.open(path); File srcfile = new File(rootfile, path.getName()); if (!srcfile.exists()) 
srcfile.createNewFile(); out = new FileOutputStream(srcfile); IOUtils.copyBytes(in, out, 4096, false); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); } // 下載完後,在hdfs上將原文件刪除 this.delete(path.toString()); } } else if (hdfs.isDirectory(path)) { File dstDir = new File(dstPath); if (!dstDir.exists()) { dstDir.mkdirs(); } // 在本地目錄上加一層子目錄 String filePath = path.toString();// 目錄 String subPath[] = filePath.split("/"); String newdstPath = dstPath + subPath[subPath.length - 1] + "/"; System.out.println("newdstPath=======" + newdstPath); if (hdfs.exists(path) && hdfs.isDirectory(path)) { FileStatus[] srcFileStatus = hdfs.listStatus(path); if (srcFileStatus != null) { for (FileStatus status : hdfs.listStatus(path)) { // 下載子目錄下文件 downloadFile(newdstPath, status.getPath().toString()); } } } } } // /** // * 下載hdfs文件內容,保存到內存對象中 // * // * @param srcPath // * @throws Exception // */ // public void downloadFileByte(String srcPath) throws Exception { // Path path = new Path(srcPath); // FileSystem hdfs = null; // Configuration conf = new Configuration(); // hdfs = FileSystem.get(URI.create(srcPath), conf); // if (hdfs.exists(path)) { // if (hdfs.isFile(path)) { // //如果是文件 // FSDataInputStream in = null; // FileOutputStream out = null; // try { // in = hdfs.open(new Path(srcPath)); // byte[] t = new byte[in.available()]; // in.read(t); // hdfsfiles.add(new HdfsFile(path.getName(), srcPath, t)); // } finally { // IOUtils.closeStream(in); // IOUtils.closeStream(out); // } // } else { // //如果是目錄 // FileStatus[] srcFileStatus = hdfs.listStatus(new Path(srcPath)); // for (int i = 0; i < srcFileStatus.length; i++) { // String srcFile = srcFileStatus[i].getPath().toString(); // downloadFileByte(srcFile); // } // } // } // } // // public ArrayList<HdfsFile> getHdfsfiles() { // return hdfsfiles; // } /** * 將本地目錄或文件上傳的hdfs * * @param localSrc * @param dst * @throws Exception */ public void uploadFile(String localSrc, String dst) throws Exception { Configuration conf = new 
Configuration(); File srcFile = new File(localSrc); if (srcFile.isDirectory()) { copyDirectory(localSrc, dst, conf); } else { copyFile(localSrc, dst, conf); } } /** * 拷貝本地文件hdfs目錄下 * * @param localSrc * @param dst * @param conf * @return * @throws Exception */ private boolean copyFile(String localSrc, String dst, Configuration conf) throws Exception { File file = new File(localSrc); dst = dst + file.getName(); Path path = new Path(dst); FileSystem fs = path.getFileSystem(conf);// FileSystem.get(conf); fs.exists(path); InputStream in = new BufferedInputStream(new FileInputStream(file)); OutputStream out = fs.create(new Path(dst)); IOUtils.copyBytes(in, out, 4096, true); in.close(); return true; } /** * 拷貝本地目錄到hdfs * * @param src * @param dst * @param conf * @return * @throws Exception */ private boolean copyDirectory(String src, String dst, Configuration conf) throws Exception { Path path = new Path(dst); FileSystem fs = path.getFileSystem(conf); if (!fs.exists(path)) { fs.mkdirs(path); } File file = new File(src); File[] files = file.listFiles(); for (int i = 0; i < files.length; i++) { File f = files[i]; if (f.isDirectory()) { String fname = f.getName(); if (dst.endsWith("/")) { copyDirectory(f.getPath(), dst + fname + "/", conf); } else { copyDirectory(f.getPath(), dst + "/" + fname + "/", conf); } } else { copyFile(f.getPath(), dst, conf); } } return true; } }

讀取hdfs目錄下的gz文件內容