1. 程式人生 > java實現HDFS增刪改查

java實現HDFS增刪改查

環境:Hadoop 2.7.3

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.zookeeper.common.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import
java.net.URI; import java.util.ArrayList; import java.util.List; /** * Created by LCY on 4/2/2018. * 對HDFS執行上傳檔案、上傳目錄、檢視檔案、檢視目錄、刪除檔案等操作 */ public class HDFSUtils { /** * 在HDFS建立新的目錄 * * @param uri HDFS地址,比如: 'hdfs://192.168.12.12' * @param dir 路徑,比如: '/tmp/testdir' * @return boolean true-success, false-failed * @exception
IOException something wrong happends when operating files */
public boolean mkdir(String uri,String dir) { try { if (StringUtils.isBlank(dir)) { return false; } dir = uri + dir; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dir), conf); if
(!fs.exists(new Path(dir))) { fs.mkdirs(new Path(dir)); } fs.close(); return true; } catch (IOException e) { System.err.println("ERROR:IO"); return false; } } /** * 在HDFS刪除目錄 * * @param uri HDFS地址,比如: 'hdfs://192.168.12.12' * @param dir 檔案路徑 * @return boolean true-success, false-failed * @exception IOException 如果檔案已經開啟會跑出異常 FileNotFoundException * */ public boolean deleteDir(String uri,String dir) throws IOException { if (StringUtils.isBlank(dir)) { return false; } dir = uri + dir; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dir), conf); fs.delete(new Path(dir), true); fs.close(); return true; } /** * 獲取檔案列表 * * @param uri HDFS地址,比如: 'hdfs://192.168.12.12' * @param dir 目錄路徑 * @return List<String> list of file names * @throws IOException file io exception */ public List<String> listAll(String uri,String dir) throws IOException { if (StringUtils.isBlank(dir)) { return new ArrayList<String>(); } dir = uri + dir; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dir), conf); FileStatus[] stats = fs.listStatus(new Path(dir)); List<String> names = new ArrayList<String>(); for (int i = 0; i < stats.length; ++i) { if (stats[i].isFile()) { // regular file names.add(stats[i].getPath().toString()); } else if (stats[i].isDirectory()) { // dir names.add(stats[i].getPath().toString()); } else if (stats[i].isSymlink()) { // is s symlink in linux names.add(stats[i].getPath().toString()); } } fs.close(); return names; } /* * 上傳檔案到HDFS * 注意:路徑為完整路徑 * 如果本地檔案不存在有異常 FileNotFoundException * * @param uri HDFS地址,比如: 'hdfs://192.168.12.12' * @param localFile 本地檔案路徑例如F:/test.txt or /usr/local/test.txt * * @param hdfsFile hdfs路徑例如: /tmp/dir * @return boolean true-success, false-failed * * @throws IOException file io exception */ public boolean uploadLocalFile2HDFS(String uri,String localFile, String hdfsFile) { if (StringUtils.isBlank(localFile) || StringUtils.isBlank(hdfsFile)) { return false; } try { hdfsFile = uri + 
hdfsFile; Configuration config = new Configuration(); FileSystem hdfs = FileSystem.get(URI.create(uri), config); Path src = new Path(localFile); Path dst = new Path(hdfsFile); hdfs.copyFromLocalFile(src, dst); hdfs.close(); return true; } catch (IOException e) { e.printStackTrace(); } catch (Exception e){ e.printStackTrace(); } return false; } /* * 在HDFS建立一個新的檔案,並寫入content資料 * * @param uri HDFS地址,比如: 'hdfs://192.168.12.12' * @param newFile 檔案路徑: '/tmp/test.txt' * @param content 寫入的內容 * @return boolean true-success, false-failed * @throws IOException file io exception */ public boolean createNewHDFSFile(String uri,String newFile, String content) throws IOException { if (StringUtils.isBlank(newFile) || null == content) { return false; } newFile = uri + newFile; Configuration config = new Configuration(); FileSystem hdfs = FileSystem.get(URI.create(newFile), config); FSDataOutputStream fsdos = hdfs.create(new Path(newFile)); fsdos.write(content.getBytes("UTF-8")); fsdos.close(); hdfs.close(); return true; } /** * 在HDFS刪除檔案 * * @param uri HDFS地址,比如: 'hdfs://192.168.12.12' * @param hdfsFile 完整的路徑 '/tmp/test.txt' * @return boolean true-success, false-failed * @throws IOException file io exception */ public boolean deleteHDFSFile(String uri,String hdfsFile) throws IOException { if (StringUtils.isBlank(hdfsFile)) { return false; } hdfsFile = uri + hdfsFile; Configuration config = new Configuration(); FileSystem hdfs = FileSystem.get(URI.create(hdfsFile), config); Path path = new Path(hdfsFile); boolean isDeleted = hdfs.delete(path, true); hdfs.close(); return isDeleted; } /** * 讀取檔案內容 * * @param uri HDFS地址,比如: 'hdfs://192.168.12.12' * @param hdfsFile 完整的檔案路徑 '/tmp/test.txt' * @return 檔案的byte[] * @throws IOException file io exception */ public byte[] readHDFSFile(String uri,String hdfsFile) throws Exception { if (StringUtils.isBlank(hdfsFile)) { return null; } hdfsFile = uri + hdfsFile; Configuration conf = new Configuration(); FileSystem fs = 
FileSystem.get(URI.create(hdfsFile), conf); // check if the file exists Path path = new Path(hdfsFile); if (fs.exists(path)) { FSDataInputStream is = fs.open(path); // get the file info to create the buffer FileStatus stat = fs.getFileStatus(path); // create the buffer byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))]; is.readFully(0, buffer); is.close(); fs.close(); return buffer; } else { throw new Exception("the file is not found ."); } } /** * 在檔案中寫入內容 * * @param uri HDFS地址,比如: 'hdfs://192.168.12.12' * @param hdfsFile 檔案路徑'/tmp/test.txt' * @param content 寫入的內容 * @return boolean true-success, false-failed * @throws Exception something wrong */ public boolean append(String uri,String hdfsFile, String content) throws Exception { if (StringUtils.isBlank(hdfsFile)) { return false; } if(StringUtils.isEmpty(content)){ return true; } hdfsFile = uri + hdfsFile; Configuration conf = new Configuration(); // solve the problem when appending at single datanode hadoop env conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true"); FileSystem fs = FileSystem.get(URI.create(hdfsFile), conf); // check if the file exists Path path = new Path(hdfsFile); if (fs.exists(path)) { try { InputStream in = new ByteArrayInputStream(content.getBytes()); OutputStream out = fs.append(new Path(hdfsFile)); IOUtils.copyBytes(in, out, 4096, true); out.close(); in.close(); fs.close(); } catch (Exception ex) { fs.close(); throw ex; } } else { createNewHDFSFile(uri,hdfsFile, content); } return true; } }

呼叫:

/**
 * Usage example for {@code HDFSUtils}: uploads a local file to HDFS.
 */
public class test {

    /**
     * Uploads {@code data/ITEM.txt} to {@code /tmp/test} on the HDFS instance at
     * {@code hdfs://172.16.0.108} and fails if the upload is reported as unsuccessful.
     */
    @Test
    public void update() {
        String uri = "hdfs://172.16.0.108";
        HDFSUtils hdfsUtils = new HDFSUtils();
        boolean uploaded = hdfsUtils.uploadLocalFile2HDFS(uri, "data/ITEM.txt", "/tmp/test");
        // A failed upload is only visible through the return value, so check it explicitly;
        // otherwise this test would pass no matter what happened.
        if (!uploaded) {
            throw new AssertionError("upload of data/ITEM.txt to " + uri + "/tmp/test failed");
        }
    }

}