Calling the Java API to Read, Write, Upload, Download, and Delete HDFS Files: A Detailed Code Walkthrough
阿新 • Published: 2019-01-29
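Hadoop File System
From the shell, basic file system operations are available through hadoop fs, and hadoop fs -help prints detailed help for every command. In Java, the abstract class org.apache.hadoop.fs.FileSystem defines Hadoop's file system interface. Because the class is abstract, you obtain a FileSystem instance through one of two static factory methods: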
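public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
The first returns the default file system named by the fs.defaultFS property (usually set in core-site.xml). The second picks the implementation from the URI's scheme and authority, such as hdfs:// or file://, and falls back to the default file system when the URI has neither a scheme nor an authority.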
Key methods
1. public boolean mkdirs(Path f) throws IOException
Creates the directory and all missing parent directories in one call (like mkdir -p); f is the full directory path.
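2. public FSDataOutputStream create(Path f) throws IOException
Creates a file at the given path and returns an output stream for writing data to it. This overload overwrites any existing file at that path.
create() has several overloads that let you specify whether to overwrite an existing file, the replication factor, the write buffer size, the block size, and the file permissions.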
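3. public void copyFromLocalFile(Path src, Path dst) throws IOException
Copies a local file to the file system. Overloads with delSrc and overwrite flags control whether the local source is deleted afterwards and whether an existing target is replaced.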
4. public boolean exists(Path f) throws IOException
Checks whether a file or directory exists.
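5. public boolean delete(Path f, boolean recursive) throws IOException
Permanently deletes the given file or directory; unlike hadoop fs -rm with trash enabled, it does not move anything to the HDFS trash. If f is a file or an empty directory, the value of recursive is ignored. A non-empty directory and its contents are deleted only when recursive is true.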
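6. The FileStatus class encapsulates the metadata of a file or directory: file length, block size, replication factor, modification time, owner, and permissions. getFileStatus(Path) returns it for a single path, and listStatus(Path) returns one per entry in a directory.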
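The six items above can be exercised together in a sketch like the following. It assumes Hadoop 2.x or later on the classpath; FileSystemMethodsTour and its method names are hypothetical. The same calls work on the local file system (file:// paths) as on HDFS, so the sketch can be tried without a cluster.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical class: one small method per FileSystem call described above.
class FileSystemMethodsTour {
    // 1. mkdirs: creates dir and any missing parents; returns true on success.
    static boolean makeDirs(FileSystem fs, Path dir) throws IOException {
        return fs.mkdirs(dir);
    }

    // 2. create: the (overwrite, bufferSize, replication, blockSize) overload.
    // overwrite = false makes create() throw an IOException if the file exists.
    static void writeNew(FileSystem fs, Path file, String text, Configuration conf)
            throws IOException {
        int bufferSize = conf.getInt("io.file.buffer.size", 4096);
        short replication = fs.getDefaultReplication(file);
        long blockSize = fs.getDefaultBlockSize(file);
        try (FSDataOutputStream out =
                 fs.create(file, false, bufferSize, replication, blockSize)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
    }

    // 3. copyFromLocalFile(delSrc, overwrite, src, dst): keeps the local copy and
    // replaces an existing target; an existing directory dst receives src by name.
    static void upload(FileSystem fs, Path localSrc, Path dst) throws IOException {
        fs.copyFromLocalFile(false, true, localSrc, dst);
    }

    // 4. exists: a yes/no check; getFileStatus() supplies the details.
    static String kindOf(FileSystem fs, Path p) throws IOException {
        if (!fs.exists(p)) {
            return "missing";
        }
        return fs.getFileStatus(p).isDirectory() ? "directory" : "file";
    }

    // 5. delete: recursive = true is needed for a non-empty directory and is
    // ignored for files and empty directories. Returns false if nothing existed.
    static boolean remove(FileSystem fs, Path p) throws IOException {
        return fs.delete(p, true);
    }

    // 6. FileStatus: prints the metadata of each entry directly under dir.
    static void printListing(FileSystem fs, Path dir) throws IOException {
        for (FileStatus st : fs.listStatus(dir)) {
            System.out.printf("%s len=%d repl=%d block=%d mtime=%d owner=%s perm=%s%n",
                    st.getPath().getName(), st.getLen(), st.getReplication(),
                    st.getBlockSize(), st.getModificationTime(),
                    st.getOwner(), st.getPermission());
        }
    }
}
```

Against a cluster you would pass hdfs:// paths instead. Note that the local file system always reports a replication factor of 1, while HDFS reports the value stored for each file.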
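Code walkthrough
The HdfsOperate class below wraps these calls. Most of its methods obtain the FileSystem through Path.getFileSystem(conf), which is equivalent to FileSystem.get(path.toUri(), conf); a path without a scheme therefore resolves against fs.defaultFS.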
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class HdfsOperate {
    // Files collected by downloadFileByte()
    private final ArrayList<HdfsFile> hdfsfiles = new ArrayList<>();

    // FileSystem instances returned by FileSystem.get() and Path.getFileSystem()
    // are cached and shared within the JVM, so these methods do not close them.

    /**
     * Lists the entries directly under an HDFS directory.
     *
     * @param srcpath directory path
     * @return full paths of the entries; empty if srcpath is not a directory
     */
    public String[] getFileList(String srcpath) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(srcpath);
        FileSystem fs = path.getFileSystem(conf);
        List<String> files = new ArrayList<>();
        if (fs.exists(path) && fs.isDirectory(path)) {
            for (FileStatus status : fs.listStatus(path)) {
                files.add(status.getPath().toString());
            }
        }
        return files.toArray(new String[0]);
    }

    /**
     * Creates an HDFS file with the given content, overwriting any existing file.
     */
    public void createFile(String dst, byte[] contents) throws IOException {
        Configuration conf = new Configuration();
        Path dstPath = new Path(dst);
        FileSystem fs = dstPath.getFileSystem(conf);
        // try-with-resources closes the stream even if write() throws
        try (FSDataOutputStream outputStream = fs.create(dstPath)) {
            outputStream.write(contents);
        }
        System.out.println("create file " + dst + " success!");
    }

    /**
     * Deletes an HDFS file or directory immediately.
     */
    public void delete(String filePath) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(filePath);
        FileSystem fs = path.getFileSystem(conf);
        // delete() removes the path now; deleteOnExit() would only mark it for
        // removal when the FileSystem is closed. recursive = true also removes
        // non-empty directories.
        boolean isok = fs.delete(path, true);
        if (isok) {
            System.out.println("delete file " + filePath + " success!");
        } else {
            System.out.println("delete file " + filePath + " failure");
        }
    }

    /**
     * Creates an HDFS directory, including missing parents.
     */
    public void mkdir(String path) throws IOException {
        Configuration conf = new Configuration();
        Path srcPath = new Path(path);
        FileSystem fs = srcPath.getFileSystem(conf);
        boolean isok = fs.mkdirs(srcPath);
        if (isok) {
            System.out.println("create dir ok!");
        } else {
            System.out.println("create dir failure");
        }
    }

    /**
     * Reads an HDFS file and prints its content to the console.
     */
    public void readFile(String filePath) throws IOException {
        Configuration conf = new Configuration();
        Path srcPath = new Path(filePath);
        FileSystem fs = srcPath.getFileSystem(conf);
        try (InputStream in = fs.open(srcPath)) {
            // false: keep System.out open after copying
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }

    /**
     * Downloads an HDFS file or directory tree into a local directory.
     * Only non-.txt files are downloaded, and each downloaded file is then
     * deleted from HDFS.
     *
     * @param dstPath local target directory
     * @param srcPath HDFS file or directory
     */
    public void downloadFile(String dstPath, String srcPath) throws IOException {
        Path path = new Path(srcPath);
        Configuration conf = new Configuration();
        FileSystem hdfs = path.getFileSystem(conf);
        File rootfile = new File(dstPath);
        if (!rootfile.exists()) {
            rootfile.mkdirs();
        }
        if (hdfs.isFile(path)) {
            // Only download non-.txt files
            String fileName = path.getName();
            if (!fileName.toLowerCase().endsWith(".txt")) {
                // FileOutputStream creates the local file if it does not exist
                try (FSDataInputStream in = hdfs.open(path);
                     FileOutputStream out = new FileOutputStream(new File(rootfile, fileName))) {
                    IOUtils.copyBytes(in, out, 4096, false);
                }
                // After the download, delete the source file from HDFS
                this.delete(path.toString());
            }
        } else if (hdfs.isDirectory(path)) {
            // Mirror the HDFS directory as a subdirectory of the local target;
            // new File(parent, child) inserts the separator itself
            String newdstPath = new File(rootfile, path.getName()).getPath();
            System.out.println("newdstPath=======" + newdstPath);
            for (FileStatus status : hdfs.listStatus(path)) {
                // Recurse into files and subdirectories
                downloadFile(newdstPath, status.getPath().toString());
            }
        }
    }

    /**
     * Reads an HDFS file, or every file under a directory, into hdfsfiles.
     */
    public void downloadFileByte(String srcPath) throws IOException {
        Path path = new Path(srcPath);
        Configuration conf = new Configuration();
        FileSystem hdfs = path.getFileSystem(conf);
        if (!hdfs.exists(path)) {
            return;
        }
        if (hdfs.isFile(path)) {
            // available() is not the file length and one read() may return fewer
            // bytes, so copy to EOF into an in-memory buffer instead
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (FSDataInputStream in = hdfs.open(path)) {
                IOUtils.copyBytes(in, buf, 4096, false);
            }
            hdfsfiles.add(new HdfsFile(path.getName(), srcPath, buf.toByteArray()));
        } else {
            // Directory: recurse into each entry
            for (FileStatus status : hdfs.listStatus(path)) {
                downloadFileByte(status.getPath().toString());
            }
        }
    }

    public ArrayList<HdfsFile> getHdfsfiles() {
        return hdfsfiles;
    }

    /**
     * Uploads a local file or directory into the HDFS directory dst.
     */
    public void uploadFile(String localSrc, String dst) throws IOException {
        Configuration conf = new Configuration();
        File srcFile = new File(localSrc);
        if (srcFile.isDirectory()) {
            copyDirectory(localSrc, dst, conf);
        } else {
            copyFile(localSrc, dst, conf);
        }
    }

    /**
     * Copies a local file into the HDFS directory dst.
     */
    private boolean copyFile(String localSrc, String dst, Configuration conf) throws IOException {
        File file = new File(localSrc);
        // Path(parent, child) inserts the "/" whether or not dst ends with one
        Path path = new Path(dst, file.getName());
        FileSystem fs = path.getFileSystem(conf);
        try (InputStream in = new BufferedInputStream(new FileInputStream(file));
             OutputStream out = fs.create(path)) {
            IOUtils.copyBytes(in, out, 4096, false);
        }
        return true;
    }

    /**
     * Copies the contents of a local directory into the HDFS directory dst.
     */
    private boolean copyDirectory(String src, String dst, Configuration conf) throws IOException {
        Path path = new Path(dst);
        FileSystem fs = path.getFileSystem(conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
        }
        File[] files = new File(src).listFiles();
        if (files == null) {
            throw new IOException("cannot list local directory " + src);
        }
        for (File f : files) {
            if (f.isDirectory()) {
                copyDirectory(f.getPath(), new Path(path, f.getName()).toString(), conf);
            } else {
                copyFile(f.getPath(), dst, conf);
            }
        }
        return true;
    }

    /**
     * Assumed holder class (not shown in the article): file name, full HDFS
     * path, and content.
     */
    public static class HdfsFile {
        private final String name;
        private final String path;
        private final byte[] content;

        public HdfsFile(String name, String path, byte[] content) {
            this.name = name;
            this.path = path;
            this.content = content;
        }

        public String getName() { return name; }
        public String getPath() { return path; }
        public byte[] getContent() { return content; }
    }
}
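downloadFileByte walks directories by recursing on listStatus. Hadoop 2.x and later also provide FileSystem.listFiles(Path, boolean recursive), which returns a RemoteIterator<LocatedFileStatus> over files only and performs the directory walk itself. A minimal sketch of the same in-memory read built on it, assuming every file is small enough to hold on the heap (ReadTreeExample and readAll are hypothetical names):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;

// Hypothetical helper: reads every file under root into memory.
class ReadTreeExample {
    // Returns file contents keyed by full path. Suitable only for small files,
    // because everything is held on the heap at once.
    static Map<String, byte[]> readAll(String root, Configuration conf) throws IOException {
        Path rootPath = new Path(root);
        FileSystem fs = rootPath.getFileSystem(conf);
        Map<String, byte[]> contents = new LinkedHashMap<>();
        // listFiles(path, true) descends into subdirectories and yields files only
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(rootPath, true);
        while (it.hasNext()) {
            LocatedFileStatus st = it.next();
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (FSDataInputStream in = fs.open(st.getPath())) {
                // copyBytes loops until EOF, unlike a single read() call
                IOUtils.copyBytes(in, buf, 4096, false);
            }
            contents.put(st.getPath().toString(), buf.toByteArray());
        }
        return contents;
    }
}
```

Because the iterator fetches listings lazily, this also avoids building the full recursive listing up front, though the file contents themselves still accumulate in memory.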