
Hadoop: using map to merge small files into a SequenceFile


The previous example wrote the SequenceFile directly with SequenceFile's createWriter; this example uses MapReduce instead.
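
For contrast, the direct-write approach from the previous example looks roughly like the minimal sketch below. This is not the original post's code; the output path and the per-file append are illustrative placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class DirectSequenceFileWriter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative output path; replace with a real HDFS path.
        Path out = new Path("/output/merged.seq");
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(out),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            // For each small file: append its name as the key and its bytes as the value, e.g.
            // writer.append(new Text(fileName), new BytesWritable(fileBytes));
        }
    }
}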

1. Reading each small file in as a whole requires a custom InputFormat, and the custom InputFormat in turn requires a custom RecordReader that defines how records are read. To read a file whole, the RecordReader reads all of its bytes in a single call.

1.1 Extend the generic RecordReader class and override its methods.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    /**
     * Called once at initialization.
     *
     * @param split   the split that defines the range of records to read
     * @param context the information about the task
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    /**
     * Read the next key, value pair.
     *
     * @return true if a key/value pair was read
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length); // read the entire file in one call
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    /**
     * Get the current key.
     *
     * @return the current key or null if there is no current key
     */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    /**
     * Get the current value.
     *
     * @return the object that was read
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * The current progress of the record reader through its data.
     *
     * @return a number between 0.0 and 1.0 that is the fraction of the data read
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    /**
     * Close the record reader.
     */
    @Override
    public void close() throws IOException {
    }
}

1.2 Extend the generic FileInputFormat class to define the custom file input format.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;


public class WholeFileInputFormat extends FileInputFormat<NullWritable,BytesWritable> {
    /**
     * Is the given filename splittable? Usually, true, but if the file is
     * stream compressed, it will not be.
     * <p>
     * The default implementation in <code>FileInputFormat</code> always returns
     * true. Implementations that may deal with non-splittable files <i>must</i>
     * override this method.
     * <p>
     * <code>FileInputFormat</code> implementations can override this and return
     * <code>false</code> to ensure that individual input files are never split-up
     * so that mappers process entire files.
     *
     * @param context  the job context
     * @param filename the file name to check
     * @return is this file splitable?
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false; // do not split the file, so it can be read in as a whole
    }

    /**
     * Create a record reader for a given split. The framework will call
     * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
     * the split is used.
     *
     * @param split   the split to be read
     * @param context the information about the task
     * @return a new record reader
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader recordReader = new WholeFileRecordReader();
        recordReader.initialize(split,context);
        return recordReader;
    }
}

2. The Mapper. No custom Reducer is written; this example only merges files.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;


public class SequenceFileMapper extends Mapper<NullWritable,BytesWritable,Text,BytesWritable> {
    private Text filenameKey;
    /**
     * Called once at the beginning of the task.
     *
     * @param context
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit)split).getPath();
        filenameKey = new Text(path.toString());
    }

    /**
     * Called once for each key/value pair in the input split. Most applications
     * should override this, but the default is the identity function.
     *
     * @param key
     * @param value
     * @param context
     */
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(filenameKey,value);
    }
}

3. Run the job. The Tool helper interface is used here, but it is optional; you could also configure and submit the Job directly.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    /**
     * Execute the command with the given arguments.
     *
     * @param args command specific arguments.
     * @return exit code.
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        if(conf==null){
            return -1;
        }

        Path outPath = new Path(args[1]);
        FileSystem fileSystem = outPath.getFileSystem(conf);
        // delete the output path if it already exists
        if(fileSystem.exists(outPath))
        {
            fileSystem.delete(outPath,true);
        }

        Job job = Job.getInstance(conf,"SmallFilesToSequenceFile");
        job.setJarByClass(SmallFilesToSequenceFileConverter.class);

        job.setMapperClass(SequenceFileMapper.class);

        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);



        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));


        return job.waitForCompletion(true) ? 0:1;
    }

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);

        long endTime = System.currentTimeMillis();
        long timeSpan = endTime - startTime;
        System.out.println("Elapsed time: " + timeSpan + " ms.");

        // exit only after the elapsed time has been printed
        System.exit(exitCode);
    }
}

4. Upload to the cluster and run. When packaging the jar, put the META-INF directory at the same level as the src directory so that the main-class entry point can be found.

# manually set the number of reducers to 2; the job produces two part files
[hadoop@bigdata-senior01 ~]$ hadoop jar SmallFilesToSequenceFileConverter.jar -D mapreduce.job.reduces=2 /demo /output3


...
[hadoop@bigdata-senior01 ~]$ hadoop fs -ls /output3
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2019-02-18 16:17 /output3/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 60072 2019-02-18 16:17 /output3/part-r-00000
-rw-r--r-- 1 hadoop supergroup 28520 2019-02-18 16:17 /output3/part-r-00001
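
To check the result, the SequenceFile can be read back and its keys (the original file paths) listed. Below is a minimal sketch using SequenceFile.Reader; the part file path is taken from the listing above and is otherwise an assumption about your output location.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFilePeek {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // One of the part files produced by the job above (assumed path).
        Path path = new Path("/output3/part-r-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path))) {
            Text key = new Text();                     // original file path
            BytesWritable value = new BytesWritable(); // original file contents
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value.getLength() + " bytes");
            }
        }
    }
}

Alternatively, hadoop fs -text on a part file prints the key/value pairs directly.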
