
Hadoop SequenceFile: Big Data Storage

A SequenceFile is a flat binary file made up of key/value pairs serialized as byte streams.

SequenceFile supports three compression types, defined by the CompressionType enum:

public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress the keys/values of many records together in blocks. */
    BLOCK
}

There are three SequenceFile Writers, based on the CompressionType used to compress key/value pairs:

1. Writer: uncompressed records.

Writer is implemented as a nested class of SequenceFile:

 public static class Writer implements java.io.Closeable, Syncable


 /** Write and flush the file header. */
    private void writeFileHeader() 
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());
      
      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());
      
      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

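Every SequenceFile starts with a header, and the code above is what writes it. Let's walk through an example alongside the code to see what the header consists of.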

package demo;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {
	private static final String[] DATA = { "One, two, buckle my shoe",
			"Three, four, shut the door", "Five, six, pick up sticks",
			"Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

	public static void main(String[] args) throws IOException {
		if (args.length < 2) {
			System.err.println("Usage: SequenceFileWriteDemo <uri> <1|2|3>");
			System.exit(1);
		}
		String uri = args[0];
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://xxx.xxx.xxx.xx:9000");
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		Path path = new Path(uri);
		IntWritable key = new IntWritable();
		Text value = new Text();
		SequenceFile.Writer writer = null;
		try {
			String compressType = args[1];
			System.out.println("compressType " + compressType);

			if (compressType.equals("1")) {
				// Writer: uncompressed records.
				System.out.println("compress none");
				writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass(), CompressionType.NONE);
			} else if (compressType.equals("2")) {
				// RecordCompressWriter: record-compressed files, only compress values.
				System.out.println("compress record");
				writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass(), CompressionType.RECORD);
			} else if (compressType.equals("3")) {
				// BlockCompressWriter: block-compressed files, both keys & values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.
				System.out.println("compress block");
				writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass(), CompressionType.BLOCK);
			} else {
				// Without this branch writer stays null and append() throws a NullPointerException.
				throw new IllegalArgumentException("compress type must be 1, 2 or 3: " + compressType);
			}
			
			for (int i = 0; i < 100; i++) {
				key.set(100 - i);
				value.set(DATA[i % DATA.length]);
				System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,value);
				writer.append(key, value);
				
			}
		} finally {
			IOUtils.closeStream(writer);
		}
	}
}

The key/value pairs written to the file are:

key     value
100     One, two, buckle my shoe
99      Three, four, shut the door
98      Five, six, pick up sticks
97      Seven, eight, lay them straight
96      Nine, ten, a big fat hen
95      One, two, buckle my shoe
94      Three, four, shut the door
93      Five, six, pick up sticks
92      Seven, eight, lay them straight
91      Nine, ten, a big fat hen
90      One, two, buckle my shoe
89      Three, four, shut the door
88      Five, six, pick up sticks
87      Seven, eight, lay them straight
86      Nine, ten, a big fat hen
85      One, two, buckle my shoe
84      Three, four, shut the door
83      Five, six, pick up sticks
82      Seven, eight, lay them straight
81      Nine, ten, a big fat hen
80      One, two, buckle my shoe
79      Three, four, shut the door
78      Five, six, pick up sticks
77      Seven, eight, lay them straight
76      Nine, ten, a big fat hen
75      One, two, buckle my shoe
74      Three, four, shut the door
73      Five, six, pick up sticks
72      Seven, eight, lay them straight
71      Nine, ten, a big fat hen
70      One, two, buckle my shoe
69      Three, four, shut the door
68      Five, six, pick up sticks
67      Seven, eight, lay them straight
66      Nine, ten, a big fat hen
65      One, two, buckle my shoe
64      Three, four, shut the door
63      Five, six, pick up sticks
62      Seven, eight, lay them straight
61      Nine, ten, a big fat hen
60      One, two, buckle my shoe
59      Three, four, shut the door
58      Five, six, pick up sticks
57      Seven, eight, lay them straight
56      Nine, ten, a big fat hen
55      One, two, buckle my shoe
54      Three, four, shut the door
53      Five, six, pick up sticks
52      Seven, eight, lay them straight
51      Nine, ten, a big fat hen
50      One, two, buckle my shoe
49      Three, four, shut the door
48      Five, six, pick up sticks
47      Seven, eight, lay them straight
46      Nine, ten, a big fat hen
45      One, two, buckle my shoe
44      Three, four, shut the door
43      Five, six, pick up sticks
42      Seven, eight, lay them straight
41      Nine, ten, a big fat hen
40      One, two, buckle my shoe
39      Three, four, shut the door
38      Five, six, pick up sticks
37      Seven, eight, lay them straight
36      Nine, ten, a big fat hen
35      One, two, buckle my shoe
34      Three, four, shut the door
33      Five, six, pick up sticks
32      Seven, eight, lay them straight
31      Nine, ten, a big fat hen
30      One, two, buckle my shoe
29      Three, four, shut the door
28      Five, six, pick up sticks
27      Seven, eight, lay them straight
26      Nine, ten, a big fat hen
25      One, two, buckle my shoe
24      Three, four, shut the door
23      Five, six, pick up sticks
22      Seven, eight, lay them straight
21      Nine, ten, a big fat hen
20      One, two, buckle my shoe
19      Three, four, shut the door
18      Five, six, pick up sticks
17      Seven, eight, lay them straight
16      Nine, ten, a big fat hen
15      One, two, buckle my shoe
14      Three, four, shut the door
13      Five, six, pick up sticks
12      Seven, eight, lay them straight
11      Nine, ten, a big fat hen
10      One, two, buckle my shoe
9       Three, four, shut the door
8       Five, six, pick up sticks
7       Seven, eight, lay them straight
6       Nine, ten, a big fat hen
5       One, two, buckle my shoe
4       Three, four, shut the door
3       Five, six, pick up sticks
2       Seven, eight, lay them straight
1       Nine, ten, a big fat hen

Package the Java code above into sfile.jar and run hadoop jar sfile.jar /test/numbers1.seq 1 (the argument 1 means no compression).

Fetch the generated file with hadoop fs -get hdfs:///test/numbers1.seq /usr/test/ and open it in the hex view of UltraEdit (UE):

0x53 0x45 0x51

This is the SequenceFile magic header "SEQ", which identifies the file as being in SequenceFile format.

0x06: the version number. The latest version is SEQ6.

The corresponding source code:

 private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };
out.write(VERSION);

0x20 0x6F ... 0x65

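This is keyClassName, the key's class name. Its first byte, 0x20 (32), is the length of the string; in this example the key class is org.apache.hadoop.io.IntWritable, which is 32 characters long.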

The corresponding source code:

 Text.writeString(out, keyClass.getName());

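0x19 0x6F ... 0x74: this is valueClassName, the value's class name. Its first byte, 0x19 (25), is again the length of the string. Because the demo writes Text values, the value class here is org.apache.hadoop.io.Text, which is 25 characters long.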

 Text.writeString(out, valClass.getName());
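Text.writeString() writes a string as a variable-length integer (VInt) byte count followed by the UTF-8 bytes. The following is a minimal JDK-only sketch of that encoding, assuming the VInt scheme of Hadoop's WritableUtils.writeVLong (values from -112 to 127 fit in a single byte); TextStringSketch is a hypothetical name, not a Hadoop class. It reproduces the 0x20 and 0x19 length prefixes seen above.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: mimics the on-disk layout of Text.writeString
// (VInt length + UTF-8 bytes) without depending on Hadoop.
final class TextStringSketch {

    // VInt/VLong encoding following the scheme of Hadoop's WritableUtils.
    static void writeVLong(DataOutputStream out, long i) throws IOException {
        if (i >= -112 && i <= 127) {      // small values: a single byte
            out.writeByte((byte) i);
            return;
        }
        int len = -112;
        if (i < 0) {                      // negative values are stored complemented
            i ^= -1L;
            len = -120;
        }
        for (long tmp = i; tmp != 0; tmp >>= 8) {
            len--;                        // one more payload byte needed
        }
        out.writeByte((byte) len);        // marker byte: sign and payload size
        len = (len < -120) ? -(len + 120) : -(len + 112);
        for (int idx = len; idx != 0; idx--) {
            int shift = (idx - 1) * 8;    // big-endian payload
            out.writeByte((byte) ((i >> shift) & 0xFF));
        }
    }

    // Serializes a string the way Text.writeString lays it out.
    static byte[] encode(String s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
            writeVLong(out, utf8.length);
            out.write(utf8);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Class names longer than 127 bytes would get a multi-byte length prefix (a marker byte followed by the length), so the "first byte is the length" reading only holds for short names like these.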


0x00: the compression flag. 0x00 means no (it is a boolean, so it takes 1 byte).

0x00: the blockCompression flag. 0x00 means no (also a 1-byte boolean).

   out.writeBoolean(this.isCompressed());
    out.writeBoolean(this.isBlockCompressed());

If compression is enabled, the codec class name is written next. This example is not compressed, so no codec name is written. Source:

 if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }

0x00 0x00 0x00 0x00: the metadata. This example has no metadata, so only the entry count 0 is written, as a 4-byte int.

  this.metadata.write(out);
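Metadata.write() writes the number of entries as a 4-byte int, then each key and value as a Text. The sketch below reproduces that layout with plain java.io, assuming strings shorter than 128 bytes so each length prefix is a single byte, and writing entries in sorted key order on the assumption that Hadoop keeps them in a sorted map; MetadataSketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the SequenceFile metadata block layout.
final class MetadataSketch {

    static byte[] encode(Map<String, String> metadata) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            TreeMap<String, String> sorted = new TreeMap<>(metadata); // sorted keys
            out.writeInt(sorted.size());                 // 4-byte entry count
            for (Map.Entry<String, String> e : sorted.entrySet()) {
                writeShortText(out, e.getKey());         // key as Text
                writeShortText(out, e.getValue());       // value as Text
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Text-style string: one-byte VInt length (valid below 128 bytes) + UTF-8.
    private static void writeShortText(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 127) {
            throw new IllegalArgumentException("sketch only handles short strings");
        }
        out.writeByte(utf8.length);
        out.write(utf8);
    }
}
```

An empty map yields exactly the four 0x00 bytes seen in the hex dump.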

0x76 0x61 ... 0xAF: the 16-byte sync marker, which marks the end of the header. It is generated once per writer as an MD5 digest:

byte[] sync;                          // 16 random bytes
    {
      try {                                       
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    
out.write(sync);
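Because the marker is an MD5 digest of a unique ID plus a timestamp, it differs from file to file. A standalone sketch of the same idea, substituting System.currentTimeMillis() for Hadoop's Time.now() and UTF-8 for the platform default charset (both assumptions of this sketch); SyncMarkerSketch is a hypothetical name:

```java
import java.nio.charset.StandardCharsets;
import java.rmi.server.UID;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: builds a per-file sync marker the way the Writer's
// initializer does (MD5 of "<UID>@<time>").
final class SyncMarkerSketch {

    static byte[] newSyncMarker() {
        try {
            MessageDigest digester = MessageDigest.getInstance("MD5");
            long time = System.currentTimeMillis();   // stand-in for Time.now()
            // Each UID is unique, so every call digests a different string.
            digester.update((new UID() + "@" + time).getBytes(StandardCharsets.UTF_8));
            return digester.digest();                 // MD5 always yields 16 bytes
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Since the marker is random, a reader has to take it from the header before it can recognize later sync points in the same file.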

That completes the header. Its format can be summarized as follows:

SequenceFile Header

  • version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)

  • keyClassName - key class

  • valueClassName - value class

  • compression - A boolean which specifies if compression is turned on for keys/values in this file.

  • blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.

  • compression codec - CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).

  • metadata - SequenceFile.Metadata for this file.

  • sync - A sync marker to denote end of the header. 
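The header fields above can be assembled with plain java.io. This is a minimal sketch, assuming class names under 128 bytes (one-byte VInt length), an empty metadata block and a caller-supplied 16-byte sync marker; HeaderSketch is a hypothetical name, not part of Hadoop. For this example's uncompressed file it yields an 85-byte header: 4 + 33 + 26 + 1 + 1 + 4 + 16.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the byte layout produced by Writer.writeFileHeader().
final class HeaderSketch {

    // codecClass == null means "not compressed".
    static byte[] header(String keyClass, String valueClass,
                         String codecClass, boolean blockCompressed, byte[] sync) {
        if (sync.length != 16) {
            throw new IllegalArgumentException("sync marker must be 16 bytes");
        }
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.write(new byte[] {'S', 'E', 'Q', 6});         // magic + version 6
            writeShortText(out, keyClass);                    // keyClassName
            writeShortText(out, valueClass);                  // valueClassName
            boolean compressed = codecClass != null;
            out.writeBoolean(compressed);                     // compression flag
            out.writeBoolean(compressed && blockCompressed);  // blockCompression flag
            if (compressed) {
                writeShortText(out, codecClass);              // codec only if compressed
            }
            out.writeInt(0);                                  // empty metadata: count 0
            out.write(sync);                                  // end-of-header sync marker
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeShortText(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 127) {
            throw new IllegalArgumentException("sketch only handles names under 128 bytes");
        }
        out.writeByte(utf8.length);                           // one-byte VInt length
        out.write(utf8);
    }
}
```

Passing "org.apache.hadoop.io.compress.DefaultCodec" as the codec sets both flags to 0x01 (when blockCompressed is true) and inserts the 0x2A-prefixed codec name before the metadata.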

Next, let's look at how the records themselves are stored:

0x00 0x00 0x00 0x1D

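The total record size. A record consists of a key and a value; here it is 0x1D = 29 bytes: the key 100 takes 4 bytes, the value "One, two, buckle my shoe" takes 24 bytes, and one more byte stores the value's length, so 4 + 24 + 1 = 29.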

0x00 0x00 0x00 0x04

The key length: 4 bytes, because the key is an IntWritable (an int).

0x00 0x00 0x00 0x64

The key itself: 100, which is 0x64 in hexadecimal.


0x18

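The value length: "One, two, buckle my shoe" is 24 bytes long, which is 0x18 in hexadecimal. Text writes this length as a VInt, which needs only one byte here.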

0x4F 0x6E ... 0x65

The value bytes: One, two, buckle my shoe.

The corresponding source code:

 public synchronized void append(Object key, Object val){
 .......
 // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
 }
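Putting the byte walkthrough together: an uncompressed record is the length of the serialized key plus value, the key length, then the key and value bytes. This JDK-only sketch assumes an IntWritable key and a Text value under 128 bytes; UncompressedRecordSketch is a hypothetical name. For key 100 and "One, two, buckle my shoe" it reproduces the 0x1D, 0x04, 0x64 and 0x18 bytes shown above.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of one record as laid out by the uncompressed Writer.append().
final class UncompressedRecordSketch {

    static byte[] record(int key, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 127) {
            throw new IllegalArgumentException("sketch assumes a one-byte VInt length");
        }
        try {
            // Serialize key and value into one buffer, as append() does.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream data = new DataOutputStream(buffer);
            data.writeInt(key);                 // IntWritable: 4 big-endian bytes
            int keyLength = data.size();        // 4
            data.writeByte(utf8.length);        // Text: VInt length ...
            data.write(utf8);                   // ... followed by the UTF-8 bytes
            data.flush();

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(buffer.size());        // total record length (key + value)
            out.writeInt(keyLength);            // key portion length
            buffer.writeTo(out);                // the serialized data
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Including the two 4-byte length fields, each such record occupies 4 + 4 + 29 = 37 bytes in the file.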

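Before each record, the writer checks whether enough data has been written since the last sync point and, if so, writes a sync marker. This is done by checkAndWriteSync(); source: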

 synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

SYNC_INTERVAL is defined as follows:

  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points.*/
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

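So, in the version of the source quoted here, SYNC_INTERVAL = 100 × 20 = 2000 bytes: a sync entry (a 4-byte escape followed by the 16-byte hash, SYNC_SIZE = 20 bytes) is written before a record whenever at least 2000 bytes have been written since the last sync.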
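Because checkAndWriteSync() runs only before a record is appended, markers do not land at exact 2000-byte offsets; one goes in before the first record that starts at least SYNC_INTERVAL bytes past the previous sync. The simulation below is a hypothetical sketch of that rule, assuming fixed-size records, lastSyncPos starting at 0, and an 85-byte header (the size this example's uncompressed header works out to). With 37-byte records, only one sync appears in the first 100 records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of where checkAndWriteSync() emits sync entries.
final class SyncIntervalSketch {
    static final int SYNC_HASH_SIZE = 16;                // bytes in the hash
    static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;     // escape + hash
    static final int SYNC_INTERVAL = 100 * SYNC_SIZE;    // 2000 bytes

    // Returns the file offsets at which sync entries would be written.
    static List<Long> syncOffsets(long headerLength, int recordLength, int records) {
        List<Long> offsets = new ArrayList<>();
        long pos = headerLength;      // stream position right after the header
        long lastSyncPos = 0;         // assumed initial value
        for (int i = 0; i < records; i++) {
            if (pos >= lastSyncPos + SYNC_INTERVAL) {    // time to emit sync
                offsets.add(pos);
                pos += SYNC_SIZE;                        // escape int + 16-byte hash
                lastSyncPos = pos;
            }
            pos += recordLength;                         // then append the record
        }
        return offsets;
    }
}
```

Here the only sync lands at offset 2009, before the 53rd record; the next one would not be due until the stream passes 4029 bytes.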

[Figure: uncompressed SequenceFile layout]

The format is therefore:

Uncompressed SequenceFile Format

  • Record

    • Record length

    • Key length

    • Key

    • Value 
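Reading works the other way round. The sketch below parses the record section of an uncompressed file (everything after the header), assuming IntWritable keys and Text values under 128 bytes. It treats a length field of -1 as the escape that precedes a 16-byte sync hash and skips it; a real reader would also compare that hash with the one from the header. RecordReaderSketch is hypothetical; production code would use SequenceFile.Reader.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical reader for the record section of an uncompressed SequenceFile.
final class RecordReaderSketch {
    static final int SYNC_ESCAPE = -1;     // length value that announces a sync entry
    static final int SYNC_HASH_SIZE = 16;

    static List<Map.Entry<Integer, String>> read(byte[] recordSection) {
        List<Map.Entry<Integer, String>> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(recordSection))) {
            while (true) {
                int recordLength;
                try {
                    recordLength = in.readInt();          // record length or escape
                } catch (EOFException end) {
                    break;                                // clean end of data
                }
                if (recordLength == SYNC_ESCAPE) {
                    in.readFully(new byte[SYNC_HASH_SIZE]); // skip the sync hash
                    continue;
                }
                int keyLength = in.readInt();
                if (keyLength != 4) {
                    throw new IOException("sketch expects IntWritable keys");
                }
                int key = in.readInt();
                int valueLength = in.readUnsignedByte();  // one-byte VInt
                if (valueLength > 127) {
                    throw new IOException("sketch expects values under 128 bytes");
                }
                byte[] value = new byte[valueLength];
                in.readFully(value);
                if (recordLength != keyLength + 1 + valueLength) {
                    throw new IOException("inconsistent record length");
                }
                records.add(new SimpleEntry<>(key, new String(value, StandardCharsets.UTF_8)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```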

Next, let's look at RecordCompressWriter.

2. RecordCompressWriter: record-compressed files, only compress values.

Run hadoop jar sfile.jar /test/numbers2.seq 2, where 2 selects RecordCompressWriter (record compression).

Again, open the generated file in UE:

The header is almost the same as in the uncompressed file, with a few small differences:

0x01

Compression is enabled.

0x2A 0x6F ... 0x63

The codec class in use: org.apache.hadoop.io.compress.DefaultCodec (0x2A = 42 characters).

0x00 ... 0x25

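The total record size: 0x25 = 37 bytes. Why is this larger than the 29 bytes of the uncompressed record? Only the value is compressed, and a 25-byte value (1 length byte + 24 characters) has almost no repeated content to exploit, so the codec's own framing (for the zlib-based DefaultCodec, a header and a checksum) outweighs any savings. Compression only pays off for larger or more repetitive values.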

0x00 0x00 0x00 0x04

The key length: 4 bytes (the key is an int).

0x00 0x00 0x00 0x64

The key itself: 100, i.e. 0x64. Note that the key is not compressed.

The corresponding source code:

 @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }
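To see where the 37 bytes come from, the sketch below builds a record-compressed record with java.util.zip.Deflater, on the assumption that DefaultCodec's zlib output matches the JDK's default Deflater settings; RecordCompressedSketch is a hypothetical name. Only the serialized value (VInt length + UTF-8 bytes) is compressed, while the key is written as-is, exactly as in append() above. By a rough count, the 25 value bytes contain no repeated 3-byte sequences, so they deflate to about 27 bytes plus 6 bytes of zlib framing; with the 4-byte key that would account for the 37-byte record length, though treat that figure as an estimate.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

// Hypothetical sketch of a RecordCompressWriter-style record: key raw, value deflated.
final class RecordCompressedSketch {

    // zlib-compresses a buffer (2-byte header + deflate data + Adler-32 trailer).
    static byte[] zlib(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[256];
            while (!deflater.finished()) {
                int n = deflater.deflate(chunk);
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end();                         // release native zlib state
        }
    }

    static byte[] record(int key, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 127) {
            throw new IllegalArgumentException("sketch assumes a one-byte VInt length");
        }
        byte[] serializedValue = new byte[utf8.length + 1];
        serializedValue[0] = (byte) utf8.length;    // Text: VInt length ...
        System.arraycopy(utf8, 0, serializedValue, 1, utf8.length); // ... + bytes
        byte[] compressedValue = zlib(serializedValue);
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(4 + compressedValue.length); // total record length
            out.writeInt(4);                          // key length (IntWritable)
            out.writeInt(key);                        // key, uncompressed
            out.write(compressedValue);               // value, compressed
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```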

[Figure: record-compressed SequenceFile layout]

The format is therefore:

Record-Compressed SequenceFile Format

  • Record

    • Record length

    • Key length

    • Key

    • Compressed Value 

Next, let's look at BlockCompressWriter.

3. BlockCompressWriter: block-compressed files, both keys & values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.

Run hadoop jar sfile.jar /test/numbers3.seq 3, where 3 selects BlockCompressWriter (block compression).

Again, open the generated file in UE:

The header is again almost the same as in the uncompressed file, with these differences:

0x01

Compression is enabled.

0x01

Block compression is enabled.

The source of append() for block compression:

 public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers 
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);
      
      // Added another key/value pair
      ++noBufferedRecords;
      
      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

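Each call to append() therefore appends the key and value lengths (as VInts) to keyLenBuffer and valLenBuffer, appends the serialized key and value to keyBuffer and valBuffer, and then checks whether the buffered data has reached the block size: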

 WritableUtils.writeVInt(keyLenBuffer, keyLength);
 WritableUtils.writeVInt(valLenBuffer, valLength);

 int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
 if (currentBlockSize >= compressionBlockSize) {
   sync();
 }

compressionBlockSize =  conf.getInt("io.seqfile.compress.blocksize", 1000000);

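compressionBlockSize comes from io.seqfile.compress.blocksize (1,000,000 bytes by default). Once the buffered keys and values reach it, sync() is called. Its source: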

  public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();
        
        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);
        
        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);
        
        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);
        
        // Flush the file-stream
        out.flush();
        
        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }
    }

[Figure: block-compressed SequenceFile layout]

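The format is therefore as follows. Each of the four buffers goes through writeBuffer(), which compresses it and writes the compressed size as a VInt followed by the compressed bytes: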

Block-Compressed SequenceFile Format

  • Record Block

    • Uncompressed number of records in the block

    • Compressed key-lengths block-size

    • Compressed key-lengths block

    • Compressed keys block-size

    • Compressed keys block

    • Compressed value-lengths block-size

    • Compressed value-lengths block

    • Compressed values block-size

    • Compressed values block

  • A sync-marker every block.
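The whole record-block layout can be sketched end to end. This hypothetical BlockWriterSketch (not Hadoop's BlockCompressWriter) buffers key lengths, keys, value lengths and values separately, as append() does, and on flush writes the sync entry, the record count, and each buffer zlib-compressed with its compressed size in front, mirroring the sync() source above. It assumes IntWritable keys, Text values, non-negative VInts and java.util.zip.Deflater in place of the configured codec.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

// Hypothetical sketch of the block-compressed record-block layout (not Hadoop code).
final class BlockWriterSketch {
    private final ByteArrayOutputStream file = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(file);
    private final byte[] sync;       // 16-byte marker taken from the header
    private final int blockSize;     // plays the role of io.seqfile.compress.blocksize
    private final ByteArrayOutputStream keyLens = new ByteArrayOutputStream();
    private final ByteArrayOutputStream keys = new ByteArrayOutputStream();
    private final ByteArrayOutputStream valLens = new ByteArrayOutputStream();
    private final ByteArrayOutputStream vals = new ByteArrayOutputStream();
    private int bufferedRecords;

    BlockWriterSketch(byte[] sync, int blockSize) {
        this.sync = sync.clone();
        this.blockSize = blockSize;
    }

    // Buffers one IntWritable/Text pair; flushes a block once enough data is buffered.
    void append(int key, String value) {
        try {
            new DataOutputStream(keys).writeInt(key);            // IntWritable bytes
            writeVInt(new DataOutputStream(keyLens), 4);         // key length
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            int before = vals.size();
            DataOutputStream v = new DataOutputStream(vals);
            writeVInt(v, utf8.length);                           // Text: VInt length
            v.write(utf8);                                       // ... + UTF-8 bytes
            writeVInt(new DataOutputStream(valLens), vals.size() - before);
            bufferedRecords++;
            if (keys.size() + vals.size() >= blockSize) {
                sync();                                          // compress and flush
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes the buffered records as one record block, then resets the buffers.
    void sync() {
        if (bufferedRecords == 0) {
            return;
        }
        try {
            out.writeInt(-1);                                    // sync escape
            out.write(sync);                                     // 16-byte sync hash
            writeVInt(out, bufferedRecords);                     // record count
            for (ByteArrayOutputStream buf : new ByteArrayOutputStream[] {keyLens, keys, valLens, vals}) {
                byte[] compressed = zlib(buf.toByteArray());
                writeVInt(out, compressed.length);               // compressed size
                out.write(compressed);                           // compressed block
                buf.reset();
            }
            bufferedRecords = 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Flushes pending records and returns everything written so far.
    byte[] bytes() {
        sync();
        return file.toByteArray();
    }

    private static byte[] zlib(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            byte[] chunk = new byte[512];
            while (!deflater.finished()) {
                result.write(chunk, 0, deflater.deflate(chunk));
            }
            return result.toByteArray();
        } finally {
            deflater.end();
        }
    }

    // VInt as in Hadoop's WritableUtils, restricted to the non-negative values used here.
    private static void writeVInt(DataOutputStream o, int i) throws IOException {
        if (i <= 127) {
            o.writeByte(i);                                      // single byte
            return;
        }
        int bytes = (32 - Integer.numberOfLeadingZeros(i) + 7) / 8; // payload bytes
        o.writeByte(-112 - bytes);                               // marker byte
        for (int idx = bytes; idx != 0; idx--) {
            o.writeByte(i >>> ((idx - 1) * 8));                  // big-endian payload
        }
    }
}
```

Because the lengths, keys and values are compressed as separate streams, similar data ends up next to similar data, which is why block compression usually beats record compression on realistic files.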