1. 程式人生 > >MapReduce處理資料去重與資料排序

MapReduce處理資料去重與資料排序

一:MapReduce處理資料去重

Map的key具有資料去重的功能

/*
 * 去除資料中相同資料
 * 資料去重問題
 * 以整個資料作為key傳送出去, value為null
 */
public class DelsameMap extends Mapper<LongWritable, Text, Text, Text> {
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		if (line.length() > 0) {
			context.write(new Text(line.trim()), new Text(""));
		}

	}
}
public class DelsameRedu extends Reducer<Text, Text, Text, NullWritable> {
	@Override
	protected void reduce(Text key, Iterable<Text> values,
			Reducer<Text, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		context.write(key, NullWritable.get());
	}
}
public class DelsameMain {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(DelsameMain.class);

		job.setMapperClass(DelsameMap.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(DelsameRedu.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}
}


二:MapReduce處理資料排序

將原始資料作為map輸出的key設定為int型別。map會自動的根據key進行排序

/*
 * mapreduce處理資料排序
 *將原始資料作為map輸出的key設定為int型別。map會自動的根據key進行排序
 */
public class SortMap extends Mapper<LongWritable, Text, IntWritable, Text> {
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, IntWritable, Text>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		if (line.length() > 0) {
			context.write(new IntWritable(Integer.parseInt(line.trim())),
					new Text(""));
		}
	}
}
/*
 * 將values作為次序key。將map排序好的key作為value輸出
 */
public class SortRedu extends
		Reducer<IntWritable, Text, IntWritable, IntWritable> {
	private IntWritable num = new IntWritable(1);

	@Override
	protected void reduce(IntWritable key, Iterable<Text> values,
			Reducer<IntWritable, Text, IntWritable, IntWritable>.Context context)
			throws IOException, InterruptedException {

		// 將values作為排序的次序。將map拍好序的key作為reduce的value輸出
		for (Text val : values) {
			context.write(num, key);
			num = new IntWritable(num.get() + 1);
		}
	}
}
public class SortMain {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(SortMain.class);

		job.setMapperClass(SortMap.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(SortRedu.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}
}