1. 程式人生 > >MapReduce輸入輸出類型、格式及實例

MapReduce輸入輸出類型、格式及實例

回車 行記錄 ade nts cte ftw result 默認 math

輸入格式

1、輸入分片與記錄
2、文件輸入
3、文本輸入
4、二進制輸入
5、多文件輸入
6、數據庫格式輸入

1、輸入分片與記錄

1、JobClient通過指定的輸入文件的格式來生成數據分片InputSplit。
2、一個分片不是數據本身,而是可分片數據的引用
3、InputFormat接口負責生成分片。

InputFormat 負責處理MR的輸入部分,有三個作用:
驗證作業的輸入是否規範。
把輸入文件切分成InputSplit。
提供RecordReader 的實現類。把InputSplit讀到Mapper中進行處理。

技術分享

2、文件輸入

抽象類:FilelnputFormat
1、FilelnputFormat是全部使用文件作為數據源的InputFormat實現的基類。


2、FilelnputFormat輸入數據格式的分片大小由數據塊大小決定

FileInputFormat保存作為job輸入的全部文件。並實現了對輸入文件計算splits的方法。

至於獲得記錄的方法是有不同的子類——TextInputFormat進行實現的。

package org.apache.hadoop.mapreduce.lib.input;
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  protected
long computeSplitSize(long blockSize, long minSize,long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); } /*Generate the list of files and make them into FileSplits.*/ public List<InputSplit> getSplits(JobContext job) throws IOException { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long
maxSize = getMaxSplitSize(job); ...... long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); ...... } /*Get the minimum split size*/ public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); } /*Get the maximum split size.*/ public static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong(SPLIT_MAXSIZE,Long.MAX_VALUE); } //是否分片 /* Is the given filename splitable? Usually, true, but if the file is stream compressed, it will not be. <code>FileInputFormat</code> implementations can override this and return <code>false</code> to ensure that individual input files are never split-up so that [email protected] Mapper}s process entire files. */ protected boolean isSplitable(JobContext context, Path filename) { return true;//默認須要分片 } }
自己定義輸入格式

假設我們不須要分片,那我們就須要對isSplitable方法進行重寫
1、繼承FileInputFormat基類。
2、重寫裏面的getSplits(JobContext context)方法。


3、重寫createRecordReader(InputSplit split,TaskAttemptContext context)方法。


具體樣例:
http://blog.csdn.net/scgaliguodong123_/article/details/46492039

InputSplit

在運行mapreduce之前,原始數據被切割成若幹split。每一個split作為一個map任務的輸入,在map運行過程中split會被分解成一個個記錄(key-value對), map會依次處理每一個記錄。
FileInputFormat僅僅劃分比HDFS block大的文件,所以FileInputFormat劃分
的結果是這個文件或者是這個文件裏的一部分。


假設一個文件的大小比block小,將不會被劃分,這也是Hadoop處理大文件
的效率要比處理非常多小文件的效率高的原因。
當Hadoop處理非常多小文件(文件大小小於hdfs block大小)的時候。因為
FileInputFormat不會對小文件進行劃分,所以每一個小文件都會被當做一個split並分配一個map任務,導致效率底下。


比如:一個1G的文件。會被劃分成16個64MB的split,並分配16個map任務處
理,而10000個100kb的文件會被10000個map任務處理。

Map任務的數量?

一個InputSplit相應一個Map task。


InputSplit的大小是由Math.max(minSize,Math.min(maxSize, blockSize))決定。
單節點一般10-100個map task。map task運行時長不建議低於1 分鐘,否
則效率低。

抽象類:CombineFilelnputFormat
1、能夠使用CombineFilelnputFormat來合並小文件。

2、因為CombineFilelnputFormat是一個抽象類,使用的時候須要創建一個
CombineFilelnputFormat的實體類,而且實現getRecordReader()的方法。

3、避免文件分法的方法:
A、數據塊大小盡可能大。這樣使文件的大小小於數據塊的大小,就不用進行分片。(這樣的方式不太友好)
B、繼承FilelnputFormat,而且重寫isSplitable()方法。

job.setInputFormatClass(CombineTextInputFormat.class);

Hadoop2.6.0 CombineTextInputFormat源代碼:

package org.apache.hadoop.mapreduce.lib.input;
/* Input format that is a <code>CombineFileInputFormat</code>-equivalent for <code>TextInputFormat</code>.*/
public class CombineTextInputFormat
  extends CombineFileInputFormat<LongWritable,Text> {

  public RecordReader<LongWritable,Text> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader<LongWritable,Text>(
      (CombineFileSplit)split, context, TextRecordReaderWrapper.class);
  }

  /*A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be used in a <code>CombineFileInputFormat</code>-equivalent for <code>TextInputFormat</code>.*/
  private static class TextRecordReaderWrapper
    extends CombineFileRecordReaderWrapper<LongWritable,Text> {
    // this constructor signature is required by CombineFileRecordReader
    public TextRecordReaderWrapper(CombineFileSplit split,
      TaskAttemptContext context, Integer idx)
      throws IOException, InterruptedException {
      super(new TextInputFormat(), split, context, idx);
    }
  }
}

3、文本輸入

類名:TextlnputFormat
1、TextlnputFormat是默認的lnputFormat,每一行數據就是一條記錄

2、TextlnputFormat的key是LongWritable類型的。存儲該行在整個文件的偏移量,value是每行的數據內容,Text類型。

3、輸入分片與HDFS數據塊關系:TextlnputFormat每一條記錄就是一行,非常有可能某一行跨數據塊存放。默認以\n或回車鍵作為一行記錄。

4、TextInputFormat繼承了FileInputFormat。
技術分享

類名:KeyValueTextInputFormat
能夠通過設置key為行號的方式來知道記錄的行號,而且能夠通過key.value.separator.in.input設置key與value的切割符。
當輸入數據的每一行是兩列,並用tab分離的形式的時候,KeyValueTextInputformat處理這樣的格式的文件非常適合。

假設行中有分隔符,那麽分隔符前面的作為key,後面的作為value。假設行中沒有分隔符,那麽整行作為key,value為空。

job.setInputFormatClass(KeyValueTextInputFormat.class);
//默認分隔符就是制表符
//conf.setStrings(KeyValueLineRecordReader.KEY_VALUE
_SEPERATOR, "\t")

類名:NLineInputFormat
能夠設置每一個mapper處理的行數。能夠通過mapred.line.input.format.lienspermap屬性設置。
NLineInputformat能夠控制在每一個split中數據的行數

//設置具體輸入處理類
job.setInputFormatClass(NLineInputFormat.class);
//設置每一個split的行數
NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[2]));

4、二進制輸入

輸入類:

SequenceFileInputFormat 將key和value以sequencefile格式輸入。
SequenceFileAsTextInputFormat 
SequenceFileAsBinaryInputFormat 將key和value以原始二進制的格式輸入。

因為SequenceFile能夠支持Splittable。所以能夠作為mapreduce輸入文件的格式,能夠非常方便的得到己經含有<key,value>的分片。

SequenceFile處理、壓縮處理。

5、多文件輸入

類名:MultipleInputs
1、MultipleInputs能夠提供多個輸入數據類型。
2、通過addInputPath()方法來設置多路徑。

6、數據庫格式輸入

類名:DBInputFormat
1、DBInputFormat是一個使用JDBC方式連接數據庫,而且從關系型數據庫中讀取數據的一種輸入格式。
2、有多個map會去連接數據庫。有可能造成數據庫崩潰,因此,避免過多的數據庫連接。


3、HBase中的TablelnputFormat能夠讓MapReduce程序訪問HBase表裏的數據。

實例單輸入路徑

