// Java utility class for working with HDFS files on a Hadoop cluster

package com.cictec.hadoop;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

/**
 * Created by Administrator on 2017/5/25.
 */
/**
 * Utility class for common HDFS file operations: existence checks, directory
 * and file creation, copy/move/delete, local upload/download, file merging,
 * and UTF-8 text read/write.
 *
 * <p>All instance methods operate through the {@link #hdfs} handle, which must
 * be supplied (via a constructor or {@link #setFileSystem(FileSystem)}) before
 * use — e.g. obtained from {@link #getFileSystem()}. The original class never
 * initialized this field, which made every instance method throw
 * {@link NullPointerException}.
 *
 * <p>Not thread-safe: the {@link FileSystem} handle is shared mutable state.
 */
public class HdfsOperation {
	/** Connected HDFS handle used by all instance methods; must be set before use. */
	private FileSystem hdfs;

	/**
	 * Creates an uninitialized instance. Call {@link #setFileSystem(FileSystem)}
	 * before invoking any instance method. Kept for backward compatibility with
	 * the original implicit no-arg constructor.
	 */
	public HdfsOperation() {
	}

	/**
	 * Creates an instance backed by the given file system.
	 *
	 * @param fs a connected {@link FileSystem}, e.g. from {@link #getFileSystem()}
	 */
	public HdfsOperation(FileSystem fs) {
		this.hdfs = fs;
	}

	/**
	 * Sets the file system handle used by the instance methods.
	 *
	 * @param fs a connected {@link FileSystem}
	 */
	public void setFileSystem(FileSystem fs) {
		this.hdfs = fs;
	}

	/**
	 * Connects to the default HDFS cluster as the default user.
	 *
	 * @return a connected {@link FileSystem}
	 * @throws URISyntaxException if the cluster URI is malformed
	 * @throws IOException if the connection fails
	 * @throws InterruptedException if the connection attempt is interrupted
	 */
	public static FileSystem getFileSystem() throws URISyntaxException, IOException, InterruptedException {
		// Delegates to the parameterized overload; these defaults preserve the
		// original hard-coded cluster address and user.
		return getFileSystem("hdfs://192.168.10.242:9000", "root");
	}

	/**
	 * Connects to an HDFS cluster. There are several ways to obtain a
	 * {@link FileSystem}; this uses the (uri, conf, user) form.
	 *
	 * @param hdfsUri cluster URI, e.g. {@code hdfs://host:9000}
	 * @param user    user to log in as
	 * @return a connected {@link FileSystem}
	 * @throws URISyntaxException if {@code hdfsUri} is malformed
	 * @throws IOException if the connection fails
	 * @throws InterruptedException if the connection attempt is interrupted
	 */
	public static FileSystem getFileSystem(String hdfsUri, String user)
			throws URISyntaxException, IOException, InterruptedException {
		Configuration config = new Configuration();
		return FileSystem.get(new URI(hdfsUri), config, user);
	}

