1. 程式人生 > >Hadoop程式設計學習(四):使用FileSystem類進行檔案讀寫及檢視檔案資訊

Hadoop程式設計學習(四):使用FileSystem類進行檔案讀寫及檢視檔案資訊

http://www.cnblogs.com/beanmoon/archive/2012/12/11/2813235.html

在這一節我們要深入瞭解Hadoop的FileSystem類——這是與與hadoop的檔案系統互動的重要介面。雖然我們只是著重於HDFS的實現,但我們在編碼時一般也要注意程式碼在FileSystem不同子類檔案系統之間的可移植性。這是非常有用的,比如說你可以非常方便的直接用同樣的程式碼在你的本地檔案系統上進行測試。

使用hadoop URL讀資料

  從hadoop檔案系統中讀取檔案的最簡單的方法之一便是使用java.net.URL物件來開啟一個欲從中讀取資料的流(stream)。通常情況下的程式設計風格如下:

複製程式碼
1 InputStream in = null;
2 try {
3     in = new URL("hdfs://host/path").openStream();
4     //     process in
5 } finally {
6     IOUtils.closeStream(in);
7 }
複製程式碼

  想要使java識別出hdfs開頭的URL標示還需要一點其他的工作要做:通過URL的setURLStreamHandlerFactory()方法為java設定一個FSUrlStreamHandlerFactory。這個方法在每個JVM中只能呼叫一次,所以它通常會被放在一個static block中執行(如下所示),但如果你的某部分程式——例如一個你無法修改原始碼的第三方元件——已經呼叫了這個方法,那你就不能通過URL來這樣讀取資料了(下一節我們會介紹另一種方法)。

複製程式碼
 1 public class URLCat {
 2     static {
 3       URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
 4     }
 5     
 6     public static void main(String[] args) throws Exception{
 7       InputStream in = null;
 8       try {
 9           in = new URL(args[0]).openStream();
10           IOUtils.copyBytes(in, System.out, 4096, false
); 11    } finally { 12    // TODO: handle exception 13    IOUtils.closeStream(in); 14    } 15 } 16 }
複製程式碼

  上例中我們使用了Hadoop中的IOUtils類的兩個靜態方法:
  1)IOUtils.copyBytes(),其中in表示拷貝源,System.out表示拷貝目的地(也就是要拷貝到標準輸出中去),4096表示用來拷貝的buffer大小,false表明拷貝完成後我們並不關閉拷貝源可拷貝目的地(因為System.out並不需要關閉,in可以在finally語句中被關閉)。
  2)IOUtils.closeStream(),用來關閉一個流。
  下面是我們的測試例子:

% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

使用FileSystem讀取資料

  就像上節所說的,有時候我們無法通過設定URLStreamHandlerFactory方法的方式來通過URL讀取資料,這時FIleSystem API就派上用場了。
  Hadoop檔案系統中的檔案是用Hadoop的Path物件來表示的(而不是java中的java.io.File物件,因為它的語義太接近於本地檔案系統了)。你可以把一個Path物件看做Hadoop檔案系統中的某一個URL,如上例中的“hdfs://localhost/user/tom/quangle.txt”。
  Filesystem是一個通用的檔案系統API,所以使用它的第一步就是先抽取出它的一個例項出來——在這個例子中是HDFS。下面列出了幾個Filesystem的用於抽取Filesystem例項的幾個靜態方法:

public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException

  一個Configuration物件封裝了客戶端或伺服器端的配置資訊,這些配置資訊是通過從conf/core-size.xml之類的配置檔案中讀取出來的名值對來設定的。下面我們一一說明上面的三個方法:
  1)第一個方法返回一個預設的檔案系統(在conf/core-site.xml中通過fs.default.name來指定的,如果在conf/core-site.xml中沒有設定則返回本地檔案系統)。
  2)第二個方法通過uri來指定要返回的檔案系統(例如,如果uri是上個測試例子中的hdfs://localhost/user/tom/quangle.txt,也即以hdfs標識開頭,那麼就返回一個hdfs檔案系統,如果uri中沒有相應的標識則返回本地檔案系統)。
  3)第三個方法返回檔案系統的機理同(2)是相同的,但它同時又限定了該檔案系統的使用者,這在安全方面是很重要的。

  有時候你可能想要使用一個本地檔案系統,你可以使用另一個很方便的方法:
    public static LocalFileSystem getLocal(Configuration conf) throws IOException

  得到一個檔案系統的例項後,我們可以呼叫該例項的open()方法來開啟某個給定檔案的輸入流(第一個方法使用一個預設的4KB的輸入緩衝):

