1. 程式人生 > >用Hadoop AVRO進行大量小檔案的處理

用Hadoop AVRO進行大量小檔案的處理

使用 使用使用 使用 HDFS 儲存大量小檔案的缺點:
1.Hadoop NameNode 在記憶體中儲存所有檔案的“元資訊”資料。據統計,每一個檔案需要消耗 NameNode600 位元組記憶體。如果需要儲存大量的小檔案會對NameNode 造成極大的壓力。
2.如果採用 Hadoop MapReduce 進行小檔案的處理,那麼 Mapper 的個數就會跟小檔案的個數成線性相關(備註:FileInputFormat 預設只對大於 HDFS Block Size的檔案進行劃分)。如果小檔案特別多,MapReduce 就會在消耗大量的時間進行Map 程序的建立和銷燬。
為了解決大量小檔案帶來的問題,我們可以將很多小檔案打包,組裝成一個大檔案。 Apache Avro 是語言獨立的資料序列化系統。 Avro 在概念上分為兩部分:模式(Schema)和資料(一般為二進位制資料)。Schema 一般採用 Json 格式進行描述。Avro 同時定義了一些自己的資料型別如表所示:

Avro基礎資料型別

型別

描述

模式

null 

The absence of a value

 "null"

boolean

A binary value

"boolean"

int

32位帶符號整數

"int"

long

64位帶符號整數

"long"

float

32位單精度浮點數

"float"

double

64位雙精度浮點數

"double"

bytes

byte陣列

"bytes"

string

Unicode字串

"string"

型別

描述

模式

array

An ordered collection of objects. All objects in a particular array must have the same schema.

{

"type": "array",

"items": "long"

}

map

An unordered collection of key-value pairs. Keys must be strings and values may be any type, although within a particular map, all values must have the same schema.

{

"type": "map",

"values": "string"

}

record

A collection of named fields of any type.

{

"type": "record",

"name": "WeatherRecord",

"doc": "A weather reading.",

"fields": [

{"name": "year", "type": "int"},

{"name": "temperature", "type": "int"},

{"name": "stationId", "type": "string"}

]

}

enum

A set of named values.

{

"type": "enum",

"name": "Cutlery",

"doc": "An eating utensil.",

"symbols": ["KNIFE", "FORK", "SPOON"]

}

fixed

A fixed number of 8-bit unsigned bytes.

{

"type": "fixed",

"name": "Md5Hash",

"size": 16

}

union

A union of schemas. A union is represented by a JSON

array, where each element in the array is a schema. Data represented by a union must match one of the schemas in the union.

[

"null",

"string",

{"type": "map", "values": "string"}

]

Avro複雜資料型別


 通過上圖所示,通過程式可以將本地的小檔案進行打包,組裝成一個大檔案在HDFS中進行儲存,本地的小檔案成為Avro的記錄。具體的程式如下面的程式碼所示:

  1. publicclass Demo {  
  2.     publicstaticfinal String FIELD_CONTENTS = "contents";  
  3.     publicstaticfinal String FIELD_FILENAME = "filename";  
  4.     publicstaticfinal String SCHEMA_JSON = "{\"type\": \"record\",\"name\": \"SmallFilesTest\", "
  5.             + "\"fields\": ["
  6.             + "{\"name\":\""
  7.             + FIELD_FILENAME  
  8.             + "\",\"type\":\"string\"},"
  9.             + "{\"name\":\""
  10.             + FIELD_CONTENTS  
  11.             + "\", \"type\":\"bytes\"}]}";  
  12.     publicstaticfinal Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);  
  13.     publicstaticvoid writeToAvro(File srcPath, OutputStream outputStream) throws IOException {  
  14.         DataFileWriter<Object> writer = new  DataFileWriter<Object>(new GenericDatumWriter<Object>()).setSyncInterval(100);  
  15.         writer.setCodec(CodecFactory.snappyCodec());  
  16.         writer.create(SCHEMA, outputStream);  
  17.         for (Object obj : FileUtils.listFiles(srcPath, nullfalse)){  
  18.             File file = (File) obj;  
  19.             String filename = file.getAbsolutePath();  
  20.             byte content[] = FileUtils.readFileToByteArray(file);  
  21.             GenericRecord record = new GenericData.Record(SCHEMA);  
  22.             record.put(FIELD_FILENAME, filename);  
  23.             record.put(FIELD_CONTENTS, ByteBuffer.wrap(content));  
  24.             writer.append(record);  
  25.             System.out.println(file.getAbsolutePath() + ":"+ DigestUtils.md5Hex(content));  
  26.         }  
  27.         IOUtils.cleanup(null, writer);  
  28.         IOUtils.cleanup(null, outputStream);  
  29.     }  
  30.     publicstaticvoid main(String args[]) throws Exception {  
  31.         Configuration config = new Configuration();  
  32.         FileSystem hdfs = FileSystem.get(config);  
  33.         File sourceDir = new File(args[0]);  
  34.         Path destFile = new Path(args[1]);  
  35.         OutputStream os = hdfs.create(destFile);  
  36.         writeToAvro(sourceDir, os);  
  37.     }  
  38. }  

  1. publicclass Demo {  
  2.     privatestaticfinal String FIELD_FILENAME = "filename";  
  3.     privatestaticfinal String FIELD_CONTENTS = "contents";  
  4.     publicstaticvoid readFromAvro(InputStream is) throws  IOException {  
  5.         DataFileStream<Object> reader = new DataFileStream<Object>(is,new GenericDatumReader<Object>());  
  6.         for (Object o : reader) {  
  7.             GenericRecord r = (GenericRecord) o;  
  8.             System.out.println(r.get(FIELD_FILENAME)+ ":"+DigestUtils.md5Hex(((ByteBuffer)r.get(FIELD_CONTENTS)).array()));  
  9.         }  
  10.         IOUtils.cleanup(null, is);  
  11.         IOUtils.cleanup(null, reader);  
  12.     }  
  13.     publicstaticvoid main(String... args) throws Exception {  
  14.         Configuration config = new Configuration();  
  15.         FileSystem hdfs = FileSystem.get(config);  
  16.         Path destFile = new Path(args[0]);  
  17.         InputStream is = hdfs.open(destFile);  
  18.         readFromAvro(is);  
  19.     }  
  20. }