1. 程式人生 > >通過API訪問HDFS

通過API訪問HDFS

一、通過 java.net.URL

1.在ubuntu下開啟eclipse

2.建立專案

3.匯入hadoop所有jar包

 Build Path --->Configure  Build Path ---> Add External JARs --->FileSystem --->mnt ---> hgfs --->share for linux --->hadoop2.9.0--->-lib

4.編寫程式碼

package hadoopDemo;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;

public class TestFileSystem {
	static {
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
		}
	public static void main(String[] args) throws Exception {
		
		String urlString = "hdfs://ubuntucp:8020/test/a.txt";
		URL url = new URL(urlString) ;
		InputStream is = url.openStream();
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		byte[] buf = new byte[1024];
		int len = 0 ;
		while((len = is.read(buf)) != -1){
			baos.write(buf, 0, len);
		}
		byte[] data = baos.toByteArray();
		System.out.println(new String(data));


	}

}

其中

static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        }

是為了讓Java程式能識別Hadoop的 hdfs URL 方案所做的額外工作。

還可以呼叫Hadoop中IOUtils 類

package hadoopDemo;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URL;


import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class TestFileSystem {
	static {
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
		}
	public static void main(String[] args) throws Exception {
		
		String urlString = "hdfs://ubuntucp:8020/test/a.txt";
		URL url = new URL(urlString) ;
		InputStream is = url.openStream();
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
//		byte[] buf = new byte[1024];
//		int len = 0 ;
//		while((len = is.read(buf)) != -1){
//			baos.write(buf, 0, len);
//		}
//		byte[] data = baos.toByteArray();
//		System.out.println(new String(data));
//		is.close();
		IOUtils.copyBytes(is, baos, 1024);
		IOUtils.closeStream(is) ;
		System.out.println(new String(baos.toByteArray()));


	}

}

5.新增log4j的屬性檔案

將其粘到專案的src下

二、通過FileSystem API 讀取資料

1.首先構建單元測試

在已有工程下新建名字為test的 Source Folder  用於存放測試類原始碼(和src並列),並要求包名也相同,然後新建測試類

package hadoopDemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Test;

public class TestFileSystemAPI {
	@Test
	public void read() throws Exception{
		Configuration conf = new Configuration() ;
		FileSystem fs = FileSystem.get(conf) ;
	}

}

對測試類除錯,發現變數 fs 的值為 LocalFileSystem ,這是因為用的是 /mnt/hgfs/share for linux/hadoop-2.9.0/_lib/hadoop-common-2.9.0.jar 中 core-default.xml 的預設配置。

2.新建名稱為 core-site.xml 的 File 

為了便於管理可以在工程下新建 Source Folder ,把 core-site.xml 放入,如下:

core-site.xml 內容如下

<?xml version="1.0"?>
<configuration>
	<property>
		<name>fs.defaultFS</name>
		<value>hdfs://ubuntucp:8020/</value>
	</property>
</configuration>

再次除錯測試類,發現變數 fs 的值為 DistributedFileSystem  ,這樣就可以訪問了

但這樣必須要求配置檔名稱為 core-site.xml ,如果用其他名稱,需要在程式中新增指定配置檔案:

package hadoopDemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {
	@Test
	public void read() throws Exception{
		Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置 
		//新增指定配置檔案
		conf.addResource("my-core-site.xml") ;
		FileSystem fs = FileSystem.get(conf) ;
		String file = "hdfs://ubuntucp:8020/test/a.txt" ;
		Path path = new Path(file) ;
		FSDataInputStream in = fs.open(path) ;
		IOUtils.copyBytes(in, System.out, 1024, true) ;//1024是緩衝區的長度而不是流的長度,true為關閉流
		
	}

}

直接使用 FileSystem 以標準輸出格式顯示 Hadoop 檔案系統中的檔案,如下:

package hadoopDemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {
	@Test
	public void read() throws Exception{
		Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置
		FileSystem fs = FileSystem.get(conf) ;
		String file = "hdfs://ubuntucp:8020/test/a.txt" ;
		Path path = new Path(file) ;
		FSDataInputStream in = fs.open(path) ;
		IOUtils.copyBytes(in, System.out, 1024, true) ;//1024是緩衝區的長度而不是流的長度,true為關閉流
	}

}

FSDataInputStream 繼承了 DataInputStream 類和實現了 Seekbale 介面,Seekable 介面支援在檔案中找到指定位置,並提供一個查詢當前位置相對於檔案起始位置偏移量(getPos()) 的查詢方法。與 java.io.InputStream 的 skip() 不同,seek() 可以移到檔案中任意一個絕對位置,skip() 則只能相對於當前位置定位到另一個新位置。

//Seekable介面
public interface Seekable {
    void seek(long pos) throws IOException ;
    long getPos() throws IOException ;
}

通過API實現seek操作: 

package hadoopDemo;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {

	
	/*
	 * 通過API實現seek操作
	 */
	@Test
	public void seek() throws Exception{
		Configuration conf = new Configuration() ;
		FileSystem fs = FileSystem.get(conf) ;
		String file = "hdfs://ubuntucp:8020/test/how.txt" ;
		Path path = new Path(file) ;
		FSDataInputStream in = fs.open(path) ;
		IOUtils.copyBytes(in, new FileOutputStream("/home/ubuntu/Downloads/how1.jpg") , 1024, false) ;
		in.seek(0) ;//重新定位到起始位置
		IOUtils.copyBytes(in, new FileOutputStream("/home/ubuntu/Downloads/how2.jpg") , 1024, true) ;
	}

}

FSDataInputStream 類也實現了 PositionedReadable 介面,從一個指定偏移量出讀取檔案的一部分:

//PositionedReadable介面
public interface PositionedReadable {
    public int read(long position, byte[] buffer, int offset, int length)
        throws IOException;
    public void readFully(long position, byte[] buffer, int offset, int length)
        throws IOException;
    public void readFully(long position, byte[] buffer) throws IOException;
}

由上,read() 方法從檔案的指定 position 處讀取至多為 length 位元組的資料並存入緩衝區 buffer 的指定偏移量 offset 處。返回值是實際讀到的位元組數。readFully()  方法將指定 length 長度的位元組數資料讀到 buffer 中。

 3.獲取檔案狀態

將單元測試改為如下:

package hadoopDemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {
	@Test
	public void read() throws Exception{
		Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置
		FileSystem fs = FileSystem.get(conf) ;
		String file = "hdfs://ubuntucp:8020/test/a.txt" ;
		Path path = new Path(file) ;

		FileStatus ft = fs.getFileStatus(path) ;//獲取檔案狀態
		System.out.println("塊大小 " + ft.getBlockSize()); //得到塊大小
		System.out.println("訪問時間 " + ft.getAccessTime()); //得到訪問時間
		System.out.println("組 " + ft.getGroup()); //得到組
		System.out.println("檔案長度bytes " + ft.getLen()); //得到長度
		System.out.println("修改時間 " + ft.getModificationTime()); //得到修改時間
		System.out.println("檔案擁有者 " + ft.getOwner()); //得到檔案擁有者
		System.out.println("檔案複製因子 " + ft.getReplication()); //得到檔案複製
	}

}

列出檔案目錄:

package hadoopDemo;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {
	
	/*
	 * 列出檔案目錄
	 */
	@Test
	public void listFile() throws Exception{
		Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置 
		FileSystem fs = FileSystem.get(conf) ;
		String file = "hdfs://ubuntucp:8020/test/" ;
		Path path = new Path(file) ;
		FileStatus[] fst = fs.listStatus(path) ;
		System.out.println("一種方法");
		for(FileStatus ft : fst){
			System.out.println(ft.getPath() + ": isFile = " + ft.isFile());
		}
		//工具類,直接將FileStatus[]陣列提取資料形成Path[]陣列,可以替代ft.getPath()的for迴圈
		System.out.println("另一種方法");
		Path[] listesPaths = FileUtil.stat2Paths(fst) ;
		for(Path p : listesPaths){
			System.out.println(p);
		}
		
	}
	

}