public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

  把上面介紹的組合起來我們就得到了下面的程式碼:  

複製程式碼
 1 public class FileSystemCat {
 2     public static void main(String[] args) throws Exception {
 3         String uri = args[0];
 4         Configuration configuration = new Configuration();
 5         FileSystem fs = FileSystem.get(URI.create(uri), configuration);
 6         InputStream in = null;
 7         try{
 8             in = fs.open(new Path(uri));
 9             IOUtils.copyBytes(in, System.out, 4096, false);
10         } finally {
11             IOUtils.closeStream(in);
12         }
13     }
14 }
複製程式碼

FSDataInputStream

  與URL的openStream()方法返回InputStream不同,FileSystem的open()方法返回的是一個FSDataInputStream物件(繼承關係:java.io.InputStream --> java.io.FilterInputStream --> java.io.DataInputStream --> org.apache.hadoop.fs.FSDataInputStream)。由於FSDataInputStream實現了Closeable,DataInput,PositionedReadable,Seekable等介面,你可以從流中的任意一個位置讀取資料。
  Seekable介面的seek()
和getPos()方法允許我們跳轉到流中的某個位置並得到其位置:

public interface Seekable {
  void seek(long pos) throws IOException;
  long getPos() throws IOException;
}

  如果呼叫seek()時指定了一個超過檔案長度的位移值,會丟擲IOException異常。
  與java.io.Inputstream的skip()方法指明一個相對位移值不同,seek()方法使用的是絕對位移值。如下所示的程式碼通過seek()方法讀取了兩次輸入檔案:  

複製程式碼
 1 public class FileSystemDoubleCat {
 2   public static void main(String[] args) throws Exception {
 3     String uri = args[0];
 4     Configuration conf = new Configuration();
 5     FileSystem fs = FileSystem.get(URI.create(uri), conf);
 6     FSDataInputStream in = null;
 7     try {
 8       in = fs.open(new Path(uri));
 9       IOUtils.copyBytes(in, System.out, 4096, false);
10       in.seek(0); // go back to the start of the file
11       IOUtils.copyBytes(in, System.out, 4096, false);
12     } finally {
13       IOUtils.closeStream(in);
14     }
15   }
16 }
複製程式碼

  執行結果如下:

% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

  FSDataInputStream也實現了PositionedReadable介面,這允許你從流中的某個給定位置讀取給定長度的內容:

public interface PositionedReadable {
  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException;
  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException;
  public void readFully(long position, byte[] buffer) throws IOException;
}

  說明:read()方法從檔案的給定position出讀取length個位元組到buffer的offset處。返回值是讀取到的實際位元組數,呼叫者應該檢查這個返回值,因為它可能比length小(可能讀到了檔案末尾,或發生了中斷等等)。

  呼叫所有的這些方法並不會改變檔案的偏移值,所以這些方法是執行緒安全的。也由此提供了一種當訪問某檔案的內容時訪問該檔案的另一部分資料——例如元資料——的很方便的方法。
  最後需要注意的是呼叫seek()方法的代價比較高,應儘量避免使用。你的程式應該基於流式訪問來構建,而不是執行一大堆seek。

寫資料

  FileSystem類有很多方法用來建立一個檔案,最簡單的就是以欲建立檔案的Path物件為引數的create(Path f)方法,該方法返回一個用來寫入資料的輸出流:
    public FSDataOutputStream create(Path f) throws IOException
  該方法還有幾個過載的方法,通過這些過載的方法你可以指定是否覆蓋該檔名已存在的檔案,這個檔案的備份數,用來寫資料的buffer size,該檔案的block大小和檔案許可權等。

create()方法會建立指定的檔名中包含的任何不存在的父目錄,這樣雖然很方便,但不推薦使用(因為如果某個父目錄中存在其他資料,會被覆蓋掉從而導致檔案丟失)。如果你想要當父目錄不存在時該建立操作失敗,你可以在呼叫create()方法之前呼叫exists()方法檢查指明的父目錄是否存在,如果存在則報錯以讓create()失敗

  create()方法還有一個過載方法可以讓你傳遞一個回撥的藉口——progressable,這樣你的程式就會知道你的資料被寫入了多少,也即寫入的進度(progress):

