1. 程式人生 > >HDFS簡單程式設計例項:檔案合併

HDFS簡單程式設計例項:檔案合併

 下圖顯示了HDFS檔案系統中路徑為“localhost:50070/explorer.html#/user/hadoop”的目錄中所有的檔案資訊:

對於該目錄下的所有檔案,我們將執行以下操作:

首先,從該目錄中過濾出所有後綴名不為".abc"的檔案。

然後,對過濾之後的檔案進行讀取。

最後,將這些檔案的內容合併到檔案“hdfs://localhost:9000/user/hadoop/merge.txt”中。

 

程式碼如下:

 1 package mergeFile;
 2 
 3 import java.io.IOException;
 4 import java.io.PrintStream;
5 import java.net.URI; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FSDataInputStream; 9 import org.apache.hadoop.fs.FSDataOutputStream; 10 import org.apache.hadoop.fs.FileStatus; 11 import org.apache.hadoop.fs.FileSystem; 12 import org.apache.hadoop.fs.Path;
13 import org.apache.hadoop.fs.PathFilter; 14 15 16 class myPathFilter implements PathFilter{ //過濾掉檔名滿足特定條件的檔案 17 String reg = null; 18 myPathFilter(String reg){ 19 this.reg = reg; 20 } 21 public boolean accept(Path path) { 22 if(!(path.toString().matches(reg)))
23 return true; 24 return false; 25 } 26 27 } 28 29 public class merge { 30 Path inputPath = null; //待合併的檔案所在的目錄的路徑 31 Path outputPath = null; //輸出檔案的路徑 32 public merge(String input, String output){ 33 this.inputPath = new Path(input); 34 this.outputPath = new Path(output); 35 } 36 public void doMerge() throws IOException{ 37 Configuration conf = new Configuration(); 38 conf.set("fs.defaultFS","hdfs://localhost:9000" ); 39 conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); 40 41 FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()),conf); 42 FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()),conf); 43 44 FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new myPathFilter(".*\\.abc")); //過濾掉目錄中字尾為.abc的檔案 45 FSDataOutputStream fsdos = fsDst.create(outputPath); 46 47 //下面分別讀取過濾之後的每個檔案的內容,並輸出到同一個檔案中 48 for(FileStatus sta:sourceStatus){ 49 System.out.println("路徑: " + sta.getPath() + " 檔案大小: " + sta.getLen() + " 許可權: " + sta.getPermission() + " 內容: "); 50 FSDataInputStream fsdis = fsSource.open(sta.getPath()); 51 byte[] data = new byte[1024]; 52 int read = -1; 53 PrintStream ps = new PrintStream(System.out); 54 while((read = fsdis.read(data)) > 0){ 55 ps.write(data, 0, read); 56 fsdos.write(data, 0, read); 57 } 58 } 59 fsdos.close(); 60 } 61 public static void main(String args[]) throws IOException{ 62 merge merge = new merge("hdfs://localhost:9000/user/hadoop/", "hdfs://localhost:9000/user/hadoop/merge.txt"); 63 merge.doMerge(); 64 } 65 }

執行結果: