1. 程式人生 > >hdfs api 實戰

hdfs api 實戰

直接上程式碼:
package com.linewell.hdfs;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;


public class hdfsTest {
	 private static String HDFSUri = "hdfs://192.168.72.129:9000";
	public static void main(String[] args) {
		//mkdir("/local/user");  //在hdfs上建立目錄
		//System.out.println(existDir("/la", false)); //判斷hdfs上是否存在該目錄或檔案並列印
		//getFile("/local/abc.txt", "D://aaa.txt");// 從hdfs下載檔案到本地系統		
		try {
			copyFileToHDFS("D://abc.txt","/local/user/bbb.txt" );//從本地系統上傳檔案到hdfs上
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
				
	}
	/**
     * 刪除檔案或者檔案目錄
     *
     * @param path
     *
     */
	public static void rmdir(String path){
		FileSystem fs = getFileSystem();
		path = HDFSUri + path;
		
			try {
				fs.delete(new Path(path), true);
			} catch (IllegalArgumentException | IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}finally {
				try {
					fs.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}		
	}
	
	
	
	/**
     * 從 HDFS 下載檔案到本地
     *
     * @param srcFile HDFS檔案路徑
     * @param destPath 本地路徑
     */
	public static void getFile(String srcFile,String dsetFile){
	 Configuration conf =new Configuration();
	 //構建filesystem 
	 try {
		FileSystem fs = FileSystem.get(URI.create(HDFSUri), conf);
		InputStream is = fs.open(new Path(srcFile));
		IOUtils.copyBytes(is, new FileOutputStream(new File(dsetFile)), conf,true);
		fs.close();
	} catch (IOException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
	}
	
	
	
	
	
	
	/**
     * 本地檔案上傳至 HDFS
     *
     * @param srcFile 原始檔 路徑
     * @param destPath hdfs路徑
     */
    public static void copyFileToHDFS(String srcFile,String destPath)throws Exception{


        FileInputStream fis=new FileInputStream(new File(srcFile));//讀取本地檔案
        Configuration conf=new Configuration();
        FileSystem fs=FileSystem.get(URI.create(HDFSUri), conf);
        OutputStream os=fs.create(new Path(destPath));
        //copy
        IOUtils.copyBytes(fis, os, 4096, true);   //ture 結束後關閉fis和os
        System.out.println("拷貝完成...");
        fs.close();
    }
	
	
/*
 * 判斷目錄是否存在
 * @param filepath 目錄路徑
 * @param Create 判斷是否建立目錄
 * 
 * 	
 */
	public static boolean existDir(String filePath,boolean Create){
		boolean flag =false;
		if(StringUtils.isEmpty(filePath)){
			return flag;
		}				
			try {
				Path path =new Path(filePath);
				FileSystem fs =getFileSystem();
				if(Create){
				if(!fs.exists(path)){
					fs.mkdirs(path);
				}
				}
				if(fs.isDirectory(path)){
					flag=true;
				}
				
				
			}catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			return flag;
		}
	
	
	
	
	
	
	 /**
     * 建立檔案目錄
     *
     * @param path 檔案路徑
     */
    public static void mkdir(String path) {
     
      try {
    	  Configuration conf =new Configuration();
    	 FileSystem fs =FileSystem.get(URI.create(HDFSUri),conf);
          System.out.println("filepath="+path);
          fs.mkdirs(new Path(path));
          fs.close();
	} catch (IllegalArgumentException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	} catch (IOException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
    }
	
	
	
	
	/**
     * 獲取檔案系統
     *
     * @return FileSystem 檔案系統
     */
    public static FileSystem getFileSystem() {
        //讀取配置檔案
        Configuration conf = new Configuration();
        // 檔案系統
        FileSystem fs = null;
        String hdfsUri = HDFSUri;
        if(StringUtils.isBlank(hdfsUri)){
            // 返回預設檔案系統  如果在 Hadoop叢集下執行,使用此種方法可直接獲取預設檔案系統
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }else{
            // 返回指定的檔案系統,如果在本地測試,需要使用此種方法獲取檔案系統
            try {
                URI uri = new URI(hdfsUri.trim());
                fs = FileSystem.get(uri,conf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return fs;
    }

}