1. 程式人生 > >hadoop程式設計小技巧(5)---自定義輸入檔案格式類InputFormat

hadoop程式設計小技巧(5)---自定義輸入檔案格式類InputFormat

Hadoop程式碼測試環境:Hadoop2.4

應用:在對資料需要進行一定條件的過濾和簡單處理的時候可以使用自定義輸入檔案格式類。

Hadoop內建的輸入檔案格式類有:

1)FileInputFormat<K,V>這個是基本的父類,我們自定義就直接使用它作為父類;

2)TextInputFormat<LongWritable,Text>這個是預設的資料格式類,我們一般程式設計,如果沒有特別指定的話,一般都使用的是這個;key代表當前行資料距離檔案開始的距離,value程式碼當前行字串;

3)SequenceFileInputFormat<K,V>這個是序列檔案輸入格式,使用序列檔案可以提高效率,但是不利於檢視結果,建議在過程中使用序列檔案,最後展示可以使用視覺化輸出;

4)KeyValueTextInputFormat<Text,Text>這個是讀取以Tab(也即是\t)分隔的資料,每行資料如果以\t分隔,那麼使用這個讀入,就可以自動把\t前面的當做key,後面的當做value;

5)CombineFileInputFormat<K,V>合併大量小資料是使用;

6)MultipleInputs,多種輸入,可以為每個輸入指定邏輯處理的Mapper;

原理:

InputFormat介面有兩個重要的函式:

1)getInputSplits,用於確定輸入分片,當我們繼承FileInputFormat時,就可以忽略此函式,而使用FileInputFormat的此函式即可;

2)createRecordReader ,針對資料如何讀取的類,定義輸入檔案格式,其實也就是定義此類;

在每個map函式中,最開始呼叫的都是nextKeyValue()函式,這個函式就是在RecordReader中定義的(我們自定義RecordReader就是使用不同的實現而已),所以這裡會呼叫我們指定的RecordReader中的nextKeyValue函式。這個函式就會處理或者說是初始化key和value,然後返回true,告知已經處理好了。接著就會呼叫getCurrentKey 和getCurrentValue獲取當前的key和value值。最後,返回map,繼續執行map邏輯。

自定義輸入檔案格式類:

package fz.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * 自定義輸入檔案讀取類
 * 
 * @author fansy
 *
 */
public class CustomInputFormat extends FileInputFormat<Text, Text> {

	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		// TODO Auto-generated method stub
		return new CustomReader();
	}
}
這裡看到如果繼承了FileInputFormat 後,就不需要關心getInputSplits了,而只需要定義RecordReader即可。

自定義RecordReader

package fz.inputformat;

//import java.io.BufferedReader;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public  class CustomReader extends RecordReader<Text ,Text>{
//	private BufferedReader in;
	private LineReader lr ;
	private Text key = new Text();
	private Text value = new Text();
	private long start ;
	private long end;
	private long currentPos;
	private Text line = new Text();
	@Override
	public void initialize(InputSplit inputSplit, TaskAttemptContext cxt)
			throws IOException, InterruptedException {
		FileSplit split =(FileSplit) inputSplit;
		Configuration conf = cxt.getConfiguration();
		Path path = split.getPath();
		FileSystem fs = path.getFileSystem(conf);
		FSDataInputStream is = fs.open(path);
		lr = new LineReader(is,conf);
		
		// 處理起始點和終止點
		start =split.getStart();
		end = start + split.getLength();
		is.seek(start);
		if(start!=0){
			start += lr.readLine(new Text(),0,
					(int)Math.min(Integer.MAX_VALUE, end-start));
		}
		currentPos = start;
	}

	// 針對每行資料進行處理
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if(currentPos > end){
			return false;
		}
		currentPos += lr.readLine(line);
		if(line.getLength()==0){
			return false;
		}
		if(line.toString().startsWith("ignore")){
			currentPos += lr.readLine(line);
		}
		
		String [] words = line.toString().split(",");
		// 異常處理
		if(words.length<2){
			System.err.println("line:"+line.toString()+".");
			return false;
		}
		key.set(words[0]);
		value.set(words[1]);
		return true;
		
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (currentPos - start) / (float) (end - start));
        }
	}

	@Override
	public void close() throws IOException {
		// TODO Auto-generated method stub
		lr.close();
	}
	
}
這裡主要是兩個函式,initial和nextKeyValue。

initial主要用於初始化,包括開啟和讀取檔案,定義讀取的進度等;

nextKeyValue則是針對每行資料(由於這裡使用的是LineReader,所以每次讀取的是一行,這裡定義不同的讀取方式,可以讀取不同的內容),產生對應的key和value對,如果沒有報錯,則返回true。這裡可以看到設定了一條規則,如果輸入資料是以ignore開始的話就忽略,同時每行只取得逗號前後的資料分別作為key和value。

實戰:

輸入資料:

ignore,2
a,3
ignore,4
c,1
c,2,3,2
4,3,2
ignore,34,2
定義主類,主類的Mapper是預設的Mapper,沒有reducer。
package fz.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class FileInputFormatDriver extends Configured implements Tool{

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		ToolRunner.run(new Configuration(), new FileInputFormatDriver(),args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		if(arg0.length!=2){
			System.err.println("Usage:\nfz.inputformat.FileInputFormatDriver <in> <out>");
			return -1;
		}
		Configuration conf = getConf();
		
		Path in = new Path(arg0[0]);
		Path out= new Path(arg0[1]);
		out.getFileSystem(conf).delete(out, true);
		
		Job job = Job.getInstance(conf,"fileintputformat test job");
		job.setJarByClass(getClass());
		
		job.setInputFormatClass(CustomInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.setMapperClass(Mapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
//		job.setOutputKeyClass(LongWritable.class);
//		job.setOutputValueClass(VectorWritable.class);
		job.setNumReduceTasks(0);
//		System.out.println(job.getConfiguration().get("mapreduce.job.reduces"));
//		System.out.println(conf.get("mapreduce.job.reduces"));
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		
		return job.waitForCompletion(true)?0:-1;
	}

}

檢視輸出:



這裡可以看到,ignore的資料已經被忽略掉了,同時每行只輸出了逗號前後的資料而已。

同時需要注意到:

這裡有一行資料讀入的是空字串,這個暫時還沒找到原因。

總結:自定義輸入資料格式可以針對不同的資料做些過濾,進行一些簡單的邏輯處理,有點類似map的功能,但是如果僅僅是這點功能的話,那完全可以使用map來取代了。其實輸入資料格式還有其他的功能,比如合併大量的小資料,以提高效率,這個在下篇再說。

分享,成長,快樂