[root@master liguodong]# hdfs dfs -cat /input.txt
hello you
hello everybody
hello hadoop
[root@master liguodong]# hdfs dfs -text /tmp.seq
15/06/10 21:17:11 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native
15/06/10 21:17:11 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
100     apache software
99      chinese good
98      james NBA
97      index pass
96      apache software
95      chinese good
94      james NBA
93      index pass
......
package mrinputformat;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class TestInputFormat {

    public static class TokenizerMapper
       extends Mapper<IntWritable, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);//1
        private Text word = new Text();

        public void map(IntWritable key, Text value, Context context
                        ) throws IOException, InterruptedException 
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                //k v
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                       Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) 
            {
                sum += val.get();
            }
            result.set(sum); 
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        //1、配置  
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count"); 

        //2、打包運行必須運行的方法
        job.setJarByClass(TestInputFormat.class);

        //3、輸入路徑  
        //hdfs://master:8020/tmp.seq
        //hdfs://master:8020/output
        FileInputFormat.addInputPath(job, new Path(args[0]));  
        //默認是TextInputFormat
        job.setInputFormatClass(SequenceFileInputFormat.class);

        //4、Map
        job.setMapperClass(TokenizerMapper.class);

        //5、Combiner
        job.setCombinerClass(IntSumReducer.class);

        //6、Reducer
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);

        //7、 輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 

        //8、提交作業
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

運行結果:
技術分享

多輸入路徑方式

package mrinputformat;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class TestInputFormat {

    //採用TextInputFormat
    public static class Mapper1
       extends Mapper<LongWritable, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);//1
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException 
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                //k v
                context.write(word, one);
            }
        }
    }

    //SequenceFileInputFormat
    public static class Mapper2
       extends Mapper<IntWritable, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);//1
        private Text word = new Text();

        public void map(IntWritable key, Text value, Context context
                        ) throws IOException, InterruptedException 
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                //k v
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                       Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) 
            {
                sum += val.get();
            }
            result.set(sum); 
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        //1、配置  
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count"); 

        //2、打包運行必須運行的方法
        job.setJarByClass(TestInputFormat.class);

        //3、輸入路徑  
        //hdfs://master:8020/tmp.seq
        //hdfs://master:8020/output
        //單個輸入路徑
        //FileInputFormat.addInputPath(job, new Path(args[0]));  
        //默認是TextInputFormat
        //job.setInputFormatClass(SequenceFileInputFormat.class);
        //4、Map
        //job.setMapperClass(TokenizerMapper.class);

        //多個輸入路徑
        Path path1 = new Path("hdfs://master:8020/input.txt");
        Path path2 = new Path("hdfs://master:8020/tmp.seq");
        MultipleInputs.addInputPath(job, path1, TextInputFormat.class,Mapper1.class);
        MultipleInputs.addInputPath(job, path2, SequenceFileInputFormat.class,Mapper2.class);

        //5、Combiner
        job.setCombinerClass(IntSumReducer.class);

        //6、Reducer
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //7、 輸出路徑
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:8020/output"));

        //8、提交作業
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

運行結果:
技術分享

輸出格式

技術分享

文本輸出

TextOutputFormat

默認的輸出格式。key是LongWritable,value是Text類型, key和value中間值用tab隔開的。

二進制輸出

SequenceFileOutputFormat
將key和value以sequencefile格式輸出。

SequenceFileAsBinaryOutputFormat
將key和value以原始二進制的格式輸出。

MapFileOutputFormat
將key和value寫入MapFile中。因為MapFile中的key是有序的,所以寫入的時候必須保證記錄是按key值順序寫入的。

技術分享

多文件輸出

MultipleOutputFormat
MultipleOutputs

默認情況下一個reducer會產生一個輸出,可是有些時候我們想一個reducer產生多個輸出。 MultipleOutputFormat和MultipleOutputs能夠實現這個功能。
差別:MultipleOutputs能夠產生不同類型的輸出。

技術分享

技術分享

數據庫格式輸出

DBOutputFormat

技術分享

MapReduce輸入輸出類型、格式及實例