Hadoop --- Getting Started: HDFS Java API Operations

JAR preparation:

Add the following jars from the share directory of hadoop-2.8.0 to the project:

  • hadoop-common-2.8.0.jar under common
  • all jars under common/lib
  • hadoop-hdfs-2.8.0.jar under hdfs
  • all jars under hdfs/lib

Example:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.net.URI;
import java.util.Iterator;
import java.util.Map.Entry;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Before;
import org.junit.Test;

// An HDFS client always operates with a user identity.
// By default, the HDFS client API takes that identity from a JVM parameter: -DHADOOP_USER_NAME=hadoop.
// Alternatively, the user can be passed explicitly to FileSystem.get(), as done in init() below.
public class HdfsClientDemo {

	private FileSystem fs;
	private Configuration conf;

	@Before
	public void init() throws Exception{
		
		conf = new Configuration();
		// Client-side setting: files written through this client are created with 2 replicas
		conf.set("dfs.replication", "2");

		// Connect to the NameNode, acting as user "hadoop"
		fs = FileSystem.get(new URI("hdfs://192.168.153.136:9000"), conf, "hadoop");
	}
	
	// Print all configuration parameters held by conf
	@Test
	public void testConf(){
		Iterator<Entry<String, String>> iterator = conf.iterator();
		while (iterator.hasNext()) {
			Entry<String, String> ent = iterator.next();
			System.out.println(ent.getKey() + ": " + ent.getValue());
		}
	}
	
	// Upload a local file to HDFS
	@Test
	public void testUpload() throws Exception {
		// (1) Upload via raw IO streams: open a local FileInputStream and copy it into an FSDataOutputStream created on HDFS
		//FileInputStream fileInputStream = new FileInputStream("e:/Spring MVC.docx");
		//FSDataOutputStream fsDataOutputStream = fs.create(new Path("/Spring MVC222.docx"));
		//IOUtils.copy(fileInputStream, fsDataOutputStream);
		
		// (2) Upload via FileSystem.copyFromLocalFile, after checking that the local path is a valid Windows absolute path
		boolean windowsAbsolutePath = Path.isWindowsAbsolutePath("e:/Spring MVC.docx", false); // false: the path starts with the drive letter, not with a leading '/'
		System.out.println(windowsAbsolutePath);
		if (windowsAbsolutePath) {
			fs.copyFromLocalFile(new Path("e:/Spring MVC.docx"), new Path("/Spring MVC.docx"));
			fs.close();
		}
	}
	
	// Download a file from HDFS to the local file system
	@Test
	public void testDownload() throws Exception {
		// (1) This approach does not depend on a local hadoop installation; it copies the bytes directly via IO streams
		//FSDataInputStream fsDataInputStream = fs.open(new Path("/spring/Spring MVC.docx"));
		//FileOutputStream fileOutputStream = new FileOutputStream("e:/Spring MVC.docx");
		//IOUtils.copy(fsDataInputStream, fileOutputStream);
		
		// (2) fs.copyToLocalFile(new Path("/Spring MVC1111.docx"), new Path("e:/")); this two-argument form depends on a local hadoop environment.
		// The four-argument form below can be used instead: parameter 1 says whether to delete the source file on HDFS,
		// parameter 4 says whether to use the raw local file system (plain Java IO) for the destination.
		fs.copyToLocalFile(false, new Path("/Spring MVC1111.docx"), new Path("e:/"), true);
		fs.close();
	}
	
	
	
	// Create a directory on HDFS; the root directory is /
	@Test
	public void makdirTest() throws Exception {
		boolean mkdirs = fs.mkdirs(new Path("/spring"));
		System.out.println(mkdirs);
	}
	
	@Test
	public void createTest() throws Exception{
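		// A minimal sketch (not in the original post): create a file on HDFS and write one line into it.
		// The target path /hello.txt is only an illustrative example.
		FSDataOutputStream out = fs.create(new Path("/hello.txt"));
		out.write("hello hdfs\n".getBytes("UTF-8"));
		out.close();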
		
	}
	
	// Delete a directory or file
	@Test
	public void deleteTest() throws Exception{
		// fs.exists(): check whether the path exists
		boolean file_exists = fs.exists(new Path("/spring"));  // true
		// fs.isFile(): check whether the path refers to a file
		boolean file = fs.isFile(new Path("/Spring MVC.docx"));  // true
		// fs.isDirectory(): check whether the path refers to a directory
		boolean directory1 = fs.isDirectory(new Path("/spring"));   // true
		boolean directory2 = fs.isDirectory(new Path("/Spring MVC.docx"));  // false
		
		System.out.println(file_exists);
		System.out.println(file);
		System.out.println(directory1);
		System.out.println(directory2);
		if (file_exists) {
			boolean delete = fs.delete(new Path("/spring"), true); // true: delete recursively
			System.out.println(delete);
		}
	}
	
	// List entries under the root, then recursively obtain all files
	@Test
	public void listTest() throws Exception{
		
		FileStatus[] listStatus = fs.listStatus(new Path("/"));
		for (FileStatus fileStatus : listStatus) {
			System.err.println(fileStatus.getPath()+"================="+fileStatus.toString());
		}
		
		// listFiles() with recursive=true descends into subdirectories and returns all files
		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
		while(listFiles.hasNext()){
			LocatedFileStatus fileStatus = listFiles.next();
			System.out.println("blocksize:" + fileStatus.getBlockSize());
			System.out.println("owner:" + fileStatus.getOwner());
			System.out.println("Replication:" + fileStatus.getReplication());
			System.out.println("Permission:" + fileStatus.getPermission());
			System.out.println("Path:" + fileStatus.getPath());
			System.out.println("FileName:" + fileStatus.getPath().getName());
			System.out.println("File Len:" + fileStatus.getLen());
			BlockLocation[] blockLocations = fileStatus.getBlockLocations();
			for (BlockLocation blockLocation : blockLocations) {
				System.out.println("塊起始偏移量:" + blockLocation.getOffset());
				System.out.println("塊長度:" + blockLocation.getLength());
				String[] hosts = blockLocation.getHosts();
				for (String datanode : hosts) {
					// Note: the replica locations printed here list 3 machines, even though dfs.replication is set to 2 in the cluster's hdfs-site.xml.
					// Why 3 replicas? The client's Configuration is independent of the cluster-side hdfs-site.xml; for files written by this client,
					// the replication factor comes from the client-side Configuration, and if dfs.replication is not set there the default of 3 is used.
					// To change it, call conf.set("dfs.replication", "2") on the client before uploading the file.
					System.out.println("block replica location: " + datanode);
				}
			}
		}
		
	}
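	
	// A related sketch (not in the original post): as far as I know, FileSystem.setReplication()
	// can change the replication factor of a file that already exists on HDFS without re-uploading it.
	// The path below simply reuses the sample file from the upload test.
	@Test
	public void setReplicationTest() throws Exception {
		boolean done = fs.setReplication(new Path("/Spring MVC.docx"), (short) 2);
		System.out.println(done);
	}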
	
	
	// Print the content of a file on HDFS to standard output
	@Test
	public void testCat() throws Exception{
		FSDataInputStream in = fs.open(new Path("/Spring MVC.docx"));
		IOUtils.copy(in, System.out);
		in.close();
	}
	
}
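
As noted in the class-level comment, the user identity can also be supplied through the HADOOP_USER_NAME JVM parameter instead of the third argument of FileSystem.get(). A minimal, hypothetical sketch (the class name is made up, and the NameNode address simply reuses the one from init() above):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsUserNameDemo {

	public static void main(String[] args) throws Exception {
		// Must be set before the first FileSystem access triggers the client-side login;
		// equivalent to starting the JVM with -DHADOOP_USER_NAME=hadoop
		System.setProperty("HADOOP_USER_NAME", "hadoop");

		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.153.136:9000"), conf);
		System.out.println(fs.exists(new Path("/")));  // simple sanity check against the root directory
		fs.close();
	}
}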