1. 程式人生 > >HDFS java api介面測試demo

HDFS Java API 介面測試 demo

1. 建立 MapReduce 工程,並設定 Hadoop home 目錄

2. 建立 HDFSUtil 類

package Bruce.Hadoop.HDFSManger;

import java.util.Iterator;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;


public class HDFSUtil {  
    public synchronized static FileSystem getFileSystem(String ip, int port) {  
        FileSystem fs = null;  
        String url = "hdfs://" + ip + ":" + String.valueOf(port);  
        Configuration config = new Configuration();  
        config.set("fs.default.name", url);  
        try {  
            fs = FileSystem.get(config);  
        } catch (Exception e) {  
            e.printStackTrace(); 
        }  
        return fs;  
    }  
    public synchronized static void listNode(FileSystem fs) {  
        DistributedFileSystem dfs = (DistributedFileSystem) fs;  
        try {  
            DatanodeInfo[] infos = dfs.getDataNodeStats();  
            for (DatanodeInfo node : infos) {  
                System.out.println("HostName: " + node.getHostName() + "/n"  
                        + node.getDatanodeReport());  
                System.out.println("--------------------------------");  
            }  
        } catch (Exception e) {  
        	 e.printStackTrace(); 
        }  
    }  
    /** 
     * 列印系統配置 
     *  
     * @param fs 
     */  
    public synchronized static void listConfig(FileSystem fs) {  
        Iterator<Entry<String, String>> entrys = fs.getConf().iterator();  
        while (entrys.hasNext()) {  
            Entry<String, String> item = entrys.next();  
            System.out.println(item.getKey() + ": " + item.getValue());  
        }  
    }  
    /** 
     * 建立目錄和父目錄 
     *  
     * @param fs 
     * @param dirName 
     */  
    public synchronized static void mkdirs(FileSystem fs, String dirName) {  
        // Path home = fs.getHomeDirectory();  
        Path workDir = fs.getWorkingDirectory();  
        String dir = workDir + "/" + dirName;  
        Path src = new Path(dir);  
        // FsPermission p = FsPermission.getDefault();  
        boolean succ;  
        try {  
            succ = fs.mkdirs(src);  
            if (succ) {  
            	System.out.println("create directory " + dir + " successed. ");  
            } else {  
            	System.out.println("create directory " + dir + " failed. ");  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
    /** 
     * 刪除目錄和子目錄 
     *  
     * @param fs 
     * @param dirName 
     */  
    public synchronized static void rmdirs(FileSystem fs, String dirName) {  
        // Path home = fs.getHomeDirectory();  
        Path workDir = fs.getWorkingDirectory();  
        String dir = workDir + "/" + dirName;  
        Path src = new Path(dir);  
        boolean succ;  
        try {  
            succ = fs.delete(src, true);  
            if (succ) {  
            	System.out.println("remove directory " + dir + " successed. ");  
            } else {  
            	System.out.println("remove directory " + dir + " failed. ");  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
    /** 
     * 上傳目錄或檔案 
     *  
     * @param fs 
     * @param local 
     * @param remote 
     */  
    public synchronized static void upload(FileSystem fs, String local,  
            String remote) {  
        // Path home = fs.getHomeDirectory();  
        Path workDir = fs.getWorkingDirectory();  
        Path dst = new Path(workDir + "/" + remote);  
        Path src = new Path(local);  
        try {  
            fs.copyFromLocalFile(false, true, src, dst);  
            System.out.println("upload " + local + " to  " + remote + " successed. ");  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
    /** 
     * 下載目錄或檔案 
     *  
     * @param fs 
     * @param local 
     * @param remote 
     */  
    public synchronized static void download(FileSystem fs, String local,  
            String remote) {  
        // Path home = fs.getHomeDirectory();  
        Path workDir = fs.getWorkingDirectory();  
        Path dst = new Path(workDir + "/" + remote);  
        Path src = new Path(local);  
        try {  
            fs.copyToLocalFile(false, dst, src);  
            System.out.println("download from " + remote + " to  " + local  
                    + " successed. ");  
        } catch (Exception e) {  
        	e.printStackTrace();  
        }  
    }  
    /** 
     * 位元組數轉換 
     *  
     * @param size 
     * @return 
     */  
    public synchronized static String convertSize(long size) {  
        String result = String.valueOf(size);  
        if (size < 1024 * 1024) {  
            result = String.valueOf(size / 1024) + " KB";  
        } else if (size >= 1024 * 1024 && size < 1024 * 1024 * 1024) {  
            result = String.valueOf(size / 1024 / 1024) + " MB";  
        } else if (size >= 1024 * 1024 * 1024) {  
            result = String.valueOf(size / 1024 / 1024 / 1024) + " GB";  
        } else {  
            result = result + " B";  
        }  
        return result;  
    }  
    /** 
     * 遍歷HDFS上的檔案和目錄 
     *  
     * @param fs 
     * @param path 
     */  
    public synchronized static void listFile(FileSystem fs, String path) {  
        Path workDir = fs.getWorkingDirectory();  
        Path dst;  
        if (null == path || "".equals(path)) {  
            dst = new Path(workDir + "/" + path);  
        } else {  
            dst = new Path(path);  
        }  
        try {  
            String relativePath = "";  
            FileStatus[] fList = fs.listStatus(dst);  
            for (FileStatus f : fList) {  
                if (null != f) {  
                    relativePath = new StringBuffer()  
                            .append(f.getPath().getParent()).append("/")  
                            .append(f.getPath().getName()).toString();  
                    if (f.isDir()) {  
                        listFile(fs, relativePath);  
                    } else {  
                        System.out.println(convertSize(f.getLen()) + "/t/t"  
                                + relativePath);  
                    }  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
        }  
    }  
    public synchronized static void write(FileSystem fs, String path,  
            String data) {  
        // Path home = fs.getHomeDirectory();  
        Path workDir = fs.getWorkingDirectory();  
        Path dst = new Path(workDir + "/" + path);  
        try {  
            FSDataOutputStream dos = fs.create(dst);  
            dos.writeUTF(data);  
            dos.close();  
            System.out.println("write content to " + path + " successed. ");  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
    public synchronized static void append(FileSystem fs, String path,  
            String data) {  
        // Path home = fs.getHomeDirectory();  
        Path workDir = fs.getWorkingDirectory();  
        Path dst = new Path(workDir + "/" + path);  
        try {  
            FSDataOutputStream dos = fs.append(dst);  
dos.writeUTF(data); dos.close(); System.out.println("append content to " + path + " successed. "); } catch (Exception e) { e.printStackTrace(); } } public synchronized static String read(FileSystem fs, String path) { String content = null; // Path home = fs.getHomeDirectory(); Path workDir = fs.getWorkingDirectory(); Path dst = new Path(workDir + "/" + path); try { // reading FSDataInputStream dis = fs.open(dst); content = dis.readUTF(); dis.close(); System.out.println("read content from " + path + " successed. "); } catch (Exception e) { e.printStackTrace(); } return content; } }
3. 建立測試用例
package Bruce.Hadoop.HDFSManger;

// This must be run as a Hadoop program.
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Manual smoke test for {@code HDFSUtil}: connects to a fixed NameNode, lists
 * the data nodes, makes sure the {@code input} directory exists, then writes
 * a small file and reads it back.
 */
public class HDFSTest {

	/**
	 * Runs the smoke test against {@code hdfs://192.168.100.3:9000}.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		FileSystem fs = HDFSUtil.getFileSystem("192.168.100.3", 9000);
		if (fs == null) {
			// getFileSystem reports failure by returning null; stop here
			// instead of failing later with a NullPointerException.
			System.err.println("could not obtain a FileSystem for hdfs://192.168.100.3:9000");
			return;
		}

		try {
			HDFSUtil.listNode(fs); // print information about each data node
			String dir = "input";
			String fileName = "Name.txt";
			String filePath = dir + "/" + fileName;

			try {
				// check the same directory that is created below
				if (!fs.exists(new Path(dir))) {
					HDFSUtil.mkdirs(fs, dir);
					System.out.println("mkdir" + dir);
				} else {
					System.out.println(dir + " exists!");
				}
			} catch (IOException e) {
				e.printStackTrace();
			}

			HDFSUtil.write(fs, filePath, "bruce wang"); // overwrites an existing file
			// HDFSUtil.append is deliberately not exercised: the original author
			// notes that appending to HDFS files is poorly supported.

			System.out.println("write " + filePath);
			String fileContent = HDFSUtil.read(fs, filePath);
			System.out.println(fileContent);
			System.out.println("read " + filePath);
		} finally {
			try {
				fs.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

}
4. 以 Hadoop 程式執行(run as hadoop),得到以下輸出:
12/02/12 01:17:57 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
HostName: BruceWangUbuntu/nName: 192.168.100.3:50010
Decommission Status : Normal
Configured Capacity: 20608348160 (19.19 GB)
DFS Used: 24609 (24.03 KB)
Non DFS Used: 4905893855 (4.57 GB)
DFS Remaining: 15702429696(14.62 GB)
DFS Used%: 0%
DFS Remaining%: 76.19%
Last contact: Sun Feb 12 01:17:48 CST 2012

--------------------------------
input exists!
write content to input/Name.txt successed. 
write input/Name.txt
read content from input/Name.txt successed. 
bruce wang
read input/Name.txt