1. 程式人生 > >呼叫JAVA API對HDFS檔案進行檔案的讀寫、上傳下載、刪除等操作程式碼詳解

呼叫JAVA API對HDFS檔案進行檔案的讀寫、上傳下載、刪除等操作程式碼詳解

Hadoop檔案系統 

基本的檔案系統命令操作, 通過hadoop fs -help可以獲取所有的命令的詳細幫助檔案。 

Java抽象類org.apache.hadoop.fs.FileSystem定義了hadoop的一個檔案系統介面。該類是一個抽象類,通過以下兩種靜態工廠方法可以獲取FileSystem例項: 
public static FileSystem get(Configuration conf) throws IOException 
public static FileSystem get(URI uri, Configuration conf) throws IOException


具體方法實現

1、public boolean mkdirs(Path f) throws IOException
一次性新建所有目錄(包括父目錄), f是完整的目錄路徑。 

2、public FSDataOutputStream create(Path f) throws IOException
建立指定path物件的一個檔案,返回一個用於寫入資料的輸出流 
create()有多個過載版本,允許我們指定是否強制覆蓋已有的檔案、檔案備份數量、寫入檔案緩衝區大小、檔案塊大小以及檔案許可權。 

3、public void copyFromLocalFile(Path src, Path dst) throws IOException

將本地檔案拷貝到檔案系統 

4、public boolean exists(Path f) throws IOException
檢查檔案或目錄是否存在 

5、public boolean delete(Path f, boolean recursive) throws IOException
永久性刪除指定的檔案或目錄,如果f是一個空目錄或者檔案,那麼recursive的值就會被忽略。只有recursive=true時,一個非空目錄及其內容才會被刪除。 

6、FileStatus類封裝了檔案系統中檔案和目錄的元資料,包括檔案長度、塊大小、備份、修改時間、所有者以及許可權資訊。

程式碼實現詳解

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

/**
 * Helper for common HDFS operations: listing, creating, deleting, reading,
 * downloading (to local disk or into memory) and uploading files and directories.
 *
 * <p>NOTE(review): the {@code FileSystem} instances obtained here are never closed.
 * Hadoop normally caches and shares them, so closing one would affect other users of
 * the same file system — confirm before adding {@code close()} calls.
 */
public class HdfsOperate {
    /** Files collected by {@link #downloadFileByte(String)}; accumulates across calls. */
    ArrayList<HdfsFile> hdfsfiles;

    public HdfsOperate() {
        this.hdfsfiles = new ArrayList<HdfsFile>();
    }

    /**
     * Lists the direct children of an HDFS directory.
     *
     * @param srcpath HDFS directory path
     * @return the paths (as given by {@code FileStatus.getPath()}) of the directory's
     *     children; an empty array if {@code srcpath} does not exist or is not a directory;
     *     {@code null} if the listing failed
     */
    public String[] getFileList(String srcpath) {
        try {
            Path path = new Path(srcpath);
            FileSystem fs = path.getFileSystem(new Configuration());
            List<String> files = new ArrayList<String>();
            if (fs.exists(path) && fs.isDirectory(path)) {
                for (FileStatus status : fs.listStatus(path)) {
                    files.add(status.getPath().toString());
                }
            }
            return files.toArray(new String[files.size()]);
        } catch (Exception e) {
            // Best-effort API: callers still get null on failure, but the cause is reported.
            System.err.println("list files under " + srcpath + " failed: " + e);
            return null;
        }
    }

    /**
     * Creates an HDFS file with the given content.
     *
     * @param dst      HDFS path of the file to create
     * @param contents bytes to write
     * @throws IOException if the file cannot be created, written or closed
     */
    public void createFile(String dst, byte[] contents) throws IOException {
        Path dstPath = new Path(dst);
        FileSystem fs = dstPath.getFileSystem(new Configuration());

        FSDataOutputStream outputStream = fs.create(dstPath);
        try {
            outputStream.write(contents);
            // Close on the success path so flush/close failures reach the caller.
            outputStream.close();
            outputStream = null;
        } finally {
            // Non-null only if write/close failed: release it without masking that error.
            IOUtils.closeStream(outputStream);
        }
        System.out.println("create file " + dst + " success!");
    }

