1. 程式人生 > >[hadoop2.7.1]I/O之SequenceFile最新API程式設計例項(寫入、讀取)

[hadoop2.7.1]I/O之SequenceFile最新API程式設計例項(寫入、讀取)

寫操作

根據上一篇的介紹,在hadoop2.x之後,hadoop中的SequenceFile.Writer將會逐漸摒棄大量的createWriter()過載方法,而整合為更為簡潔的createWriter()方法,除了配置引數外,其他的引數統統使用SequenceFile.Writer.Option來替代,具體有:

新的API裡提供的option引數:

FileOption
FileSystemOption
StreamOption
BufferSizeOption
BlockSizeOption
ReplicationOption
KeyClassOption
ValueClassOption
MetadataOption
ProgressableOption
CompressionOption

這些引數能夠滿足各種不同的需要,引數之間不存在順序關係,這樣減少了程式碼編寫工作量,更為直觀,便於理解,下面先來看看這個方法,後邊將給出一個具體例項。

  • createWriter

    public static org.apache.hadoop.io.SequenceFile.Writer createWriter(Configuration conf,
                                                        org.apache.hadoop.io.SequenceFile.Writer.Option... opts)
                                                                 throws 
    IOException
    Create a new Writer with the given options.
    Parameters:
    conf - the configuration to use
    opts - the options to create the file with
    Returns:
    a new Writer
    Throws:

權威指南第四版中提供了一個SequenceFileWriteDemo例項:

// cc SequenceFileWriteDemo Writing a SequenceFile
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// vv SequenceFileWriteDemo
public class SequenceFileWriteDemo {
  
  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };
  
  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    IntWritable key = new IntWritable();
    Text value = new Text();
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(fs, conf, path,
          key.getClass(), value.getClass());
      
      for (int i = 0; i < 100; i++) {
        key.set(100 - i);
        value.set(DATA[i % DATA.length]);
        System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}
// ^^ SequenceFileWriteDemo

對於上面例項中的createWriter()方法用整合之後的最新的方法來改寫一下,程式碼如下:

package org.apache.hadoop.io;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.SequenceFile.Writer.FileOption;
import org.apache.hadoop.io.SequenceFile.Writer.KeyClassOption;
import org.apache.hadoop.io.SequenceFile.Writer.ValueClassOption;
import org.apache.hadoop.io.Text;

public class THT_testSequenceFile2 {

	private static final String[] DATA = { "One, two, buckle my shoe",
			"Three, four, shut the door", "Five, six, pick up sticks",
			"Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

	public static void main(String[] args) throws IOException {
		// String uri = args[0];
		String uri = "file:///D://B.txt";
		Configuration conf = new Configuration();
		Path path = new Path(uri);

		IntWritable key = new IntWritable();
		Text value = new Text();
		SequenceFile.Writer writer = null;
		SequenceFile.Writer.FileOption option1 = (FileOption) Writer.file(path);
		SequenceFile.Writer.KeyClassOption option2 = (KeyClassOption) Writer.keyClass(key.getClass());
		SequenceFile.Writer.ValueClassOption option3 = (ValueClassOption) Writer.valueClass(value.getClass());
		
		try {
			
			writer = SequenceFile.createWriter( conf, option1,option2,option3,Writer.compression(CompressionType.RECORD));
			
			for (int i = 0; i < 10; i++) {
				key.set(1 + i);
				value.set(DATA[i % DATA.length]);
				System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
						value);
				writer.append(key, value);
			}
		} finally {
			IOUtils.closeStream(writer);
		}
	}
}

執行結果如下:

2015-11-06 22:15:05,027 INFO  compress.CodecPool (CodecPool.java:getCompressor(153)) - Got brand-new compressor [.deflate]
[128]	1	One, two, buckle my shoe
[173]	2	Three, four, shut the door
[220]	3	Five, six, pick up sticks
[264]	4	Seven, eight, lay them straight
[314]	5	Nine, ten, a big fat hen
[359]	6	One, two, buckle my shoe
[404]	7	Three, four, shut the door
[451]	8	Five, six, pick up sticks
[495]	9	Seven, eight, lay them straight
[545]	10	Nine, ten, a big fat hen

生成的檔案:



讀操作

新的API裡提供的option引數:

FileOption -表示讀哪個檔案
InputStreamOption
StartOption
LengthOption -按照設定的長度變數來決定讀取的位元組
BufferSizeOption
OnlyHeaderOption

根據最新的API直接上原始碼:

package org.apache.hadoop.io;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class THT_testSequenceFile3 {

	public static void main(String[] args) throws IOException {
		//String uri = args[0];
		String uri = "file:///D://B.txt";
		Configuration conf = new Configuration();
		Path path = new Path(uri);
		SequenceFile.Reader.Option option1 = Reader.file(path);
		SequenceFile.Reader.Option option2 = Reader.length(174);//這個引數表示讀取的長度
		
		SequenceFile.Reader reader = null;
		try {
			reader = new SequenceFile.Reader(conf,option1,option2);
			Writable key = (Writable) ReflectionUtils.newInstance(
					reader.getKeyClass(), conf);
			Writable value = (Writable) ReflectionUtils.newInstance(
					reader.getValueClass(), conf);
			long position = reader.getPosition();
			while (reader.next(key, value)) {
				String syncSeen = reader.syncSeen() ? "*" : "";
				System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,
						value);
				position = reader.getPosition(); // beginning of next record
			}
		} finally {
			IOUtils.closeStream(reader);
		}
	}
}

我這兒設定了一個讀取長度的引數,只讀到第174個位元組那,所以執行結果如下:

2015-11-06 22:53:00,602 INFO  compress.CodecPool (CodecPool.java:getDecompressor(181)) - Got brand-new decompressor [.deflate]
[128]	1	One, two, buckle my shoe
[173]	2	Three, four, shut the door