程式人生 > 【十八掌●武功篇】第七掌:MapReduce之單元測試

【十八掌●武功篇】第七掌:MapReduce之單元測試

MRUnit是一個基於JUnit的單元測試框架,專門用來對Hadoop的MapReduce程式進行單元測試。MRUnit針對不同的測試對象提供不同的Driver:
- MapDriver:單獨測試Map。
- ReduceDriver:單獨測試Reduce。
- MapReduceDriver:將Map和Reduce串聯起來進行整體測試。

單元測試範例

1、新增依賴

<dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.1.0</version>
    <classifier>hadoop2</classifier>
</dependency>
<!-- 新增依賴時注意classifier,用來區分hadoop1和hadoop2:MapReduce V1配置hadoop1,MapReduce V2配置hadoop2 -->

2、MapReduce程式類

package mapreduce;
import mywritable.PariWritable;
import org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; public class SortWCMapReduce extends Configured implements Tool { //Mapper類 public static class SortWCMapper extends Mapper<LongWritable, Text, PariWritable, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lineValue = value.toString(); String[] strs = lineValue.split(","); PariWritable mapOutputKey = new PariWritable(strs[0], Integer.valueOf(strs[1])); context.write(mapOutputKey, new IntWritable(mapOutputKey.getSecond())); } } //Reducer類 public static class SortWCReducer extends Reducer<PariWritable, IntWritable, Text, IntWritable> { @Override public void reduce(PariWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) { context.write(new Text(key.getFirst()), value); } } } //Driver驅動方法 public int run(String[] args) throws Exception { //獲取配置 Configuration configuration = this.getConf(); //建立job Job job = Job.getInstance(configuration, SortWCMapReduce.class.getSimpleName()); //指定MapReduce主類 job.setJarByClass(SortWCMapReduce.class); //指定輸入路徑 Path inpath = new Path(args[0]); FileInputFormat.addInputPath(job, inpath); //指定輸出路徑 Path outpath = new Path(args[1]); FileOutputFormat.setOutputPath(job, outpath); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(SortWCMapper.class); job.setMapOutputKeyClass(PariWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(SortWCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); boolean isSucces = job.waitForCompletion(true); return isSucces ? 
0 : 1; } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); int status = ToolRunner.run(configuration, new SortWCMapReduce(), args); System.exit(status); } }

3、自定義型別類

package mywritable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by 鳴宇淳 on 2017/5/15.
 */
public class PariWritable implements WritableComparable<PariWritable> {
    private String first;
    private Integer second;

    public String getFirst() {
        return first;
    }

    public Integer getSecond() {
        return second;
    }

    public PariWritable() {
    }

    public PariWritable(String f, Integer s) {
        this.set(f, s);
    }

    public void set(String f, Integer s) {
        this.first = f;
        this.second = s;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(first);
        dataOutput.writeInt(second);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.first = dataInput.readUTF();
        this.second = dataInput.readInt();
    }

    @Override
    public String toString() {
        return first + "|" + second;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (!(obj instanceof PariWritable)) {
            return false;
        } else {
            PariWritable other = (PariWritable) obj;
            return this.first.equals(other.first) && this.second.compareTo(other.second) == 0;
        }
    }

    public int compareTo(PariWritable o) {
        //先比較第一個,如果第一個相同,比較第二個
        int comp = this.first.compareTo(o.first);
        if (comp == 0) {
            comp= this.second.compareTo(o.second);
        }
        return comp;
    }
}

4、單元測試類


import mapreduce.SortWCMapReduce;
import mywritable.PariWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;

/**
 * Created by 鳴宇淳 on 2017/5/12.
 */
public class SortWCMapReduceTest {
    MapDriver<LongWritable, Text, PariWritable, IntWritable> mapDriver;
    ReduceDriver<PariWritable, IntWritable, Text, IntWritable> reduceDriver;

    @Before
    public void setUp() {
        SortWCMapReduce.SortWCMapper mapper = new SortWCMapReduce.SortWCMapper();
        SortWCMapReduce.SortWCReducer reduce = new SortWCMapReduce.SortWCReducer();
        mapDriver = new MapDriver(mapper);
        reduceDriver = new ReduceDriver(reduce);

    }

//Map測試方法
    @Test
    public void testMapper() throws IOException {
        mapDriver.withInput(new LongWritable(), new Text("655209,3"));
        mapDriver.withOutput(new PariWritable("655209", 3), new IntWritable(3));
        mapDriver.runTest();
    }

//reduce測試方法
    @Test
    public void testReduce() throws IOException {
        reduceDriver.withInput(new PariWritable("62669", 5), new ArrayList<IntWritable>() {
            {
                add(new IntWritable(1));
                add(new IntWritable(3));
            }
        });
        reduceDriver.withOutput(new Text("62669"), new IntWritable(1))
                .withOutput(new Text("62669"), new IntWritable(3));
        reduceDriver.runTest();
    }

//MapReduce整體測試方法
@Test
public void testMapReduce() {
    mapReduceDriver.withInput(new LongWritable(), new Text("10001,4"))
            .withInput(new LongWritable(), new Text("10002,2"))
            .withInput(new LongWritable(), new Text("10001,2"))
    mapReduceDriver.withOutput(new Text("10001"), new IntWritable(2))
            .withOutput(new Text("10001"), new IntWritable(4))
            .withOutput(new Text("10002"), new IntWritable(2));
}
}