1. 程式人生 > >Java實現HadoopHA叢集的hdfs控制

Java實現HadoopHA叢集的hdfs控制

一、HadoopHA的搭建:https://www.cnblogs.com/null-/p/10000309.html

 

 

二、pom檔案依賴:

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.4</version>
</dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.4</version> </dependency> <dependency> <
groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId
>hadoop-mapreduce-client-core</artifactId> <version>2.7.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>

 

三、控制程式碼:

package com.hdfs.demo;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * @author 王傳禮
 */
public class HdfsDemo {

    /**
     * 根據配置獲取HDFS檔案作業系統
     *
     * @return FileSystem
     */
    public static FileSystem getHadoopFileSystem() {
        FileSystem fs = null;
        Configuration conf = null;
        //方法:本地沒有hadoop系統,但可以遠端訪問。根據給定的URI和使用者名稱,訪問hdfs的配置引數
        conf = new Configuration();
        //Hadoop的使用者名稱
        String hdfsUserNmae = "root";
        URI hdfsUri = null;
        try {
            hdfsUri = new URI("hdfs://192.168.182.135:8020");
            // HDFS的訪問路徑
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        try {
            //根據遠端的NN節點,獲取配置資訊,建立HDFS物件
            fs = FileSystem.get(hdfsUri, conf, hdfsUserNmae);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return fs;
    }

    /**
     * 這裡的建立資料夾同shell中的mkdir -p 語序前面的資料夾不存在
     * 跟java中的IO操作一樣,也只能對path物件做操作;但是這裡的Path物件是hdfs中的
     *
     * @param fs,filepath
     * @return
     */
    public static boolean myCreatePath(FileSystem fs,String filepath) {
        boolean b = false;
        Path path = new Path(filepath);
        try {
            // even the path exist,it can also create the path.
            b = fs.mkdirs(path);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return b;
    }

    /**
     * 刪除檔案,實際上刪除的是給定path路徑的最後一個
     * 跟java中一樣,也需要path物件,不過是hadoop.fs包中的。
     * 實際上delete(Path p)已經過時了,更多使用delete(Path p,boolean recursive)
     * 後面的布林值實際上是對檔案的刪除,相當於rm -r
     *
     * @param fs
     * @return
     */
    public static boolean myDropHdfsPath(FileSystem fs, String filepath) {
        boolean b = false;
        // drop the last path
        Path path = new Path(filepath);
        try {
            b = fs.delete(path, true);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return b;
    }


    /**
     * 重新命名資料夾
     *
     * @param hdfs
     * @return
     */
    public static boolean myRename(FileSystem hdfs, String oldname, String newname) {
        boolean b = false;
        Path oldPath = new Path(oldname);
        Path newPath = new Path(newname);

        try {
            b = hdfs.rename(oldPath, newPath);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                hdfs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return b;
    }

    /**
     * 遍歷資料夾
     * public FileStatus[] listStatus(Path p)
     * 通常使用HDFS檔案系統的listStatus(path)來獲取改定路徑的子路徑。然後逐個判斷
     * 值得注意的是:
     * 1.並不是總有資料夾中有檔案,有些資料夾是空的,如果僅僅做是否為檔案的判斷會有問題,必須加檔案的長度是否為0的判斷
     * 2.使用getPath()方法獲取的是FileStatus物件是帶URL路徑的。使用FileStatus.getPath().toUri().getPath()獲取的路徑才是不帶url的路徑
     *
     * @param hdfs
     * @param listPath 傳入的HDFS開始遍歷的路徑
     * @return
     */
    public static Set<String> recursiveHdfsPath(FileSystem hdfs, Path listPath) {

                /*FileStatus[] files = null;
                try {
                    files = hdfs.listStatus(listPath);
                    Path[] paths = FileUtil.stat2Paths(files);
                    for(int i=0;i<files.length;i++){
                        if(files[i].isFile()){
                            // set.add(paths[i].toString());
                            set.add(paths[i].getName());
                        }else {
                            recursiveHdfsPath(hdfs,paths[i]);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    logger.error(e);
                }*/

        FileStatus[] files = null;
        Set<String> set = null;
        try {
            files = hdfs.listStatus(listPath);
            // 實際上並不是每個資料夾都會有檔案的。
            if (files.length == 0) {
                // 如果不使用toUri(),獲取的路徑帶URL。
                set.add(listPath.toUri().getPath());
            } else {
                // 判斷是否為檔案
                for (FileStatus f : files) {
                    if (files.length == 0 || f.isFile()) {
                        set.add(f.getPath().toUri().getPath());
                    } else {
                        // 是資料夾,且非空,就繼續遍歷
                        recursiveHdfsPath(hdfs, f.getPath());
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return set;
    }

    /**
     * 檔案簡單的判斷
     * 是否存在
     * 是否是資料夾
     * 是否是檔案
     *
     * @param fs
     */
    public static void myCheck(FileSystem fs, String filepath) {
        boolean isExists = false;
        boolean isDirectorys = false;
        boolean isFiles = false;

        Path path = new Path(filepath);

        try {
            isExists = fs.exists(path);
            isDirectorys = fs.isDirectory(path);
            isFiles = fs.isFile(path);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (!isExists) {
            System.out.println("The path is not exist.");
        } else {
            System.out.println("The path is exist.");
            if (isDirectorys) {
                System.out.println("This is a Directory");
            } else if (isFiles) {
                System.out.println("This is Files");
            }
        }
    }

    /**
     * 獲取配置的所有資訊
     * 首先,我們要知道配置檔案是哪一個
     * 然後我們將獲取的配置檔案用迭代器接收
     * 實際上配置中是KV對,我們可以通過java中的Entry來接收
     */
    public static void showAllConf() {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:8020");
        Iterator<Map.Entry<String, String>> it = conf.iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            System.out.println(entry.getKey() + "=" + entry.getValue());
        }
    }

    /**
     * 檔案下載
     * 注意下載的路徑的最後一個地址是下載的檔名
     * copyToLocalFile(Path local,Path hdfs)
     * 下載命令中的引數是沒有任何布林值的,如果添加了布林是,意味著這是moveToLocalFile()
     *檔案下載有許可權要求 要有寫的許可權
     * @param fs
     */
    public static void getFileFromHDFS(FileSystem fs, String dfsFile, String locPath) {
        Path HDFSPath = new Path(dfsFile);
        Path localPath = new Path(locPath);
        try {
            fs.copyToLocalFile(HDFSPath, localPath);
            System.out.println("File download.");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 檔案的上傳
     * 注意事項同文件的上傳
     * 注意如果上傳的路徑不存在會自動建立
     * 如果存在同名的檔案,會覆蓋
     *
     * @param fs
     */
    public static void myPutFile2HDFS(FileSystem fs, String localFile, String dfsPath) {

        boolean pathExists = false;
        // 如果上傳的路徑不存在會建立
        // 如果該路徑檔案已存在,就會覆蓋
        Path localPath = new Path(localFile);
        Path hdfsPath = new Path(dfsPath);

        try {
            fs.copyFromLocalFile(localPath, hdfsPath);
            System.out.println("File upload.");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * hdfs之間檔案的複製
     * 使用FSDataInputStream來開啟檔案open(Path p)
     * 使用FSDataOutputStream開建立寫到的路徑create(Path p)
     * 使用 IOUtils.copyBytes(FSDataInputStream,FSDataOutputStream,int buffer,Boolean isClose)來進行具體的讀寫
     * 說明:
     * 1.java中使用緩衝區來加速讀取檔案,這裡也使用了緩衝區,但是隻要指定緩衝區大小即可,不必單獨設定一個新的陣列來接受
     * 2.最後一個布林值表示是否使用完後關閉讀寫流。通常是false,如果不手動關會報錯的
     *
     * @param hdfs
     */
    public static void copyFileBetweenHDFS(FileSystem hdfs, String in, String out) {
        Path inPath = new Path(in);
        Path outPath = new Path(out);

        // byte[] ioBuffer = new byte[1024*1024*64];
        // int len = 0;

        FSDataInputStream hdfsIn = null;
        FSDataOutputStream hdfsOut = null;

        try {
            hdfsIn = hdfs.open(inPath);
            hdfsOut = hdfs.create(outPath);

            IOUtils.copyBytes(hdfsIn, hdfsOut, 1024 * 1024 * 64, false);

        } catch (IOException e) {
            e.printStackTrace();

       } finally {
            try {
                hdfsOut.close();
                hdfsIn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

        }

    }
}

四、測試程式碼

package com.hdfs.demo;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.util.Scanner;
import java.util.Set;

import static org.junit.Assert.*;

public class HdfsTest {

    Scanner sc = new Scanner(System.in);
    FileSystem fs = HdfsDemo.getHadoopFileSystem();

    @Test
    public void myCreatePath() {
        //目錄建立測試
        String path = "/usr/test/input";
        System.out.println(HdfsDemo.myCreatePath(fs,path));
    }

    @Test
    public void myDropHdfsPath() {
        // 目錄刪除
        String path = "/usr/test/output";
        System.out.println(HdfsDemo.myDropHdfsPath(fs,path));
    }

    @Test
    public void myRename() {
        //檔案重新命名
        String oldName = "/usr/test/input";
        String newName = "/usr/test/renameInput";
        System.out.println(HdfsDemo.myRename(fs,oldName,newName));
    }

    @Test
    public void recursiveHdfsPath() {
        //遍歷資料夾
        Path path = new Path("/usr/test/");
        Set<String> set = HdfsDemo.recursiveHdfsPath(fs, path);
        for (String str :
                set) {
            System.out.println(str);
        }
    }

    @Test
    public void myCheck() {
        //檔案簡單的判斷 是否存在 是否是資料夾 是否是檔案
        String path = "/usr/test/input/file.txt";
        HdfsDemo.myCheck(fs,path);
    }

    @Test
    public void showAllConf() {
        //獲取配置的所有資訊
        HdfsDemo.showAllConf();
    }

    @Test
    public void getFileFromHDFS() {
        //檔案下載 檔案下載有許可權要求 要有寫的許可權 0644error
        String dfsFile = "/usr/test/input/file.txt";
        String locPath = "e://temp/data/";
        HdfsDemo.getFileFromHDFS(fs,dfsFile,locPath);
    }

    @Test
    public void myPutFile2HDFS() {
        //檔案的上傳
        String localFile = "e://temp/file.txt";
        String dfsPath = "/usr/test/input";
        HdfsDemo.myPutFile2HDFS(fs,localFile,dfsPath);
    }

    @Test
    public void copyFileBetweenHDFS() {
        //hdfs之間檔案的複製
        String in = "/usr/test/output";
        String out = "/usr/test/input";
        HdfsDemo.copyFileBetweenHDFS(fs,in,out);
    }
}