package org.apache.hadoop.util;
public interface Progressable {
  public void progress();
}

  除了建立一個新檔案以寫入資料以外,我們還可以使用append()方法向一個已存在檔案新增資料:
    public FSDataOutputStream append(Path f) throws IOException
  有了這個函式,應用程式就可以向那些不能限制大小的檔案(如logfile,你事先並不知道待記錄日誌會有多少)寫資料了。append操作在Hadoop的fileSystem中是可選的,例如HDFS實現了它,但S3就沒有。

  下面這個例子展示瞭如何從本地檔案系統拷貝一個檔案到HDFS,我們在每64KB大小的資料寫入之後呼叫一次progress()函式,這個函式每被呼叫一次列印一個句點:  

複製程式碼
 1 public class FileCopyWithProgress {
 2     public static void main(String[] args) throws Exception {
 3         String localSrc = args[0];
 4         String dst = args[1];
 5         InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
 6         Configuration conf = new Configuration();
 7         FileSystem fs = FileSystem.get(URI.create(dst), conf);
 8         OutputStream out = fs.create(new Path(dst), new Progressable() {
 9             public void progress() {
10                 System.out.print(".");
11             }
12         });
13         IOUtils.copyBytes(in, out, 4096, true);
14     }
15 }
複製程式碼

   下面是該例子的示範用法:

% hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/
1400-8.txt
...............

  注:現在除了HDFS以外的其他Hadoop支援的檔案系統都不支援progress()方法,但我們應該知道進度資訊(pregress)在MapReduce程式中是非常重要的。

FSDataOutputStream

  FileSystem中的create()方法返回一個FSDataOutputStream,像FSDataInputStream一樣,它也有一個用於查詢位移的方法(但並沒有類似於FSDataInputStream中seek()的方法,因為Hadoop不允許向流中的任意位置寫資料,我們只能在一個檔案的末尾處新增資料):

package org.apache.hadoop.fs;
public class FSDataOutputStream extends DataOutputStream implements Syncable {
  public long getPos() throws IOException {
    // implementation elided
  }
  // implementation elided
}

查詢某個檔案系統

檔案元資料:FileStatus

  任何檔案系統的典型功能就是能夠遍歷它的目錄結構從而獲取有關目錄和檔案的資訊。Hadoop中的FileStatus類為檔案和目錄包裝了其元資料(包括檔案長度,block大小,冗餘度,修改時間,檔案所有者和許可權等資訊),其getFileStatus()方法提供了獲取某個給定檔案或目錄的FileStatus物件的途徑,如下所示:

複製程式碼
 1 public class ShowFileStatusTest {
 2     private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing (這個類在最新的Hadoop1.0.4中已經被廢棄了)
 3                                     
 4     private FileSystem fs;
 5 
 6     @Before
 7     public void setUp() throws IOException {
 8         Configuration conf = new Configuration();
 9         if (System.getProperty("test.build.data") == null) {
10             System.setProperty("test.build.data", "/tmp");
11         }
12         cluster = new MiniDFSCluster(conf, 1, true, null);
13         fs = cluster.getFileSystem();
14         OutputStream out = fs.create(new Path("/dir/file"));
15         out.write("content".getBytes("UTF-8"));
16         out.close();
17     }
18 
19     @After
20     public void tearDown() throws IOException {
21         if (fs != null) {
22             fs.close();
23         }
24         if (cluster != null) {
25             cluster.shutdown();
26         }
27     }
28 
29     @Test(expected = FileNotFoundException.class)
30     public void throwsFileNotFoundForNonExistentFile() throws IOException {
31         fs.getFileStatus(new Path("no-such-file"));
32     }
33 
34     @Test
35     public void fileStatusForFile() throws IOException {
36         Path file = new Path("/dir/file");
37         FileStatus stat = fs.getFileStatus(file);
38         assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
39         assertThat(stat.isDir(), is(false));
40         assertThat(stat.getLen(), is(7L));
41         assertThat(stat.getModificationTime(),
42                 is(lessThanOrEqualTo(System.currentTimeMillis())));
43         assertThat(stat.getReplication(), is((short) 1));
44         assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
45         assertThat(stat.getOwner(), is("tom"));
46         assertThat(stat.getGroup(), is("supergroup"));
47         assertThat(stat.getPermission().toString(), is("rw-r--r--"));
48     }
49 
50     @Test
51     public void fileStatusForDirectory() throws IOException {
52         Path dir = new Path("/dir");
53         FileStatus stat = fs.getFileStatus(dir);
54         assertThat(stat.getPath().toUri().getPath(), is("/dir"));
55         assertThat(stat.isDir(), is(true));
56         assertThat(stat.getLen(), is(0L));
57         assertThat(stat.getModificationTime(),
58                 is(lessThanOrEqualTo(System.currentTimeMillis())));
59         assertThat(stat.getReplication(), is((short) 0));
60         assertThat(stat.getBlockSize(), is(0L));
61         assertThat(stat.getOwner(), is("tom"));
62         assertThat(stat.getGroup(), is("supergroup"));
63         assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
64     }
65 }
複製程式碼