結果如下

4.獲取塊資訊

將單元測試改為如下:

package hadoopDemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {
	@Test
	public void read() throws Exception{
		Configuration conf = new Configuration() ;//Configuration物件封裝了客戶端或伺服器的配置 
		FileSystem fs = FileSystem.get(conf) ;
		String file = "hdfs://ubuntucp:8020/test/hadoop-2.9.0.tar.gz" ;
		Path path = new Path(file) ;
		
		
		//得到指定路徑下的檔案的狀態
		FileStatus ft = fs.getFileStatus(path) ;//獲取檔案狀態,FileStatus相當於檔案或者目錄
		
		//得到指定檔案狀態的塊位置資訊集合
		BlockLocation[] location = fs.getFileBlockLocations(ft, 0, ft.getLen()) ;//一個檔案被切割成兩塊,則BlockLocation[]就有兩個BlockLocation元素
		for(BlockLocation block : location){
			System.out.println(block.getHosts()) ;//之所以getHosts()是String[],是因為每一個塊的副本在不同的主機上
		}
	}

}

除錯location的值如下: 

可以看到location包含了3塊,每塊中含有hosts(主機名稱),length,offset(偏移量),names(datanode的遠端通訊rpc地址) 等資訊

在storageids中有

和 datanode 中 ~/hadoop/dfs/data/current 的VERSION 中的storageID 相同:

5.通過 API 實現檔案上傳(寫入資料)

package hadoopDemo;

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {
	
	/*
	 * 通過API實現檔案上傳
	 */
	@Test
	public void putFile() throws Exception{
		Configuration conf = new Configuration() ;
		FileSystem fs = FileSystem.get(conf) ;
		String file = "hdfs://ubuntucp:8020/test/how.txt" ;
		Path path = new Path(file) ;
		FSDataOutputStream out = fs.create(path) ;//建立檔案系統資料輸出流用來寫入檔案
		IOUtils.copyBytes(new FileInputStream("/home/ubuntu/Downloads/bizhi.jpg"), out, 1024) ;
		
	}

}

使用 append() 方法在一個現有檔案末尾追加資料:

package hadoopDemo;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {
	
	/*
	 * 使用append()方法在一個現有檔案末尾追加資料
	 */
	@Test
	public void append() throws Exception{
		Configuration conf = new Configuration() ;
		FileSystem fs = FileSystem.get(conf) ;
		String file = "hdfs://ubuntucp:8020/test/a.txt" ;//檔案已存在
		Path path = new Path(file) ;
		FSDataOutputStream out = fs.append(path) ;
		out.writeChars("I miss you !") ;
		out.close() ;
	}

}

6.檔案副本數和塊大小修改

首先要在叢集中修改最小塊限制:

1).進入 /soft/hadoop/etc/hadoop_cluster ,修改 hdfs-site.xml ,新增如下內容(把塊最小限制改為10K):

        <property>
                <name>dfs.namenode.fs-limits.min-block-size</name>
                <value>10240</value>
        </property>

2).將修改後的 hdfs-site.xml 分發給各主機

3).停掉叢集重新開啟:

start-dfs.sh

然後可以通過API實現副本數以及塊大小的修改:

預設配置請參考 F:\share for linux\hadoop-2.9.0\_conf 中的 hdfs-default.xml 

