1. 程式人生 > >處理海量小檔案——本地檔案讀成sequenceFile檔案

處理海量小檔案——本地檔案讀成sequenceFile檔案

當處理海量小檔案時,先將小檔案進行sequenceFile操作或者類似操作處理,然後再上傳到HDFS系統進行下一步的處理。(如有其它建議,歡迎留言)

一、直接上傳本地柵格資料將導致的問題
HDFS在儲存檔案時,會將檔案break them into chunks,預設inputSplit的大小與block塊的大小一致,為128M,如果單個檔案的大小小於block塊的大小則不會切分,直接將改小檔案儲存到一個block塊中。因此如果不對柵格資料檔案做處理,將導致佔據大量的block塊;由於namenode中會儲存為元資料資訊,因此也將導致namenode節點記錄大量小檔案的位置等元資料資訊而產生壓力過大,甚至namenode節點的記憶體可能會被佔滿。
二、在本地將柵格資料處理成sequenceFile檔案
SequenceFile檔案是Hadoop用來儲存二進位制形式的key-value對而設計的一種平面檔案。通常對小檔案的處理是使用sequenceFile檔案或MapFile檔案。此次選用的處理方式是使用SequenceFile檔案。(MapFile檔案由兩部分組成,data和index,index作為檔案的索引,儲存每個Record的key值以及它的偏移量。mapFile檔案的檢索效率較sequenceFile檔案高,但是訪問mapFile檔案時需要先將索引檔案載入到記憶體)
由於sequenceFile檔案由Key和value組成,此處的key值存放的為檔案的路徑,例如:file:/home/greatmap/World/6/109/48.jpeg;value的值存放的為圖片的位元組陣列檔案。柵格資料存放在的資料夾下,通過遍歷資料夾,將所有的圖片都寫成一個sequenceFile檔案。

下面是具體實現過程:


public class SequenceFileTest {  
   
   
 //本地linux磁碟輸出路徑  
 static String PATH = "/home/greatmap/out";  
 static SequenceFile.Writer writer = null;  
   
 public static void main(String[] args) throws Exception{  
  
//設定讀取本地磁碟檔案  
  Configuration conf = new Configuration();  
  conf.set("fs.default.name", "file:///");  
  conf.set("mapred.job.tracker", "local");  
  
   
  
//linux磁碟下路徑  
  String path = "/home/greatmap/World/";  
  URI uri = new URI(path);  
  FileSystem fileSystem = FileSystem.get(uri, conf);  
  //例項化writer物件  
  writer = SequenceFile.createWriter(fileSystem, conf, new Path(PATH), Text.class, BytesWritable.class);  
    
  //遞迴遍歷資料夾,並將檔案下的檔案寫入sequenceFile檔案  
  listFileAndWriteToSequenceFile(fileSystem,path);  
    
  //關閉流  
  org.apache.hadoop.io.IOUtils.closeStream(writer);  
  
}  
  
 



  
  
/**** 
  * 遞迴檔案;並將檔案寫成SequenceFile檔案 
  * @param fileSystem 
  * @param path 
  * @throws Exception 
  */  
 public static void listFileAndWriteToSequenceFile(FileSystem fileSystem,String path) throws Exception{  
  final FileStatus[] listStatuses = fileSystem.listStatus(new Path(path));  
  for (FileStatus fileStatus : listStatuses) {  
   if(fileStatus.isFile()){  
    Text fileText = new Text(fileStatus.getPath().toString());  
    System.out.println(fileText.toString());  
    //返回一個SequenceFile.Writer例項 需要資料流和path物件 將資料寫入了path物件  
    FSDataInputStream in = fileSystem.open(new Path(fileText.toString()));  
    byte[] buffer = IOUtils.toByteArray(in);  
    in.read(buffer);  
    BytesWritable value = new BytesWritable(buffer);  
      
    //寫成SequenceFile檔案  
    writer.append(fileText, value);  
      
   }  
   if(fileStatus.isDirectory()){  
    listFileAndWriteToSequenceFile(fileSystem,fileStatus.getPath().toString());  
   }  
//   org.apache.hadoop.io.IOUtils.closeStream(writer);  
     
  }  
 }}