1. 程式人生 > >Windows系統上 HDFS java API的使用

Windows系統上 HDFS java API的使用

1.建立資料夾

package hdfsApi1;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
//建立資料夾
public class Api1 {
	public static void main(String[] args) {
		try {
//			讀取配置檔案
			Configuration conf  = new Configuration();
//			遠端連線
			URI uri = new URI("hdfs://192.168.200.133:9000");
//			得到檔案系統
			FileSystem fs = FileSystem.get(uri, conf, "wsm");
//			要建立的檔案路徑 包裝為Path
			Path path = new Path("/spark/file");
			if (!fs.exists(path)) {
				if (fs.mkdirs(path)) {
					System.out.println("檔案建立成功");
				}else {
					System.out.println("檔案建立失敗");
				}
			}else {
				System.out.println("檔案已經存在");
			}
		} catch (URISyntaxException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		
		
		
	}
}

2.列出檔案狀態

package hdfsApi1;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
//列出檔案狀態
public class API2 {
	public static void main(String[] args) {
		try {
//			讀取配置檔案
			Configuration conf = new Configuration();
//			遠端URI
			URI uri = new URI("hdfs://192.168.200.132:9000");
//			獲取檔案系統
			FileSystem fs = FileSystem.get(uri,conf,"python");
//			需要列出哪個路徑下的檔案
			Path path = new Path("/wsm");
			FileStatus[] fss = fs.listStatus(path);
			for(FileStatus f1 : fss) {
				System.out.println(f1.getPath().getName());
			}
			
		} catch (URISyntaxException | IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
}

3.建立檔案,並寫入資料

package hdfsApi1;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

//建立檔案 並且寫入資料
public class API3 {
	public static void main(String[] args) {
		try {
			Configuration conf = new Configuration();
//			設定檔案的副本數 因為搭建的是偽分散式 所以副本數為 1
			conf.set("dfs.replication","1");
			URI uri = new URI("hdfs://192.168.200.132:9000");
			FileSystem fs = FileSystem.get(uri, conf,"python");
			String fileName = "/wsm/1.txt";
			Path path = new Path(fileName);
			if (!fs.exists(path)) {
//				通過 FSDataOutputStream 向檔案寫入資料 append()是追加
				FSDataOutputStream fos = fs.create(path);
				fos.writeBytes("this is 1.txt");
				fos.flush();
				fos.close();
			}else {
				System.out.println("檔案已經存在");
			}
		} catch (URISyntaxException | IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
}

4.讀取檔案

public class API4 {
	public static void main(String[] args) {
		try {
			Configuration conf = new Configuration();
			URI uri = new URI("hdfs://192.168.200.132:9000");
			FileSystem fs = FileSystem.get(uri, conf, "python");
			Path f = new Path("/wsm/1.txt");
//			開啟檔案
			FSDataInputStream fis = fs.open(f);
			
			int len = -1;
			byte[] bt = new byte[1024];
			while ( (len=fis.read(bt)) >0 ) {
				System.out.println(new String(bt,0,len));
			}
		} catch (URISyntaxException | IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
}

5.刪除檔案

public class API5 {
	public static void main(String[] args) {
		try {
			
			Configuration conf  = new Configuration();
			URI uri = new URI("hdfs://192.168.200.132:9000");
			FileSystem fs = FileSystem.get(uri, conf, "python");
			
			Path path = new Path("/wsm/1.txt");
			if (fs.exists(path)) {
				//false 不遞迴刪除
				if (fs.delete(path,false)) {
					System.out.println("檔案刪除成功");
				}else {
					System.out.println("檔案刪除失敗");
				}
			}else {
				System.out.println("檔案不存在");
			}
		} catch (URISyntaxException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

6.使用copyFromLocalFile上傳檔案

//檔案上傳
public class API7 {
	public static void main(String[] args) {
		try {
			Configuration conf = new Configuration();
			conf.set("dfs.replication","1");
			URI uri = new URI("hdfs://192.168.200.132:9000");
			FileSystem fs = FileSystem.get(uri, conf,"python");
			Path dst = new Path("/wsm/2_upload.txt");
			Path src = new Path("upload.txt");
			fs.copyFromLocalFile(src, dst);
			System.out.println("上傳成功");
		} catch (URISyntaxException | IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
}

7.使用copyToLocalFile實現檔案下載

//檔案下載
public class API8 {
	public static void main(String[] args) {
		try {
			Configuration conf = new Configuration();
			conf.set("dfs.replication","1");
			URI uri = new URI("hdfs://192.168.200.132:9000");
			FileSystem fs = FileSystem.get(uri, conf,"python");
			Path src = new Path("/wsm/2_upload.txt");
			Path dst = new Path("download2.txt");
			fs.copyToLocalFile(src, dst);
			System.out.println("下載成功");
		} catch (URISyntaxException | IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
}

8.使用IOUtils讀取檔案

public class API9 {
	public static void main(String[] args) {
		try {
			Configuration conf = new Configuration();
			conf.set("dfs.replication","1");
			URI uri = new URI("hdfs://192.168.200.132:9000");
			FileSystem fs = FileSystem.get(uri, conf,"python");
			
			Path path = new Path("/wsm/2_upload.txt");
			FSDataInputStream fis = fs.open(path);
			
			IOUtils.copy(fis, System.out);
		} catch (URISyntaxException | IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
}

9.用IOUtils寫入檔案 然後用FileInputStream讀取

public class API_B10 {
	public static void main(String[] args) {
		try {
			Configuration conf = new Configuration();
			conf.set("dfs.replication","1");
			URI uri = new URI("hdfs://192.168.200.132:9000");
			FileSystem fs = FileSystem.get(uri, conf,"python");
			Path path = new Path("/wsm/1.txt");
			FSDataOutputStream fos = fs.append(path);//檔案系統追加其他檔案的內容
			FileInputStream fis = new FileInputStream(new File("upload.txt"));
			IOUtils.copy(fis, fos);
			System.out.println("成功");
		} catch (URISyntaxException | IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
}