HADOOP Java API 之 HDFS2.x操作

HADOOP Java API 之 HDFS2.x操作

Java api操作hdfs2.x, 主要包括以下幾個方法:

  • 1. create dir
  • 2.1 create file(don’t use IOUtils)
  • 2.2 create file(use IOUtils)
  • 3. upload local file(s)
  • 4. rename file(s)
  • 5. delete file(s)
  • 6. scan dirs and file information
  • 7. 查詢某個檔案在HDFS叢集的位置
  • 8. 獲取HDFS叢集上所有節點名稱資訊

程式碼demo如下

package com.david.bigdata.hadoop2x.hads_api;


import
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.io.IOUtils; import java.io.FileInputStream; import java.io.IOException; import java.net.URI; /** * Created by david on 16/11/13. */
public class HdfsCURDTest { public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://localhost:9000"); conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); FileSystem hdfs = null; FileStatus[] fstatus = null
; try { hdfs = FileSystem.get(conf); System.out.println("connect HDFS: " + new URI("hdfs://localhost:9000")); } catch (Exception e) { System.err.println("Error on connect HDFS"); } System.out.println("\n-------------昏鴿線--------------\n"); System.out.println("\n-------------hdfs操作之: 1. create dir--------------\n"); //hdfs操作之: 1. create dir try { if (!hdfs.exists(new Path("/liuwei0376"))) { hdfs.mkdirs(new Path("/liuwei0376")); } } catch (IOException e) { e.printStackTrace(); } System.out.println("\n-------------昏鴿線--------------\n"); System.out.println("\n-------------hdfs操作之: 2.1 create file(don't use IOUtils)--------------\n"); //hdfs操作之: 2.1 create file(don't use IOUtils) String fsrc = "/Users/david/Downloads/accounts.json"; FileInputStream fis = null; FSDataOutputStream fsDataOutputStream = null; try { fis = new FileInputStream(fsrc); Path path = new Path("/liuwei0376/mr/accounts.json"); fsDataOutputStream = hdfs.create(path); byte[] buff = new byte[1024]; int readCount = 0; readCount = fis.read(buff); while (readCount != -1) { fsDataOutputStream.write(buff, 0, readCount); readCount = fis.read(buff);//read next patch data } System.out.println(path + " create is over"); } catch (IOException e) { e.printStackTrace(); } finally { if (fis != null) { try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } if (fsDataOutputStream != null) { try { fsDataOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } System.out.println("\n-------------昏鴿線--------------\n"); System.out.println("\n-------------hdfs操作之: 2.2 create file(use IOUtils)--------------\n"); //hdfs操作之: 2.2 create file(use IOUtils) FSDataOutputStream out2 = null; FileInputStream in2 = null; int buff2 = 1024; try { out2 = hdfs.create(new Path("/liuwei0376/mr/dependency.txt")); in2 = new FileInputStream("/Users/david/Downloads/dependency.txt"); /** * copyBytes method usage * * in: origin file path * out: hdfs dir * buff2: buffer size * close: whether close the stream. 
*/ IOUtils.copyBytes(in2, out2, buff2, true); } catch (Exception e) { e.printStackTrace(); } System.out.println("\n-------------昏鴿線--------------\n"); System.out.println("\n-------------hdfs操作之: 3. upload local file(s)--------------\n"); //hdfs操作之: 3. upload local file(s) /** * delSrc - whether to delete the src是否刪除原始檔 overwrite - whether to overwrite an existing file是否覆蓋已存在的檔案 srcs - array of paths which are source 可以上傳多個檔案陣列方式 dst – path 目標路徑 fileSystem.copyFromLocalFile(src, dst); fileSystem.copyFromLocalFile(delSrc, src, dst); fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst); fileSystem.copyFromLocalFile(delSrc, overwrite, srcs, dst); */ try { hdfs.copyFromLocalFile( true, true, new Path("/Users/david/Downloads/selenium-java-3.0.0-beta2"), new Path("/liuwei0376/mr/selenium-java-3.0.0-beta2") ); } catch (IOException e) { System.out.println("error in copyFromLocalFile"); } System.out.println("\n-------------昏鴿線--------------\n"); System.out.println("\n-------------hdfs操作之: 4. rename file(s)--------------\n"); //hdfs操作之: 4. rename file(s) /** * fileSystem.rename(src, dst); */ try { hdfs.rename( new Path("/liuwei0376/mr/selenium-java-3.0.0-beta2"), new Path("/liuwei0376/mr/selenium-java-3.0.0-beta3") ); } catch (IOException e) { System.err.println("hdfs.rename error"); } System.out.println("\n-------------昏鴿線--------------\n"); System.out.println("\n-------------hdfs操作之: 5. rename file(s)--------------\n"); //hdfs操作之: 5. rename file(s) /** * True 表示遞迴刪除 * fileSystem.delete(new Path("/d1"), true); */ try { hdfs.delete(new Path("/liuwei0376/mr/selenium-java-3.0.0-beta3"), true); } catch (IOException e) { System.err.println("hdfs.delete error"); } System.out.println("\n-------------昏鴿線--------------\n"); System.out.println("\n-------------hdfs操作之: 6. scan dirs and file information--------------\n"); //hdfs操作之: 6. 
scan dirs and file information int i = 0; try { fstatus = hdfs.listStatus(new Path("/user/david/hadoop_java_files")); System.out.println(fstatus.length); //列出檔案屬性 for (FileStatus fs : fstatus) { System.out.println("\n------- " + ++i + " -------"); System.out.println("fs.getAccessTime() = " + fs.getAccessTime()); System.out.println("fs.getGroup() = " + fs.getGroup()); System.out.println("fs.getOwner() = " + fs.getOwner()); System.out.println("fs.getBlockSize() = " + fs.getBlockSize()); System.out.println("fs.getLen() = " + fs.getLen()); System.out.println("fs.getModificationTime() = " + fs.getModificationTime()); System.out.println("fs.getPath() = " + fs.getPath()); System.out.println("fs.getPermission() = " + fs.getPermission()); System.out.println("fs.getReplication() = " + fs.getReplication()); } } catch (IOException e) { e.printStackTrace(); } System.out.println("\n-------------昏鴿線--------------\n"); System.out.println("\n-------------hdfs操作之: 7. 查詢某個檔案在HDFS叢集的位置--------------\n"); //hdfs操作之: 7. 查詢某個檔案在HDFS叢集的位置 /** * 注意: Path 必須是檔案, 不能為目錄 */ try { FileStatus fs7 = hdfs.getFileStatus(new Path("/liuwei0376/mr/accounts.json")); BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fs7, 0, fs7.getLen()); for (int j = 0, k = blockLocations.length; j < k; j++) { String[] hosts = blockLocations[j].getHosts(); System.out.println("block_" + j + "_location: " + hosts[0]); } } catch (IOException e) { e.printStackTrace(); } System.out.println("\n-------------昏鴿線--------------\n"); System.out.println("\n-------------hdfs操作之: 8. 獲取HDFS叢集上所有節點名稱資訊--------------\n"); //hdfs操作之: 8. 
獲取HDFS叢集上所有節點名稱資訊 DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs; try { DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats(); for (int n = 0, m = datanodeInfos.length; n < m; n++) { System.out.println("datanode_" + n + "_name: " + datanodeInfos[n].getHostName()); } } catch (IOException e) { e.printStackTrace(); } if (hdfs != null) { try { hdfs.close(); } catch (IOException e) { e.printStackTrace(); } } } }