1. 程式人生 > >使用javaAPI操作hdfs

使用javaAPI操作hdfs

文件系統 ole 文件 緩沖區 api println 不存在 ogg 就會

歡迎到https://github.com/huabingood/everyDayLanguagePractise查看源碼。

一.構建環境

在hadoop的安裝包中的share目錄中有hadoop所有你能想象到的內容。
進入安裝包下的share文件夾你會看到doc和hadoop文件夾。其中doc中是hadoop的整個document。而hadoop文件夾中則存放著所有開發hadoop所有用到的jar包,其依賴放到相應的lib文件夾中。
我們這次用到的是hadoop文件夾中的common以及hdfs文件夾中的內容。在創建工程時,繼續將這兩個文件夾中的jar包添加到相應的工程中,然後將文件夾中的lib文件夾的所有jar包作為依賴也添加到工程文件中(common和hdfs文件的lib文件中的jar包可能存在重復,註意去重。);其中source文件夾是該模塊的源碼包,如果想在IDEA中看到源碼,需要將這些內容也添加到工程中。

二.操作HDFS流程

  1. 獲取HDFS的配置,根據HDFS的配置獲取整個HDFS操作系統的內容
  2. 打開HDFS操作系統,進行操作
    1. 文件夾的操作:增刪改查
    2. 文件的上傳下載
    3. 文件的IO操作——hdfs之間的復制