Listing files

  除了從某個單一檔案或目錄獲取檔案資訊以外,你可能還需要列出某個目錄中的所有檔案,這就要使用FileSystem的listStatus()方法了:

public FileStatus[] listStatus(Path f) throws IOException
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException

  當傳入引數是一個檔案時,它獲取此檔案的FileStatus物件,當傳入檔案是目錄時,它返回零個或多個FileStatus物件,分別代表該目錄下所有檔案的對應資訊。
  過載後的函式允許你指定一個PathFilter來進一步限定要匹配的檔案或目錄。
  下面我們使用listStatus()方法獲得引數中指定的檔案(可以有多個)的元資料資訊,存放在一個FIleStatus陣列中,再使用stat2Paths()方法吧FileStatus陣列轉化為Path陣列,最後打印出檔名來:

複製程式碼
 1 public class ListStatus {
 2     public static void main(String[] args) throws Exception {
 3         String uri = args[0];
 4         Configuration conf = new Configuration();
 5         FileSystem fs = FileSystem.get(URI.create(uri), conf);
 6         Path[] paths = new Path[args.length];
 7         for (int i = 0; i < paths.length; i++) {
 8             paths[i] = new Path(args[i]);
 9         }
10         FileStatus[] status = fs.listStatus(paths);
11         Path[] listedPaths = FileUtil.stat2Paths(status);
12         for (Path p : listedPaths) {
13             System.out.println(p);
14         }
15     }
16 }
複製程式碼

   執行結果如下:

% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
hdfs://localhost/user
hdfs://localhost/user/tom/books
hdfs://localhost/user/tom/quangle.txt

檔案模式

  在某個單一操作中處理一些列檔案是很常見的。例如一個日誌處理的MapReduce作業可能要分析一個月的日誌量。如果一個檔案一個檔案或者一個目錄一個目錄的宣告那就太麻煩了,我們可以使用萬用字元(wild card)來匹配多個檔案(這個操作也叫做globbing)。Hadoop提供了兩種方法來處理檔案組:

public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

  globStatus()方法返回匹配檔案模式的多個檔案的FileStatus陣列(以Path排序)。一個可選的PathFilter可以用來進一步限制匹配模式。Hadoop中的匹配符與Unix中bash相同,如下所示:  

  假設某個日誌檔案的組織結構如下:

  則對應於該組織結構有如下表示:

PathFilter

  使用檔案模式有時候並不能有效的描述你想要的一系列檔案,例如如果你想排除某個特定檔案就很難。所以FileSystem的listStatus()和globStatus()方法就提供了一個可選引數:PathFilter——它允許你一些更細化的控制匹配:

package org.apache.hadoop.fs;
public interface PathFilter {
  boolean accept(Path path);
}

  PathFilter的作用就像java.io.FileFilter,只不過前者針對Path物件,而後者針對File物件。下面我們用PathFIlter來排除一個符合給定正則表示式的檔案:  

複製程式碼
 1 public class RegexExcludePathFilter implements PathFilter {
 2     private final String regex;
 3 
 4     public RegexExcludePathFilter(String regex) {
 5         this.regex = regex;
 6     }
 7 
 8     public boolean accept(Path path) {
 9         return !path.toString().matches(regex);
10     }
11 }
複製程式碼

  RegexExcludePathFilter只讓不匹配(具體參見accept方法的實現)給定正則表示式的檔案通過,我們通過檔案模式(file pattern)得到所需的檔案集後,再用RegexExcludePathFilter來過濾掉我們不需要的檔案:
    fs.globStatus(new Path("/2007/*/*"), new RegexExcludeFilter("^.*/2007/12/31$"))
  這樣我們就得到:/2007/12/30

  注意:Filter只能根據檔名來過濾檔案,是不能通過檔案的屬性(如修改時間,檔案所有者等)來過濾檔案的。但它仍然提供了檔案模式和正則表示式所不能提供的功能。

刪除資料

  使用FIleSystem的delete()方法可以永久的刪除一個檔案或目錄:
    public boolean delete(Path f, boolean recursive) throws IOException
  如果傳入的Path f是一個檔案或者空目錄,recursive的值會被忽略掉。當recursive值為true時,給定的非空目錄連同其內容會被一併刪除掉。