    /**
     * Deletes an HDFS file or directory (directories recursively).
     *
     * @param filePath HDFS path to delete
     * @throws IOException if the deletion fails with an I/O error
     */
    public void delete(String filePath) throws IOException {
        Path path = new Path(filePath);
        FileSystem fs = path.getFileSystem(new Configuration());

        // Delete now. deleteOnExit() would only schedule removal for when the FileSystem
        // is closed, which this class never does.
        boolean isok = fs.delete(path, true);
        if (isok) {
            System.out.println("delete file " + filePath + " success!");
        } else {
            System.out.println("delete file " + filePath + " failure");
        }
    }

    /**
     * Creates an HDFS directory, including any missing parent directories.
     *
     * @param path HDFS directory path
     * @throws IOException if the directory cannot be created
     */
    public void mkdir(String path) throws IOException {
        Path srcPath = new Path(path);
        FileSystem fs = srcPath.getFileSystem(new Configuration());

        boolean isok = fs.mkdirs(srcPath);
        if (isok) {
            System.out.println("create dir ok!");
        } else {
            System.out.println("create dir failure");
        }
    }

    /**
     * Copies the content of an HDFS file to standard output.
     *
     * @param filePath HDFS file path
     * @throws IOException if the file cannot be opened or read
     */
    public void readFile(String filePath) throws IOException {
        Path srcPath = new Path(filePath);
        // Resolve the file system from the Path, like the other methods; the former
        // URI-parsing branch left fs null (and later NPE'd) on a URISyntaxException.
        FileSystem fs = srcPath.getFileSystem(new Configuration());
        InputStream in = null;
        try {
            in = fs.open(srcPath);
            // false: keep System.out open after copying.
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }

    /**
     * Downloads an HDFS file or directory tree into a local directory.
     *
     * <p>Only files whose name does not end in "txt" (case-insensitive) are downloaded,
     * and each downloaded file is then deleted from HDFS. A source directory is recreated
     * as a sub-directory of {@code dstPath} and processed recursively.
     *
     * @param dstPath local target directory (created if missing)
     * @param srcPath HDFS file or directory to download
     * @throws Exception on any HDFS or local I/O failure
     */
    public void downloadFile(String dstPath, String srcPath) throws Exception {
        Path path = new Path(srcPath);
        FileSystem hdfs = path.getFileSystem(new Configuration());

        File rootfile = new File(dstPath);
        if (!rootfile.exists()) {
            rootfile.mkdirs();
        }

        if (hdfs.isFile(path)) {
            String fileName = path.getName();
            // Only non-txt files are downloaded.
            if (!fileName.toLowerCase().endsWith("txt")) {
                copyHdfsFileToLocal(hdfs, path, new File(rootfile, fileName));
                // Reached only if the local copy was fully written and closed.
                this.delete(path.toString());
            }
        } else if (hdfs.isDirectory(path)) {
            // Mirror the HDFS directory as a sub-directory of dstPath; File inserts the
            // separator, so dstPath does not need a trailing '/'.
            String newdstPath = new File(rootfile, path.getName()).getPath();
            System.out.println("newdstPath=======" + newdstPath);
            FileStatus[] children = hdfs.listStatus(path);
            if (children != null) {
                for (FileStatus status : children) {
                    downloadFile(newdstPath, status.getPath().toString());
                }
            }
        }
    }

    /** Copies one HDFS file to a local file, propagating any write or close failure. */
    private static void copyHdfsFileToLocal(FileSystem hdfs, Path src, File dst)
            throws IOException {
        FSDataInputStream in = null;
        OutputStream out = null;
        try {
            in = hdfs.open(src);
            out = new FileOutputStream(dst);
            IOUtils.copyBytes(in, out, 4096, false);
            // Close explicitly so a failed flush is reported instead of swallowed.
            out.close();
            out = null;
        } finally {
            IOUtils.closeStream(out);
            IOUtils.closeStream(in);
        }
    }

    /**
     * Reads an HDFS file, or every file below an HDFS directory, into memory and
     * appends one {@code HdfsFile} per file to {@link #hdfsfiles}.
     *
     * @param srcPath HDFS file or directory; nothing happens if it does not exist
     * @throws Exception on HDFS I/O failure, or if a file is too large for a byte array
     */
    public void downloadFileByte(String srcPath) throws Exception {
        Path path = new Path(srcPath);
        FileSystem hdfs = path.getFileSystem(new Configuration());
        if (!hdfs.exists(path)) {
            return;
        }
        if (hdfs.isFile(path)) {
            // Size the buffer from the file's metadata: available() is only an estimate,
            // and a single read() may return fewer bytes than requested.
            long length = hdfs.getFileStatus(path).getLen();
            if (length > Integer.MAX_VALUE) {
                throw new IOException("file too large to load into memory: " + srcPath);
            }
            byte[] content = new byte[(int) length];
            FSDataInputStream in = hdfs.open(path);
            try {
                in.readFully(content);
            } finally {
                IOUtils.closeStream(in);
            }
            hdfsfiles.add(new HdfsFile(path.getName(), srcPath, content));
        } else {
            // Directory: recurse into each child.
            for (FileStatus status : hdfs.listStatus(path)) {
                downloadFileByte(status.getPath().toString());
            }
        }
    }

    /** Returns the live list filled by {@link #downloadFileByte(String)}. */
    public ArrayList<HdfsFile> getHdfsfiles() {
        return hdfsfiles;
    }

    /**
     * Uploads a local file or directory tree into an HDFS directory.
     *
     * @param localSrc local file or directory
     * @param dst      HDFS target directory
     * @throws Exception on any local or HDFS I/O failure
     */
    public void uploadFile(String localSrc, String dst) throws Exception {
        Configuration conf = new Configuration();
        File srcFile = new File(localSrc);
        if (srcFile.isDirectory()) {
            copyDirectory(localSrc, dst, conf);
        } else {
            copyFile(localSrc, dst, conf);
        }
    }

    /**
     * Copies a local file into an HDFS directory, keeping the file's name.
     *
     * @param localSrc local file
     * @param dst      HDFS target directory (a '/' separator is added if missing)
     * @param conf     Hadoop configuration
     * @return always {@code true}; failures are reported by exceptions
     * @throws Exception on any local or HDFS I/O failure
     */
    private boolean copyFile(String localSrc, String dst, Configuration conf) throws Exception {
        File file = new File(localSrc);
        Path path = new Path(joinPath(dst, file.getName()));
        FileSystem fs = path.getFileSystem(conf);
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        OutputStream out = null;
        try {
            out = fs.create(path);
        } finally {
            if (out == null) {
                // create() failed: don't leak the local input stream.
                IOUtils.closeStream(in);
            }
        }
        // close=true: copyBytes is asked to close both streams.
        IOUtils.copyBytes(in, out, 4096, true);
        return true;
    }

    /**
     * Recursively copies a local directory's contents into an HDFS directory.
     *
     * @param src  local directory
     * @param dst  HDFS target directory (created if missing)
     * @param conf Hadoop configuration
     * @return always {@code true}; failures are reported by exceptions
     * @throws Exception on any local or HDFS I/O failure, including an unreadable
     *     local directory
     */
    private boolean copyDirectory(String src, String dst, Configuration conf) throws Exception {
        Path path = new Path(dst);
        FileSystem fs = path.getFileSystem(conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
        }

        // listFiles() returns null when the directory cannot be read.
        File[] files = new File(src).listFiles();
        if (files == null) {
            throw new IOException("cannot list local directory " + src);
        }
        for (File f : files) {
            if (f.isDirectory()) {
                copyDirectory(f.getPath(), joinPath(dst, f.getName()) + "/", conf);
            } else {
                copyFile(f.getPath(), dst, conf);
            }
        }
        return true;
    }

    /** Appends {@code child} to {@code parent}, inserting a '/' unless parent is empty or already ends with one. */
    private static String joinPath(String parent, String child) {
        if (parent.isEmpty() || parent.endsWith("/")) {
            return parent + child;
        }
        return parent + "/" + child;
    }
}