package hadoopDemo;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {

	
	/*
	 * 修改副本書和塊大小
	 */
	@Test
	public void customReplication() throws Exception{
		Configuration conf = new Configuration() ;
		//set(String,String),修改修改當前會話副本數為4
		conf.set("dfs.replication", "" + 4) ;
		//修改當前檔案塊大小為50K,但hdfs有最小塊限制,所以要先修改最小塊限制(需在叢集中修改)
		conf.set("dfs.blocksize", "" + (1024*50)) ;
		FileSystem fs = FileSystem.get(conf) ;
		String file = "hdfs://ubuntucp:8020/test/modify.txt" ;
		Path path = new Path(file) ;
		FSDataOutputStream out = fs.create(path) ;//建立檔案系統資料輸出流用來寫入檔案
		IOUtils.copyBytes(new FileInputStream("/home/ubuntu/Downloads/bizhi.jpg"), out, 1024) ;
		
	}

}


7.檔案通配及過濾

Hadoop為執行通配提供了兩個FileSystem方法:

public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

       globStatus() 方法返回路徑格式與指定模式匹配的所有 FileStatus 物件組成的陣列。PathFilter 命令作為可選項可以進一步匹配結果進行限制。

萬用字元及其含義:

萬用字元 匹配
 * 代表0到多個字元
?   代表單一字元
[ ab ] 代表字元型別,匹配{a,b}中的一個字元
[ ^ab ] 代表不是{a,b}中的一個字元
[ a-b ] 代表匹配一個a到b之間的字元包括ab,ASCII程式碼在a-b之間的
[ ^a-b] 代表不在a到b之間的字元包括ab
{a,b} 代表匹配a或b的一個語句
\c 代表轉義字元匹配原字元c

例項:

/*   /2007    /2008
/*/*  /2007/12   /2008/01
/200? /2007   /2008
/200[78] /2007  /2008
/200[7-8] /2007  /2008

        萬用字元模式並不能總能夠精確地描述想要訪問的檔案集,比如使用通配格式排除一個特定的檔案就不太可能。FileSystem 中的 listStatus() 和 globStatus() 方法提供了可選的 PathFilter 物件,從而控制萬用字元:

package org.apache.hadoop.fs;
public interface PathFilter {
boolean accept(Path path);
}

範例   PathFilter 用於排除匹配正則表示式的路徑:

首先在HDFS中建立相關目錄:

hdfs dfs -mkdir -p /test/2007/12/30 /test/2007/12/31 /test/2008/01/01 /test/2008/01/02

接著定義排他性路徑過濾的類 RegexExcludePathFilter :

package hadoopDemo.pathFilter;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
/*
 * 排他性路徑過濾
 */
public class RegexExcludePathFilter implements PathFilter {//實現了PathFilter介面
	private String regexp ;
	
	public RegexExcludePathFilter(String regexp) {
		this.regexp = regexp;
	}

	public boolean accept(Path path) {
		return !path.toString().matches(regexp);
	}

}

 然後在單元測試中呼叫該類:

package hadoopDemo;

import hadoopDemo.pathFilter.RegexExcludePathFilter;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class TestFileSystemAPI {

	/*
	 * 通過PathFilter應用正則表示式對路徑過濾
	 */
	@Test
	public void pathFilter() throws Exception{
		Configuration conf = new Configuration() ;
		FileSystem fs = FileSystem.get(conf) ;
		FileStatus[] ft = fs.globStatus(new Path("/test/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$")) ;
		//直接將FileStatus[]陣列提取資料形成Path[]陣列,可以替代ft.getPath()的for迴圈
		Path[] path = FileUtil.stat2Paths(ft) ;
		for(Path p : path){
			System.out.println(p);
		}
	}
	

}


   結果為:

            

8.刪除資料

使用 FileSystem 的 delete() 方法可以永久性刪除檔案或目錄:

public boolean delete(Path f , boolean recursive) throws IOException

       如果 f 是一個檔案或空目錄,那麼 recursive 的值就會被忽略,直接刪除。只有在 recursive 值為 true 時,非空目錄及其內容才會被刪除。