三.具體的操作命令

  1. 根據配置獲取HDFS文件操作系統(共有三種方式)
    1. 方法一:直接獲取配置文件方法
      通常情況下該方法用於本地有hadoop系統,可以直接進行訪問。此時僅需在配置文件中指定要操作的文件系統為hdfs即可。這裏的conf的配置文件可以設置hdfs的各種參數,並且優先級比配置文件要搞
    2. 方法二:指定URI路徑,進而獲取配置文件創建操作系統
      通常該方法用於本地沒有hadoop系統,但是可以通過URI的方式進行訪問。此時要給給定hadoop的NN節點的訪問路徑,hadoop的用戶名,以及配置文件信息(此時會自動訪問遠程hadoop的配置文件)
                /**
                 * 根據配置文件獲取HDFS操作對象
                 * 有兩種方法:
                 *  1.使用conf直接從本地獲取配置文件創建HDFS對象
                 *  2.多用於本地沒有hadoop系統,但是可以遠程訪問。使用給定的URI和用戶名,訪問遠程的配置文件,然後創建HDFS對象。
                 * @return FileSystem
                 */
                public FileSystem getHadoopFileSystem() {
            
            
                    FileSystem fs 
= null; Configuration conf = null; // 方法一,本地有配置文件,直接獲取配置文件(core-site.xml,hdfs-site.xml) // 根據配置文件創建HDFS對象 // 此時必須指定hdsf的訪問路徑。 conf = new Configuration(); // 文件系統為必須設置的內容。其他配置參數可以自行設置,且優先級最高 conf.set("fs.defaultFS", "hdfs://huabingood01:9000"); try { // 根據配置文件創建HDFS對象 fs = FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); logger.error("",e); } // 方法二:本地沒有hadoop系統,但是可以遠程訪問。根據給定的URI和用戶名,訪問hdfs的配置參數 // 此時的conf不需任何設置,只需讀取遠程的配置文件即可。 /*conf = new Configuration(); // Hadoop的用戶名 String hdfsUserName = "huabingood"; URI hdfsUri = null; try { // HDFS的訪問路徑 hdfsUri = new URI("hdfs://huabingood01:9000"); } catch (URISyntaxException e) { e.printStackTrace(); logger.error(e); } try { // 根據遠程的NN節點,獲取配置信息,創建HDFS對象 fs = FileSystem.get(hdfsUri,conf,hdfsUserName); } catch (IOException e) { e.printStackTrace(); logger.error(e); } catch (InterruptedException e) { e.printStackTrace(); logger.error(e); }*/ // 方法三,反正我們沒有搞懂。 /*conf = new Configuration(); conf.addResource("/opt/huabingood/pseudoDistributeHadoop/hadoop-2.6.0-cdh5.10.0/etc/hadoop/core-site.xml"); conf.addResource("/opt/huabingood/pseudoDistributeHadoop/hadoop-2.6.0-cdh5.10.0/etc/hadoop/hdfs-site.xml"); try { fs = FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); logger.error(e); }*/ return fs; }

  2.添加文件夾

 /**
     * 這裏的創建文件夾同shell中的mkdir -p 語序前面的文件夾不存在
     * 跟java中的IO操作一樣,也只能對path對象做操作;但是這裏的Path對象是hdfs中的
     * @param fs
     * @return
     */
    public boolean myCreatePath(FileSystem fs){
        boolean b = false;

        Path path = new Path("/hyw/test/huabingood/hyw");
        try {
            // even the path exist,it can also create the path.
            b = fs.mkdirs(path);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e);
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
                logger.error(e);
            }
        }
        return b;
    } 

  3.刪除文件夾

 /**
     * 刪除文件,實際上刪除的是給定path路徑的最後一個
     * 跟java中一樣,也需要path對象,不過是hadoop.fs包中的。
     * 實際上delete(Path p)已經過時了,更多使用delete(Path p,boolean recursive)
     * 後面的布爾值實際上是對文件的刪除,相當於rm -r
     * @param fs
     * @return
     */
    public boolean myDropHdfsPath(FileSystem fs){
        boolean b = false;
        // drop the last path
        Path path = new Path("/huabingood/hadoop.tar.gz");
        try {
            b = fs.delete(path,true);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e);
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
                logger.error(e);
            }
        }

        return b;
    } 

  4.重命名

 /**
     * 重命名文件夾
     * @param hdfs
     * @return
     */
    public boolean myRename(FileSystem hdfs){
        boolean b = false;
        Path oldPath = new Path("/hyw/test/huabingood");
        Path newPath = new Path("/hyw/test/huabing");

        try {
            b = hdfs.rename(oldPath,newPath);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e);
        }finally {
            try {
                hdfs.close();
            } catch (IOException e) {
                e.printStackTrace();
                logger.error(e);
            }
        }

        return b;
    } 

  5.文件夾的遞歸遍歷

         /**
             * 遍歷文件夾
             * public FileStatus[] listStatus(Path p)
             * 通常使用HDFS文件系統的listStatus(path)來獲取改定路徑的子路徑。然後逐個判斷
             * 值得註意的是:
             *  1.並不是總有文件夾中有文件,有些文件夾是空的,如果僅僅做是否為文件的判斷會有問題,必須加文件的長度是否為0的判斷
             *  2.使用getPath()方法獲取的是FileStatus對象是帶URL路徑的。使用FileStatus.getPath().toUri().getPath()獲取的路徑才是不帶url的路徑
             * @param hdfs
             * @param listPath 傳入的HDFS開始遍歷的路徑
             * @return
             */
            public Set<String> recursiveHdfsPath(FileSystem hdfs,Path listPath){
        
                /*FileStatus[] files = null;
                try {
                    files = hdfs.listStatus(listPath);
                    Path[] paths = FileUtil.stat2Paths(files);
                    for(int i=0;i<files.length;i++){
                        if(files[i].isFile()){
                            // set.add(paths[i].toString());
                            set.add(paths[i].getName());
                        }else {
                            recursiveHdfsPath(hdfs,paths[i]);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    logger.error(e);
                }*/
        
                FileStatus[] files = null;
        
                try {
                    files = hdfs.listStatus(listPath);
                    // 實際上並不是每個文件夾都會有文件的。
                    if(files.length == 0){
                        // 如果不使用toUri(),獲取的路徑帶URL。
                        set.add(listPath.toUri().getPath());
                    }else {
                        // 判斷是否為文件
                        for (FileStatus f : files) {
                            if (files.length == 0 || f.isFile()) {
                                set.add(f.getPath().toUri().getPath());
                            } else {
                                // 是文件夾,且非空,就繼續遍歷
                                recursiveHdfsPath(hdfs, f.getPath());
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    logger.error(e);
                }
                return set;
            } 

  6.文件的判斷

 /**
     * 文件簡單的判斷
     * 是否存在
     * 是否是文件夾
     * 是否是文件
     * @param fs
     */
    public void myCheck(FileSystem fs){
        boolean isExists = false;
        boolean isDirectorys = false;
        boolean isFiles = false;

        Path path = new Path("/hyw/test/huabingood01");

        try {
            isExists = fs.exists(path);
            isDirectorys = fs.isDirectory(path);
            isFiles = fs.isFile(path);
        } catch (IOException e){
            e.printStackTrace();
            logger.error(e);
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
                logger.error(e);
            }
        }

        if(!isExists){
            System.out.println("lu jing not cun zai.");
        }else{
            System.out.println("lu jing cun zai.");
            if(isDirectorys){
                System.out.println("Directory");
            }else if(isFiles){
                System.out.println("Files");
            }
        } 

  7.文件配置信息的查詢

 /**
     * 獲取配置的所有信息
     * 首先,我們要知道配置文件是哪一個
     * 然後我們將獲取的配置文件用叠代器接收
     * 實際上配置中是KV對,我們可以通過java中的Entry來接收
     *
     */
    public void showAllConf(){
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://huabingood01:9000");
        Iterator<Map.Entry<String,String>> it = conf.iterator();
        while(it.hasNext()){
            Map.Entry<String,String> entry = it.next();
            System.out.println(entry.getKey()+"=" +entry.getValue());
        }
    } 

  8.文件的上傳

    /**
     * 文件下載
     * 註意下載的路徑的最後一個地址是下載的文件名
     * copyToLocalFile(Path local,Path hdfs)
     * 下載命令中的參數是沒有任何布爾值的,如果添加了布爾是,意味著這是moveToLocalFile()
     * @param fs
     */
    public void getFileFromHDFS(FileSystem fs){
        Path HDFSPath = new Path("/hyw/test/hadoop-2.6.0-cdh5.10.0.tar.gz");
        Path localPath = new Path("/home/huabingood");

        try {
            fs.copyToLocalFile(HDFSPath,localPath);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error("zhe li you cuo wu !" ,e);
        }finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
                logger.error("zhe li you cuo wu !", e);
            }
        }
    } 

  9.文件的下載

         /**
             * 文件的下載
             * 註意事項同文件的上傳
             * 註意如果上傳的路徑不存在會自動創建
             * 如果存在同名的文件,會覆蓋
             * @param fs
             */
            public void myPutFile2HDFS(FileSystem fs){
        
                boolean pathExists = false;
                // 如果上傳的路徑不存在會創建
                // 如果該路徑文件已存在,就會覆蓋
                Path localPath = new Path("/home/huabingood/繡春刀.rmbv");
                Path hdfsPath = new Path("/hyw/test/huabingood/abc/efg/繡春刀.rmbv");
        
                try {
                    fs.copyFromLocalFile(localPath,hdfsPath);
                } catch (IOException e) {
                    e.printStackTrace();
                }finally {
                    try {
                        fs.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
        
            } 

  10.文件的IO操作——HDFS之間文件的復制

            /**
             * hdfs之間文件的復制
             * 使用FSDataInputStream來打開文件open(Path p)
             * 使用FSDataOutputStream開創建寫到的路徑create(Path p)
             * 使用 IOUtils.copyBytes(FSDataInputStream,FSDataOutputStream,int buffer,Boolean isClose)來進行具體的讀寫
             * 說明:
             *  1.java中使用緩沖區來加速讀取文件,這裏也使用了緩沖區,但是只要指定緩沖區大小即可,不必單獨設置一個新的數組來接受
             *  2.最後一個布爾值表示是否使用完後關閉讀寫流。通常是false,如果不手動關會報錯的
             * @param hdfs
             */
            public void copyFileBetweenHDFS(FileSystem hdfs){
                Path inPath = new Path("/hyw/test/hadoop-2.6.0-cdh5.10.0.tar.gz");
                Path outPath = new Path("/huabin/hadoop.tar.gz");
        
                // byte[] ioBuffer = new byte[1024*1024*64];
                // int len = 0;
        
                FSDataInputStream hdfsIn = null;
                FSDataOutputStream hdfsOut = null;
        
                try {
                    hdfsIn = hdfs.open(inPath);
                    hdfsOut = hdfs.create(outPath);
        
                    IOUtils.copyBytes(hdfsIn,hdfsOut,1024*1024*64,false);
        
                    /*while((len=hdfsIn.read(ioBuffer))!= -1){
                        IOUtils.copyBytes(hdfsIn,hdfsOut,len,true);
                    }*/
                } catch (IOException e) {
                    e.printStackTrace();
                    logger.error(e);
                }finally {
                    try {
                    hdfsOut.close();
                    hdfsIn.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
        
                }
        
            } 

使用javaAPI操作hdfs