	/**
	 * Checks whether a file or directory exists.
	 *
	 * @param filename path to check
	 * @return {@code true} if the path exists; {@code false} if it does not or
	 *         the check fails (the exception is logged, not rethrown)
	 */
	public boolean checkFileExist(String filename) {
		try {
			return hdfs.exists(new Path(filename));
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * Creates a directory (and any missing parents). A pre-existing directory
	 * counts as success.
	 *
	 * @param dirName directory path to create
	 * @return {@code true} on success or if it already exists
	 */
	public boolean mkdir(String dirName) {
		if (checkFileExist(dirName)) {
			return true;
		}
		try {
			Path f = new Path(dirName);
			System.out.println("Create and Write :" + f.getName() + " to hdfs");
			return hdfs.mkdirs(f);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * Creates an empty file, overwriting any existing file at the path.
	 *
	 * @param filePath full path of the file to create
	 * @return {@code true} on success
	 */
	public boolean mkfile(String filePath) {
		// try-with-resources guarantees the stream is closed even if create()
		// partially succeeds then close() fails.
		try (FSDataOutputStream os = hdfs.create(new Path(filePath), true)) {
			return true;
		} catch (IllegalArgumentException | IOException e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * Copies a file to a destination path (no delete of the source).
	 *
	 * @param srcfile source path
	 * @param desfile destination path
	 * @return {@code true} on success
	 */
	public boolean hdfsCopyUtils(String srcfile, String desfile) {
		Configuration conf = new Configuration();
		Path src = new Path(srcfile);
		Path dst = new Path(desfile);
		try {
			// false = do not delete the source after copying.
			FileUtil.copy(src.getFileSystem(conf), src, dst.getFileSystem(conf), dst, false, conf);
		} catch (IOException e) {
			// Original swallowed this silently; log it so failures are diagnosable.
			e.printStackTrace();
			return false;
		}
		return true;
	}

	/**
	 * Moves (renames) a file or directory.
	 *
	 * @param src source path
	 * @param dst destination path
	 * @throws Exception if the rename fails — including when {@code rename}
	 *         returns {@code false}, which the original version ignored
	 */
	public void movefile(String src, String dst) throws Exception {
		if (!hdfs.rename(new Path(src), new Path(dst))) {
			throw new IOException("rename failed: " + src + " -> " + dst);
		}
	}

	/**
	 * Deletes a file or directory (directories are removed recursively).
	 * A nonexistent path is a silent no-op.
	 *
	 * @param src path to delete
	 * @throws Exception if the delete fails
	 */
	public void delete(String src) throws Exception {
		Path p = new Path(src);
		if (hdfs.isDirectory(p)) {
			hdfs.delete(p, true); // recursive for directories
			System.out.println("刪除資料夾成功: " + src);
		} else if (hdfs.isFile(p)) {
			hdfs.delete(p, false);
			System.out.println("刪除檔案成功: " + src);
		}
	}

	/**
	 * Uploads a local file into an HDFS directory, replacing any existing file
	 * of the same name. The caller should ensure the file content is UTF-8 if
	 * it will later be read as UTF-8 text.
	 *
	 * @param localFilename local file to upload
	 * @param hdfsPath      target HDFS directory (created if missing)
	 * @return {@code true} on success
	 */
	public boolean copyLocalFileToHDFS(String localFilename, String hdfsPath) {
		try {
			// Create the target directory if it does not exist.
			mkdir(hdfsPath);

			File file = new File(localFilename);
			String target = hdfsPath + "/" + file.getName();

			// Remove any pre-existing file of the same name first.
			if (checkFileExist(target)) {
				delete(target);
			}

			Path f = new Path(target);
			// try-with-resources closes both streams on every path; the
			// original leaked the FileInputStream on exception.
			try (FileInputStream is = new FileInputStream(file);
					FSDataOutputStream os = hdfs.create(f, true)) {
				byte[] buffer = new byte[10240000];
				int blockCount = 0;
				int bytesRead;
				while ((bytesRead = is.read(buffer)) > 0) {
					os.write(buffer, 0, bytesRead);
					blockCount++;
					if (blockCount % 100 == 0) {
						System.out.println(timestamp() + ": Have move " + blockCount + " blocks");
					}
				}
			}

			System.out.println(timestamp() + ": Write content of file " + file.getName()
					+ " to hdfs file " + f.getName() + " success");
			return true;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * Recursively uploads a local directory tree into HDFS, mirroring the
	 * subdirectory structure. (Name kept as-is for caller compatibility,
	 * although it violates lowerCamelCase.)
	 *
	 * @param localPath local directory to upload
	 * @param hdfsPath  target HDFS directory
	 * @return {@code true} on success
	 */
	public boolean CopyLocalDirTohdfs(String localPath, String hdfsPath) {
		try {
			File root = new File(localPath);
			File[] files = root.listFiles();
			if (files == null) {
				// listFiles() returns null for a nonexistent or non-directory
				// path; the original NPE'd here.
				return false;
			}
			for (File file : files) {
				if (file.isFile()) {
					copyLocalFileToHDFS(file.getPath(), hdfsPath);
				} else if (file.isDirectory()) {
					CopyLocalDirTohdfs(localPath + "/" + file.getName(), hdfsPath + "/" + file.getName());
				}
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * Downloads an HDFS file into a local directory, keeping its name.
	 *
	 * @param hdfsFilename HDFS file to download
	 * @param localPath    local directory to write into
	 * @return {@code true} on success
	 */
	public boolean downloadFileFromHdfs(String hdfsFilename, String localPath) {
		Path f = new Path(hdfsFilename);
		File file = new File(localPath, f.getName());
		try (FSDataInputStream dis = hdfs.open(f);
				FileOutputStream os = new FileOutputStream(file)) {
			byte[] buffer = new byte[1024000];
			int length;
			while ((length = dis.read(buffer)) > 0) {
				os.write(buffer, 0, length);
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * HDFS-to-HDFS merge via {@link FileUtil#copyMerge}. The {@code false}
	 * argument means the source directory is NOT deleted after merging
	 * (pass {@code true} there to delete it).
	 *
	 * @param folder directory whose files are merged
	 * @param file   full path of the merged output file
	 */
	public void copyMerge(String folder, String file) {
		Configuration conf = new Configuration();
		Path src = new Path(folder);
		Path dst = new Path(file);
		try {
			FileUtil.copyMerge(src.getFileSystem(conf), src, dst.getFileSystem(conf), dst, false, conf, null);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Prints the host names of all DataNodes in the cluster, followed by the
	 * file system URI. Requires {@link #hdfs} to be a
	 * {@link DistributedFileSystem}.
	 */
	public void listDataNodeInfo() {
		try {
			DistributedFileSystem fs = (DistributedFileSystem) hdfs;
			DatanodeInfo[] dataNodeStats = fs.getDataNodeStats();
			System.out.println("List of all the datanode in the HDFS cluster:");
			for (DatanodeInfo node : dataNodeStats) {
				System.out.println(node.getHostName());
			}
			System.out.println(hdfs.getUri().toString());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Probes whether the NameNode we are talking to is a standby node, by
	 * issuing an exists() call and inspecting any RemoteException wrapping a
	 * StandbyException.
	 *
	 * @param filename any path to probe with
	 * @return {@code true} if the remote side reported StandbyException
	 */
	public boolean checkStandbyException(String filename) {
		try {
			hdfs.exists(new Path(filename));
		} catch (org.apache.hadoop.ipc.RemoteException e) {
			if (e.getClassName().equals("org.apache.hadoop.ipc.StandbyException")) {
				return true;
			}
		} catch (Exception ignored) {
			// Any other failure means we cannot conclude "standby".
		}
		return false;
	}

	/**
	 * Concatenates a list of HDFS files into one target file, inserting
	 * {@code rowTerminateFlag} (e.g. {@code "\n"}) between consecutive files.
	 * The target file itself is skipped if it appears in the list. On failure
	 * the partial target file is removed (best effort).
	 *
	 * @param fileList         files to merge, in order
	 * @param tarPath          full path of the merged output file
	 * @param rowTerminateFlag separator written between files
	 * @return {@code true} on success
	 */
	public boolean mergeDirFiles(List<FileStatus> fileList, String tarPath, String rowTerminateFlag) {
		Path tarFile = new Path(tarPath);
		// Encode once. The original wrote rowTerminateFlag.length() bytes of
		// getBytes(), which truncates multi-byte (non-ASCII) separators; using
		// the byte array's own length is always correct.
		byte[] separator = rowTerminateFlag.getBytes(StandardCharsets.UTF_8);

		try (FSDataOutputStream out = hdfs.create(tarFile, true)) {
			byte[] buffer = new byte[1024000];
			long totalBytes = 0;
			int blockCount = 0;
			boolean first = true;

			for (FileStatus file : fileList) {
				if (file.getPath().equals(tarFile)) {
					continue; // never merge the target into itself
				}
				System.out.println(" merging file from  " + file.getPath() + " to " + tarPath);

				if (!first) {
					out.write(separator, 0, separator.length);
				}

				// Per-file try-with-resources closes each input even on error;
				// the original's finally block could NPE on a null stream.
				try (FSDataInputStream in = hdfs.open(file.getPath(), buffer.length)) {
					int length;
					while ((length = in.read(buffer)) > 0) {
						blockCount++;
						out.write(buffer, 0, length);
						totalBytes += length;
						if (blockCount % 1000 == 0) {
							out.flush();
							System.out.println(timestamp() + ": Have move " + (totalBytes / 1024000) + " MB");
						}
					}
				}
				first = false;
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			try {
				delete(tarPath); // best-effort removal of the partial output
			} catch (Exception ignored) {
				// Cleanup failure is non-fatal; the primary error was reported.
			}
			return false;
		}
	}

	/**
	 * Writes a string to a path as UTF-8 (overwriting any existing file), so
	 * non-ASCII text is not garbled.
	 *
	 * @param text string to store
	 * @param path destination path
	 */
	public void writerString(String text, String path) {
		try (FSDataOutputStream os = hdfs.create(new Path(path), true);
				BufferedWriter writer = new BufferedWriter(
						new OutputStreamWriter(os, StandardCharsets.UTF_8))) {
			writer.write(text);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Reads a file line by line as UTF-8 and prints each line, avoiding
	 * garbled non-ASCII text. The original decoded with the platform-default
	 * charset and then did a meaningless bytes→String round-trip; decoding as
	 * UTF-8 at the reader is the actual fix.
	 *
	 * @param hdfsFilename file to read
	 * @return {@code true} on success
	 */
	public boolean readByLine(String hdfsFilename) {
		Path f = new Path(hdfsFilename);
		try (FSDataInputStream dis = hdfs.open(f);
				BufferedReader reader = new BufferedReader(
						new InputStreamReader(dis, StandardCharsets.UTF_8))) {
			String line;
			while ((line = reader.readLine()) != null) {
				System.out.println(line);
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * If {@code srcPath} exists, renames it to {@code srcPath} (trailing
	 * slashes stripped) plus a {@code _yyMMddHHmmss} timestamp suffix, so the
	 * path can be reused as a fresh output directory.
	 *
	 * <p>NOTE(review): the {@code tarPath} argument is overwritten internally
	 * and, since Java is pass-by-value, the caller never observes it — it is
	 * effectively ignored. Kept in the signature for compatibility.
	 *
	 * @param srcPath path to check and rename
	 * @param tarPath ignored (see note)
	 * @throws Exception if the rename fails
	 */
	public void reNameExistsPath(String srcPath, String tarPath) throws Exception {
		if (!checkFileExist(srcPath)) {
			return;
		}
		String renamed = srcPath.trim();
		// Strip trailing slashes so the suffix attaches to the path name itself.
		while (renamed.charAt(renamed.length() - 1) == '/') {
			renamed = renamed.substring(0, renamed.length() - 1);
		}
		String nowStr = new SimpleDateFormat("yyMMddHHmmss").format(new Date());
		movefile(srcPath, renamed + "_" + nowStr);
	}

	/**
	 * Formats the current time for progress log lines. Replaces the
	 * deprecated {@link Date#toLocaleString()} used by the original.
	 *
	 * @return current timestamp as {@code yyyy-MM-dd HH:mm:ss}
	 */
	private static String timestamp() {
		return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
	}
}