MapReduce處理資料去重與資料排序
阿新 • • 發佈:2019-02-16
一:MapReduce處理資料去重
MapReduce在shuffle階段會把相同的key歸併為一組,因此只要以整筆資料作為map輸出的key,reduce端每個key只輸出一次,即可達到資料去重的效果
/**
 * Mapper for the de-duplication job.
 *
 * <p>Each non-blank input line is trimmed and emitted as the output key, paired
 * with an empty {@link Text} placeholder value. Because the framework groups
 * identical keys before the reduce phase, duplicate lines collapse into a
 * single reduce call.
 */
public class DelsameMap extends Mapper<LongWritable, Text, Text, Text> {

    /** Placeholder value; only the key carries information. Never mutated. */
    private final Text emptyValue = new Text("");

    /** Reused output key to avoid allocating a new Text for every record. */
    private final Text outKey = new Text();

    /**
     * Emits the trimmed line as the key with an empty value.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of input text
     * @param context sink for the (line, "") pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // Trim BEFORE testing for emptiness so that whitespace-only lines are
        // skipped instead of being emitted as an empty key.
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }
        outKey.set(line);
        context.write(outKey, emptyValue);
    }
}
/**
 * Reducer for the de-duplication job.
 *
 * <p>Writes every key it receives exactly once, paired with the
 * {@link NullWritable} singleton. The grouped values are not inspected.
 */
public class DelsameRedu extends Reducer<Text, Text, Text, NullWritable> {

    /**
     * Writes {@code key} once, regardless of how many values were grouped under it.
     *
     * @param key     a line emitted by the mapper
     * @param values  placeholder values grouped under {@code key} (ignored)
     * @param context sink for the output record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // One write per reduce call, so each key appears once in the output.
        final NullWritable noValue = NullWritable.get();
        context.write(key, noValue);
    }
}
/**
 * Driver for the de-duplication job.
 *
 * <p>Usage: {@code DelsameMain <input path> <output path>}
 */
public class DelsameMain {

    /**
     * Configures and runs the job, then exits with a status reflecting the outcome:
     * 0 on success, 1 if the job failed, 2 if the arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: DelsameMain <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job(Configuration) constructor.
        Job job = Job.getInstance(conf);
        job.setJarByClass(DelsameMain.class);

        job.setMapperClass(DelsameMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(DelsameRedu.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job failure to the caller through the process exit code.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
二:MapReduce處理資料排序
將原始資料轉為int型別(IntWritable)作為map輸出的key,MapReduce框架在shuffle階段會自動地根據key進行排序(若要得到全域有序的結果,需只使用一個reducer)
/**
 * Mapper for the sort job.
 *
 * <p>Parses each non-blank input line as an {@code int} and emits it as the
 * output key with an empty {@link Text} placeholder value, so that the
 * framework's key ordering does the sorting.
 */
public class SortMap extends Mapper<LongWritable, Text, IntWritable, Text> {

    /** Placeholder value; only the key carries information. Never mutated. */
    private final Text emptyValue = new Text("");

    /** Reused output key to avoid allocating a new IntWritable for every record. */
    private final IntWritable outKey = new IntWritable();

    /**
     * Emits the integer parsed from the line as the key with an empty value.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of input text, expected to hold a single integer
     * @param context sink for the (number, "") pair
     * @throws IOException           if writing to the context fails
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if a non-blank line is not a valid {@code int}
     */
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, IntWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // Trim BEFORE testing for emptiness: a whitespace-only line would
        // otherwise reach Integer.parseInt("") and fail the task.
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }
        outKey.set(Integer.parseInt(line));
        context.write(outKey, emptyValue);
    }
}
/**
 * Reducer for the sort job.
 *
 * <p>Emits {@code (rank, number)} pairs: the output key is a running 1-based
 * counter and the output value is the key received from the map phase. One
 * record is written per grouped value, so duplicate numbers each get their own
 * consecutive rank.
 *
 * <p>NOTE(review): the counter lives in this reducer instance, so ranks are
 * only global if a single reducer processes all keys - confirm the driver
 * configures the job that way.
 */
public class SortRedu extends
        Reducer<IntWritable, Text, IntWritable, IntWritable> {

    /** Running rank; starts at 1 and is carried across reduce() calls. */
    private final IntWritable num = new IntWritable(1);

    /**
     * Writes one {@code (rank, key)} record per value grouped under {@code key}.
     *
     * @param key     a number emitted by the mapper
     * @param values  placeholder values; only their count matters
     * @param context sink for the output records
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values,
            Reducer<IntWritable, Text, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // The values only tell us how many times this number occurred.
        for (Text ignored : values) {
            context.write(num, key);
            // Update in place rather than allocating a new IntWritable per record.
            num.set(num.get() + 1);
        }
    }
}
/**
 * Driver for the sort job.
 *
 * <p>Usage: {@code SortMain <input path> <output path>}
 */
public class SortMain {

    /**
     * Configures and runs the job, then exits with a status reflecting the outcome:
     * 0 on success, 1 if the job failed, 2 if the arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: SortMain <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job(Configuration) constructor.
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortMain.class);

        job.setMapperClass(SortMap.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(SortRedu.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // A single reducer is required for a globally sorted result: with several
        // reducers each one sorts only its own partition and SortRedu's rank
        // counter would restart at 1 in every reducer.
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job failure to the caller through the process exit code.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}