Hadoop學習記錄(六、MapReduce測試)
阿新 • • 發佈:2018-12-11
1. 使用 MRUnit 進行單元測試
加入 Maven 依賴:
<!-- MRUnit: harness for unit-testing mappers and reducers; needed only at test time. -->
<dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.1.0</version>
    <!-- Selects the MRUnit artifact built for Hadoop 2.x. -->
    <classifier>hadoop2</classifier>
    <scope>test</scope>
</dependency>
定義一個解析 NCDC 氣象記錄的解析器類,並加上資料有效性的驗證條件:
public class NcdcRecordParser { private static final int MISSING_TEMPERATURE = 9999; private String year; private int airTemperature; private String quality; public void parse(String record) { // 從15到19列獲取年份資訊 year = record.substring(15, 19); String airTemperatureString; // 判斷氣溫的正負 if (record.charAt(87) == '+') { //從88到92列獲取溫度資訊 airTemperatureString = record.substring(88, 92); } else { airTemperatureString = record.substring(87, 92); } airTemperature = Integer.parseInt(airTemperatureString); //獲取質量驗證值 quality = record.substring(92, 93); } public void parse(Text record) { parse(record.toString()); } public boolean isValidTemperature() { return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]"); } public String getYear() { return year; } public int getAirTemperature() { return airTemperature; } }
mapper類:
/**
 * Maps each NCDC record to (year, air temperature). Records whose
 * temperature is missing or of suspect quality, as judged by
 * {@link NcdcRecordParser#isValidTemperature()}, produce no output.
 */
public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final NcdcRecordParser parser = new NcdcRecordParser();

    // Output Writables reused across map() calls instead of allocating two new
    // objects per record. Hadoop copies/serializes each pair when it is written,
    // so reusing the instances is the conventional Hadoop idiom.
    private final Text year = new Text();
    private final IntWritable airTemperature = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        parser.parse(value);
        if (parser.isValidTemperature()) {
            year.set(parser.getYear());
            airTemperature.set(parser.getAirTemperature());
            context.write(year, airTemperature);
        }
    }
}
測試類:
/**
 * MRUnit tests for {@link MaxTemperatureMapper}.
 *
 * <p>Each input is one NCDC record split over two string literals. The first
 * literal holds record characters 0-50 and the second holds characters 51
 * onward. The parser reads:
 * <ul>
 *   <li>the year from characters 15-18 ("1950", in the first literal);</li>
 *   <li>the signed temperature from characters 87-91 (offsets 36-40 of the second literal);</li>
 *   <li>the quality code from character 92 (offset 41 of the second literal).</li>
 * </ul>
 */
public class MaxTemperatureMapperTest {

    /** Returns a new driver wired to a fresh mapper; each test adds its own input and output. */
    private static MapDriver<LongWritable, Text, Text, IntWritable> newDriver() {
        return new MapDriver<LongWritable, Text, Text, IntWritable>()
                .withMapper(new MaxTemperatureMapper());
    }

    // Valid negative reading: temperature "-0011" with quality '1' is emitted as -11.
    @Test
    public void processesValidRecord() throws IOException, InterruptedException {
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9-00111+99999999999");
        newDriver()
                .withInput(new LongWritable(0), value)
                .withOutput(new Text("1950"), new IntWritable(-11))
                .runTest();
    }

    // Valid positive reading: temperature "+0011" with quality '1' is emitted as 11.
    @Test
    public void processesPositiveTemperatureRecord() throws IOException,
            InterruptedException {
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9+00111+99999999999");
        newDriver()
                .withInput(new LongWritable(0), value)
                .withOutput(new Text("1950"), new IntWritable(11))
                .runTest();
    }

    // Temperature "+9999" is the missing-value sentinel, not a real (too large)
    // reading, so the record must produce no output.
    @Test
    public void ignoresMissingTemperatureRecord() throws IOException,
            InterruptedException {
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9+99991+99999999999");
        newDriver()
                .withInput(new LongWritable(0), value)
                .runTest();
    }

