MapReduce輸入輸出類型、格式及實例
輸入格式
1、輸入分片與記錄
2、文件輸入
3、文本輸入
4、二進制輸入
5、多文件輸入
6、數據庫格式輸入
1、輸入分片與記錄
1、JobClient通過指定的輸入文件的格式來生成數據分片InputSplit。
2、一個分片不是數據本身,而是可分片數據的引用。
3、InputFormat接口負責生成分片。
InputFormat 負責處理MR的輸入部分,有三個作用:
驗證作業的輸入是否規範。
把輸入文件切分成InputSplit。
提供RecordReader 的實現類。把InputSplit讀到Mapper中進行處理。
2、文件輸入
抽象類:FilelnputFormat
1、FilelnputFormat是全部使用文件作為數據源的InputFormat實現的基類。
2、FilelnputFormat輸入數據格式的分片大小由數據塊大小決定。
FileInputFormat保存作為job輸入的全部文件。並實現了對輸入文件計算splits的方法。
至於獲得記錄的方法是有不同的子類——TextInputFormat進行實現的。
package org.apache.hadoop.mapreduce.lib.input;
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
/*Generate the list of files and make them into FileSplits.*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
......
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
......
}
/*Get the minimum split size*/
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
/*Get the maximum split size.*/
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE,Long.MAX_VALUE);
}
//是否分片
/*
Is the given filename splitable? Usually, true, but if the file is stream compressed, it will not be.
<code>FileInputFormat</code> implementations can override this and return <code>false</code> to ensure that individual input files are never split-up so that [email protected] Mapper}s process entire files.
*/
protected boolean isSplitable(JobContext context, Path filename) {
return true;//默認須要分片
}
}
自己定義輸入格式
假設我們不須要分片,那我們就須要對isSplitable方法進行重寫
1、繼承FileInputFormat基類。
2、重寫裏面的getSplits(JobContext context)方法。
3、重寫createRecordReader(InputSplit split,TaskAttemptContext context)方法。
具體樣例:
http://blog.csdn.net/scgaliguodong123_/article/details/46492039
InputSplit
在運行mapreduce之前,原始數據被切割成若幹split。每一個split作為一個map任務的輸入,在map運行過程中split會被分解成一個個記錄(key-value對), map會依次處理每一個記錄。
FileInputFormat僅僅劃分比HDFS block大的文件,所以FileInputFormat劃分
的結果是這個文件或者是這個文件裏的一部分。
假設一個文件的大小比block小,將不會被劃分,這也是Hadoop處理大文件
的效率要比處理非常多小文件的效率高的原因。
當Hadoop處理非常多小文件(文件大小小於hdfs block大小)的時候。因為
FileInputFormat不會對小文件進行劃分,所以每一個小文件都會被當做一個split並分配一個map任務,導致效率底下。
比如:一個1G的文件。會被劃分成16個64MB的split,並分配16個map任務處
理,而10000個100kb的文件會被10000個map任務處理。
Map任務的數量?
一個InputSplit相應一個Map task。
InputSplit的大小是由Math.max(minSize,Math.min(maxSize, blockSize))決定。
單節點一般10-100個map task。map task運行時長不建議低於1 分鐘,否
則效率低。
抽象類:CombineFilelnputFormat
1、能夠使用CombineFilelnputFormat來合並小文件。
2、因為CombineFilelnputFormat是一個抽象類,使用的時候須要創建一個
CombineFilelnputFormat的實體類,而且實現getRecordReader()
的方法。
3、避免文件分法的方法:
A、數據塊大小盡可能大。這樣使文件的大小小於數據塊的大小,就不用進行分片。(這樣的方式不太友好)
B、繼承FilelnputFormat,而且重寫isSplitable()方法。
job.setInputFormatClass(CombineTextInputFormat.class);
Hadoop2.6.0 CombineTextInputFormat源代碼:
package org.apache.hadoop.mapreduce.lib.input;
/* Input format that is a <code>CombineFileInputFormat</code>-equivalent for <code>TextInputFormat</code>.*/
public class CombineTextInputFormat
extends CombineFileInputFormat<LongWritable,Text> {
public RecordReader<LongWritable,Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException {
return new CombineFileRecordReader<LongWritable,Text>(
(CombineFileSplit)split, context, TextRecordReaderWrapper.class);
}
/*A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be used in a <code>CombineFileInputFormat</code>-equivalent for <code>TextInputFormat</code>.*/
private static class TextRecordReaderWrapper
extends CombineFileRecordReaderWrapper<LongWritable,Text> {
// this constructor signature is required by CombineFileRecordReader
public TextRecordReaderWrapper(CombineFileSplit split,
TaskAttemptContext context, Integer idx)
throws IOException, InterruptedException {
super(new TextInputFormat(), split, context, idx);
}
}
}
3、文本輸入
類名:TextlnputFormat
1、TextlnputFormat是默認的lnputFormat,每一行數據就是一條記錄。
2、TextlnputFormat的key是LongWritable類型的。存儲該行在整個文件的偏移量,value是每行的數據內容,Text類型。
3、輸入分片與HDFS數據塊關系:TextlnputFormat每一條記錄就是一行,非常有可能某一行跨數據塊存放。默認以\n或回車鍵作為一行記錄。
4、TextInputFormat繼承了FileInputFormat。
類名:KeyValueTextInputFormat
能夠通過設置key為行號的方式來知道記錄的行號,而且能夠通過key.value.separator.in.input
設置key與value的切割符。
當輸入數據的每一行是兩列,並用tab分離的形式的時候,KeyValueTextInputformat處理這樣的格式的文件非常適合。
假設行中有分隔符,那麽分隔符前面的作為key,後面的作為value。假設行中沒有分隔符,那麽整行作為key,value為空。
job.setInputFormatClass(KeyValueTextInputFormat.class);
//默認分隔符就是制表符
//conf.setStrings(KeyValueLineRecordReader.KEY_VALUE
_SEPERATOR, "\t")
類名:NLineInputFormat
能夠設置每一個mapper處理的行數。能夠通過mapred.line.input.format.lienspermap
屬性設置。
NLineInputformat能夠控制在每一個split中數據的行數。
//設置具體輸入處理類
job.setInputFormatClass(NLineInputFormat.class);
//設置每一個split的行數
NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[2]));
4、二進制輸入
輸入類:
SequenceFileInputFormat 將key和value以sequencefile格式輸入。
SequenceFileAsTextInputFormat
SequenceFileAsBinaryInputFormat 將key和value以原始二進制的格式輸入。
因為SequenceFile能夠支持Splittable。所以能夠作為mapreduce輸入文件的格式,能夠非常方便的得到己經含有<key,value>
的分片。
SequenceFile處理、壓縮處理。
5、多文件輸入
類名:MultipleInputs
1、MultipleInputs能夠提供多個輸入數據類型。
2、通過addInputPath()方法來設置多路徑。
6、數據庫格式輸入
類名:DBInputFormat
1、DBInputFormat是一個使用JDBC方式連接數據庫,而且從關系型數據庫中讀取數據的一種輸入格式。
2、有多個map會去連接數據庫。有可能造成數據庫崩潰,因此,避免過多的數據庫連接。
3、HBase中的TablelnputFormat能夠讓MapReduce程序訪問HBase表裏的數據。
實例單輸入路徑
[root@master liguodong]# hdfs dfs -cat /input.txt
hello you
hello everybody
hello hadoop
[root@master liguodong]# hdfs dfs -text /tmp.seq
15/06/10 21:17:11 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native
15/06/10 21:17:11 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
100 apache software
99 chinese good
98 james NBA
97 index pass
96 apache software
95 chinese good
94 james NBA
93 index pass
......
package mrinputformat;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TestInputFormat {
public static class TokenizerMapper
extends Mapper<IntWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);//1
private Text word = new Text();
public void map(IntWritable key, Text value, Context context
) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
//k v
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
//1、配置
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
//2、打包運行必須運行的方法
job.setJarByClass(TestInputFormat.class);
//3、輸入路徑
//hdfs://master:8020/tmp.seq
//hdfs://master:8020/output
FileInputFormat.addInputPath(job, new Path(args[0]));
//默認是TextInputFormat
job.setInputFormatClass(SequenceFileInputFormat.class);
//4、Map
job.setMapperClass(TokenizerMapper.class);
//5、Combiner
job.setCombinerClass(IntSumReducer.class);
//6、Reducer
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
//7、 輸出路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//8、提交作業
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
運行結果:
多輸入路徑方式
package mrinputformat;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TestInputFormat {
//採用TextInputFormat
public static class Mapper1
extends Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);//1
private Text word = new Text();
public void map(LongWritable key, Text value, Context context
) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
//k v
context.write(word, one);
}
}
}
//SequenceFileInputFormat
public static class Mapper2
extends Mapper<IntWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);//1
private Text word = new Text();
public void map(IntWritable key, Text value, Context context
) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
//k v
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
//1、配置
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
//2、打包運行必須運行的方法
job.setJarByClass(TestInputFormat.class);
//3、輸入路徑
//hdfs://master:8020/tmp.seq
//hdfs://master:8020/output
//單個輸入路徑
//FileInputFormat.addInputPath(job, new Path(args[0]));
//默認是TextInputFormat
//job.setInputFormatClass(SequenceFileInputFormat.class);
//4、Map
//job.setMapperClass(TokenizerMapper.class);
//多個輸入路徑
Path path1 = new Path("hdfs://master:8020/input.txt");
Path path2 = new Path("hdfs://master:8020/tmp.seq");
MultipleInputs.addInputPath(job, path1, TextInputFormat.class,Mapper1.class);
MultipleInputs.addInputPath(job, path2, SequenceFileInputFormat.class,Mapper2.class);
//5、Combiner
job.setCombinerClass(IntSumReducer.class);
//6、Reducer
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//7、 輸出路徑
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:8020/output"));
//8、提交作業
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
運行結果:
輸出格式
文本輸出
TextOutputFormat
默認的輸出格式。key是LongWritable,value是Text類型, key和value中間值用tab隔開的。
二進制輸出
SequenceFileOutputFormat
將key和value以sequencefile格式輸出。
SequenceFileAsBinaryOutputFormat
將key和value以原始二進制的格式輸出。
MapFileOutputFormat
將key和value寫入MapFile中。因為MapFile中的key是有序的,所以寫入的時候必須保證記錄是按key值順序寫入的。
多文件輸出
MultipleOutputFormat
MultipleOutputs
默認情況下一個reducer會產生一個輸出,可是有些時候我們想一個reducer產生多個輸出。 MultipleOutputFormat和MultipleOutputs能夠實現這個功能。
差別:MultipleOutputs能夠產生不同類型的輸出。
數據庫格式輸出
DBOutputFormat
MapReduce輸入輸出類型、格式及實例