程式人生 > 使用Hadoop的MapReduce來實現資料去重

使用Hadoop的MapReduce來實現資料去重

最近在系統學習大資料知識,學了沒有記錄過幾天又忘光了,所以把學習內容記錄下來,方便以後檢視。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * FileName: DdfferentData
 * Author:   hadoop
 * Email:    
[email protected]
* Date: 18-10-6 上午9:34 * Description: * hadoop過濾出不同的資料 */ public class DdfferentData { /** * 使用Mapper將資料檔案中的資料本身作為Mapper輸出的key直接輸出 */ public static class forDifferenceMapper extends Mapper<Object, Text, Text, Text> { private final IntWritable one = new IntWritable(1); private Text mapperValue = new Text(); //存放key的值 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { context.write(value,mapperValue); } } /** * 使用Reducer將輸入的key本身作為key直接輸出 */ public static class forDifferenceReducer extends Reducer<Text, Text, Text, Text> { private Text reduceValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(key,reduceValue); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //設定MapReduce的配置 String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length < 2){ System.out.println("Usage: DeferentData <in> [<in>...] <out>"); System.exit(2); } //Job job = new Job(conf); Job job = Job.getInstance(conf); job.setJarByClass(DdfferentData.class); job.setJobName("DeferentData"); job.setMapperClass(forDifferenceMapper.class); job.setCombinerClass(forDifferenceReducer.class);//加速MapReduce並行效率 job.setReducerClass(forDifferenceReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); for (int i = 0; i < otherArgs.length-1;++i){ FileInputFormat.addInputPath(job,new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1])); System.exit(job.waitForCompletion(true)?0:1); } }