1. 程式人生 > >HDFS Java API操作

HDFS Java API操作

1.

2.

以下是處於完全分散式
在這裡插入圖片描述

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.
apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.
hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Progressable; /** * Description: HDFSAPI<br/> * Copyright (c) , 2018, xlj <br/> * This program is protected by copyright laws. <br/> * Program Name:HDFSAPI.java <br/> * @version : 1.0 */
public class HDFSAPI { public static void main(String[] args) throws IOException { //讀取HDFS的檔案到控制檯,引數是你上傳到HDFS的路徑 catFileToConsole("/data.txt"); //讀取本地檔案中的內容儲存到本地 catFileToLocal("/data.txt"); } /** * @param filePath * @throws IOException */ public static void catFileToConsole(String filePath) throws IOException { /* * 任何檔案系統都是與當前的環境變數緊密聯絡,對於當前的HDFS來說 * 我們需要在建立當前檔案系統的例項之前,必須獲得當前環境變數 * Configuration * 為使用者提供當前環境變數的一個例項。其中封裝了當前搭建環境的配置 * 有了這個配置例項,才能繼續呼叫FileSystem類 */ //1.獲取Configuration物件 Configuration conf = new Configuration(); /*2.需要設定當前相關屬性---->設定core-site.xml對應相關屬性 * 因為現在是要連線HDFS分散式檔案系統,所以要配置一個連線屬性 * 9000--->是一個埠,這是一個內部通訊埠號 * 50070-->HDFS WebUI介面的埠號 * 50090-->SecondaryNamenode埠號 * */ conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); //需要都檔案系統進行連線訪問,FileSystem這是一個抽象類 //提供N個get方法來獲取當前的連線 //當前會丟擲一個IOException的異常,就是一個流操作 FileSystem fs = FileSystem.get(conf); //open方法連線HDFS 引數:一個path--》HDFS分散式檔案系統要訪問的具體的地址 FSDataInputStream fis = fs.open(new Path(filePath)); //推薦一個工具類IOUtils IOUtils.copyBytes(fis, System.out, 4096, true);; } /** * 讀取檔案中的內容儲存到本地 * @param filePath * @throws IOException */ public static void catFileToLocal(String filePath) throws IOException { //1.建立連線配置 Configuration conf = new Configuration(); //2.設計相關屬性 conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); //3.獲取FileSystem物件 FileSystem fs = FileSystem.get(conf); //4.獲取一個輸入流物件,讀取當前HDFS分散式檔案系統中資料夾的內容 FSDataInputStream fis = fs.open(new Path(filePath)); //5.建立一個輸出流將內容寫到本地檔案 OutputStream os = new FileOutputStream(new File("dir/file.txt")); IOUtils.copyBytes(fis, os, 4096, true); } /** * 在檔案系統中建立資料夾 * @param dirPath * @throws URISyntaxException * @throws InterruptedException * @throws IOException */ public static void mkdir(String dirPath) throws IOException, InterruptedException, URISyntaxException { //1.獲取configuration物件 Configuration conf = new Configuration(); //2.直接建立FileSystem //三個引數 : /* * 第一個引數:URI --hdfs的內部通訊網站 URI ---net包 * 第二個引數:Configuration物件 * 第三個引數:使用者---root */ FileSystem fs = FileSystem.get(new URI("hdfs://hadoo1:9000"), conf,"root"); //建立當前路徑下的資料夾,這裡需要注意的是方法名多一個s //就意味著即可以建立一個資料夾,也可以建立多資料夾 boolean result = fs.mkdirs(new Path(dirPath)); if(result) { System.out.println("資料夾建立成功"); }else { System.out.println("資料夾建立失敗"); } } /** * 建立空檔案 * @param filePath * @throws URISyntaxException * @throws InterruptedException * @throws IOException */ public static void touchFile(String filePath) throws IOException, InterruptedException, URISyntaxException { //1.建立Configuration物件 Configuration conf = new Configuration(); //設定屬性 //conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); //有設定過副本數量 conf.set設定副本數量--在不設定副本數量的前提下預設是3 //所以這裡創建出來的檔案使用的是預設值 //若需要自己的副本數量 conf.set需要設定副本屬性 // <name>dfs.replication</name> <value>2</value> // 3.獲取FileSystem物件 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),conf,"root"); //4.建立空檔案 fs.create(new Path(filePath)); System.out.println("建立成功"); } /** * 顯示檔案資訊 * @param dirPath * @throws URISyntaxException * @throws InterruptedException * @throws IOException */ public static void list(String dirPath) throws IOException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), conf,"root"); /* * FileSystem 類似於Java中的File * 所有的FileSystem 中提供了一個遍歷資料夾的方式 listStatus 返回值是一個數組,陣列元素的資料型別是FileStatus */ FileStatus [] fss = fs.listStatus(new Path(dirPath)); for (FileStatus f : fss) { System.out.println("檔名字:"+f.getPath().getName()); System.out.println("檔案的所屬者:"+f.getOwner()); System.out.println("檔案的所屬組: "+f.getGroup()); System.out.println("檔案的大小: "+f.getLen()); System.out.println("檔案的副本數:"+f.getReplication()); System.out.println("是否是目錄:"+f.isDir()); } } /** * 獲取HDFS中資源情況 * @throws URISyntaxException * @throws InterruptedException * @throws IOException */ public static void getSource() throws IOException, InterruptedException, URISyntaxException { //通過FileSystem獲取連線 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root"); //通過其子類轉換成當前子類物件並代用其獲取狀態的方法getStatus DistributedFileSystem dfs = (DistributedFileSystem)fs; FsStatus fss = dfs.getStatus(); System.out.println("總量:"+(fss.getCapacity() / 1024 / 1024)+"GB"); System.out.println("使用的量:"+(fss.getUsed() / 1024 / 1024)+"MB"); System.out.println("w維持總量:"+(fss.getRemaining() / 1024 / 1024)+"GB"); } /** * 獲取單臺節點的資訊 * @throws URISyntaxException * @throws InterruptedException * @throws IOException */ public static void getNodeInfos() throws IOException, InterruptedException, URISyntaxException { FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root"); DistributedFileSystem dfs = (DistributedFileSystem)fs; DatanodeInfo[] dis = dfs.getDataNodeStats(); for (DatanodeInfo datanodeInfo : dis) { System.out.println("當前節點的總容量:"+(datanodeInfo.getCapacity())); System.out.println("HostName:"+(datanodeInfo.getHostName())); System.out.println("IP地址:"+(datanodeInfo.getIpAddr())); System.out.println(datanodeInfo.getName()); } } /** * 塊資訊 * @param filePath * @throws IOException * @throws URISyntaxException * @throws InterruptedException */ public static void getBlockInfos(String filePath) throws IOException, InterruptedException, URISyntaxException { //通過FileSystem獲取連線 FileSystem fs = FileSystem.get(new URI("hdfs://hadoo1:9000"), new Configuration(), "root"); //拿到當前檔案的描述資訊 FileStatus fss = fs.getFileStatus(new Path(filePath)); //獲取檔案中塊的資訊-->返回值是一個數組,為什麼 //因為只要檔案大於128M就會被切分為其他塊 BlockLocation[] bls = fs.getFileBlockLocations(fss, 0, fss.getLen()); for (BlockLocation bl : bls) { //BlockLocation方法:返回值是字串陣列,將bl物件中的屬性值存在陣列中 for (int i = 0; i < bl.getHosts().length; i++) { System.out.println(bl.getHosts()[i]); } } } /** * 帶進度的檔案上傳 * @param filePath * @param filepath2 * @throws URISyntaxException * @throws InterruptedException * @throws IOException */ public static void upLoadProcess(String filePath,String filepath2) throws IOException, InterruptedException, URISyntaxException { FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root"); FSDataOutputStream fos = fs.create(new Path(filePath),new Progressable() { //Progressable介面中提供了一個progress這個方法,每次在寫檔案的時候寫64K @Override public void progress() { //迴圈 獲取檔案大小等等 System.out.println("*"); } }); //獲取一個檔案 InputStream fis = new FileInputStream(new File(filepath2)); IOUtils.copyBytes(fis, fos, 1024,true); } /** * 檔案的上傳 相當於 -put * @param paths * @param HDFSfilePath * @throws URISyntaxException * @throws InterruptedException * @throws IOException */ public static void upload(String localFilePath,String HDFSfilePath) throws IOException, InterruptedException, URISyntaxException { FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(), "root"); /* * 第一個 引數:本地檔案的位置 * 第二個引數:HDFS檔案系統中儲存檔案的位置 */ fs.copyFromLocalFile(new Path(localFilePath), new Path(HDFSfilePath)); } public static void uploads(Path []paths ,String HDFSfilePath) throws IOException, InterruptedException, URISyntaxException { FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(), "root"); /*1.上傳成功後是否要刪除本地檔案 *2.上傳後若有相同檔案是否要覆蓋 *這兩個引數都是boolean型別 *true---上傳成功後要刪除和覆蓋 *false--不刪除也不覆蓋 *3.刪除是多個檔案路徑 *4.HDFS檔案系統中要儲存的路徑 */ fs.copyFromLocalFile(false, true,paths,new Path(HDFSfilePath)); } public static void download(String HDFSfilePath,String localFilePath) throws IOException, InterruptedException, URISyntaxException { FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root"); /* * 1.下載完後是否要刪除HDFS檔案 boolean * 2.HDFS檔案系統的路徑 * 3.當前下載到本地的路徑 * 4.是否要使用本地的檔案系統,改用為java的IO流 */ fs.copyToLocalFile(false, new Path(HDFSfilePath),new Path(localFilePath),true); } /** * 刪除HDFS檔案系統中的檔案 * @param filePath * @throws URISyntaxException * @throws InterruptedException * @throws IOException */ public static void deleteFile(String filePath) throws IOException, InterruptedException, URISyntaxException { FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(),"root"); Path path = new Path(filePath); if(fs.exists(path)) { //檢查是不是一個目錄,是就遞迴刪除,不是就直接刪除 if(fs.isDirectory(path)) { fs.delete(path,true); }else { fs.delete(path,false); } }else { System.out.println("刪除的檔案不存在"); } } /** * 移動HDFS系統中的檔案,集移動剪下於一身 * @param src * @param dsc * @throws IOException * @throws InterruptedException * @throws URISyntaxException */ public static void rename(String src,String dsc) throws IOException, InterruptedException, URISyntaxException { FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(),"root"); fs.rename(new Path(src), new Path(dsc)); } }