    // Quality code '2' is not in the accepted set, so the record must produce no output.
    @Test
    public void ignoresSuspectQualityRecord() throws IOException,
            InterruptedException {
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9+00112+99999999999");
        newDriver()
                .withInput(new LongWritable(0), value)
                .runTest();
    }
}
reducer類:
/**
 * Collapses all temperatures seen for one key (a year) into their maximum and
 * emits a single (year, maximum) pair. Because taking a maximum is
 * associative and commutative, the class can also be used as a combiner.
 */
public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Start at the smallest int so any real reading replaces it.
        int highest = Integer.MIN_VALUE;
        for (IntWritable reading : values) {
            int candidate = reading.get();
            if (candidate > highest) {
                highest = candidate;
            }
        }
        context.write(key, new IntWritable(highest));
    }
}
測試類:
/** MRUnit tests for {@link MaxTemperatureReducer}. */
public class MaxTemperatureReducerTest {

    // Mixed positive values: the larger one is emitted.
    @Test
    public void returnsMaximumIntegerInValues() throws IOException {
        new ReduceDriver<Text, IntWritable, Text, IntWritable>()
                .withReducer(new MaxTemperatureReducer())
                .withInput(new Text("1950"),
                        Arrays.asList(new IntWritable(10), new IntWritable(5)))
                .withOutput(new Text("1950"), new IntWritable(10))
                .runTest();
    }

    // All-negative values: guards against a running maximum initialised to 0,
    // which would wrongly report 0 for a year whose readings were all below zero.
    @Test
    public void returnsMaximumWhenAllValuesAreNegative() throws IOException {
        new ReduceDriver<Text, IntWritable, Text, IntWritable>()
                .withReducer(new MaxTemperatureReducer())
                .withInput(new Text("1950"),
                        Arrays.asList(new IntWritable(-11), new IntWritable(-78)))
                .withOutput(new Text("1950"), new IntWritable(-11))
                .runTest();
    }
}
2. 在虛擬機器環境中測試
使用Tool介面實現一個MapReduce驅動程式:
/**
 * Command-line driver for the max-temperature job. It is launched through
 * {@link ToolRunner} so that generic Hadoop command-line options are applied
 * to the {@code Configuration} returned by {@code getConf()}.
 */
public class MaxTemperatureDriver extends Configured implements Tool {

    /**
     * Configures the job and runs it to completion.
     *
     * @param args exactly two arguments: the input path and the output path
     * @return 0 if the job succeeded, 1 if it failed, -1 on a usage error
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        // Job.getInstance replaces the Job(Configuration, String) constructor,
        // which is deprecated in the Hadoop 2 mapreduce API.
        Job job = Job.getInstance(getConf(), "Max temperature");
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        // A maximum over partial maxima equals the overall maximum, so the
        // reducer is also safe to run as a map-side combiner.
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        // The mapper emits the same (Text, IntWritable) types as the reducer,
        // so no separate map-output classes are set.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Entry point: runs the tool and exits with its return code. */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
    }
}
在虛擬機器上執行:
# Run the driver from the job jar, reading from and writing to the local filesystem.
./hadoop jar /usr/local/hadoop-1.0-SNAPSHOT.jar MaxTemperatureDriver \
    file:///usr/local/sample.txt \
    file:///usr/local/output
驗證:
在本機執行測試類,將作業輸出與預期結果檔案比對(注意:本機需先配置好 Hadoop):
/**
 * Runs {@link MaxTemperatureDriver} end to end with the local job runner on the
 * local filesystem. The single output file is compared line by line against
 * the {@code /expected.txt} classpath resource.
 */
public class MaxTemperatureDriverTest {

    /** Skips output entries whose names start with '_' (e.g. the job's success marker). */
    public static class OutputLogFilter implements PathFilter {
        public boolean accept(Path path) {
            return !path.getName().startsWith("_");
        }
    }

    //vv MaxTemperatureDriverTestV2
    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        // Use the local filesystem and the in-process local job runner.
        conf.set("fs.defaultFS", "file:///");
        conf.set("mapreduce.framework.name", "local");
        // 1 MB sort buffer; presumably to keep memory use low inside the test JVM.
        conf.setInt("mapreduce.task.io.sort.mb", 1);

        Path input = new Path("input/ncdc/micro");
        Path output = new Path("output");

        FileSystem fs = FileSystem.getLocal(conf);
        fs.delete(output, true); // delete old output

        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        driver.setConf(conf);

        int exitCode = driver.run(new String[] {
                input.toString(), output.toString() });
        assertThat(exitCode, is(0));

        checkOutput(conf, output);
    }
    //^^ MaxTemperatureDriverTestV2

    /**
     * Asserts that {@code output} holds exactly one data file whose lines match
     * {@code /expected.txt} one for one, with no extra trailing lines.
     */
    private void checkOutput(Configuration conf, Path output) throws IOException {
        FileSystem fs = FileSystem.getLocal(conf);
        Path[] outputFiles = FileUtil.stat2Paths(
                fs.listStatus(output, new OutputLogFilter()));
        assertThat(outputFiles.length, is(1));

        InputStream expectedStream = getClass().getResourceAsStream("/expected.txt");
        if (expectedStream == null) {
            throw new IllegalStateException(
                    "Test resource /expected.txt not found on the classpath");
        }
        // try-with-resources closes both readers even when an assertion fails;
        // 'expected' is opened first so it is still closed if fs.open throws.
        try (BufferedReader expected = asBufferedReader(expectedStream);
             BufferedReader actual = asBufferedReader(fs.open(outputFiles[0]))) {
            String expectedLine;
            while ((expectedLine = expected.readLine()) != null) {
                assertThat(actual.readLine(), is(expectedLine));
            }
            assertThat(actual.readLine(), nullValue());
        }
    }

    /** Wraps a byte stream in a line reader that decodes UTF-8 explicitly, not the platform charset. */
    private BufferedReader asBufferedReader(InputStream in) throws IOException {
        return new BufferedReader(
                new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8));
    }
}
在本機使用 mini 叢集進行測試:
/**
 * Runs {@link MaxTemperatureDriver} against the in-process mini clusters
 * provided by {@link ClusterMapReduceTestCase}. It then checks the single
 * output file for the expected per-year maxima.
 */
public class MaxTemperatureDriverMiniTest extends ClusterMapReduceTestCase {

    /** Skips output entries whose names start with '_' (e.g. the job's success marker). */
    public static class OutputLogFilter implements PathFilter {
        public boolean accept(Path path) {
            return !path.getName().startsWith("_");
        }
    }

    /**
     * Defaults the test.build.data and hadoop.log.dir system properties to /tmp
     * when the build has not set them. Then delegates to {@code super.setUp()},
     * which presumably reads them when starting the mini clusters.
     */
    @Override
    protected void setUp() throws Exception {
        if (System.getProperty("test.build.data") == null) {
            System.setProperty("test.build.data", "/tmp");
        }
        if (System.getProperty("hadoop.log.dir") == null) {
            System.setProperty("hadoop.log.dir", "/tmp");
        }
        super.setUp();
    }

    // Not marked with @Test since ClusterMapReduceTestCase is a JUnit 3 test case
    public void test() throws Exception {
        Configuration conf = createJobConf();

        Path localInput = new Path("input/ncdc/micro");
        Path input = getInputDir();
        Path output = getOutputDir();

        // Copy input data into test HDFS
        getFileSystem().copyFromLocalFile(localInput, input);

        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        driver.setConf(conf);

        int exitCode = driver.run(new String[] {
                input.toString(), output.toString() });
        assertThat(exitCode, is(0));

        // Check the output is as expected: one data file with one line per year.
        Path[] outputFiles = FileUtil.stat2Paths(
                getFileSystem().listStatus(output, new OutputLogFilter()));
        assertThat(outputFiles.length, is(1));

        // try-with-resources closes the stream even if an assertion below fails.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                getFileSystem().open(outputFiles[0]),
                java.nio.charset.StandardCharsets.UTF_8))) {
            assertThat(reader.readLine(), is("1949\t111"));
            assertThat(reader.readLine(), is("1950\t22"));
            assertThat(reader.readLine(), nullValue());
        }
    }
}