MapReduce程序之數據去重
阿新 • • 發佈:2018-03-08
大數據 Hadoop MapReduce Java [toc]
MapReduce程序之數據去重
需求
有下面兩個文件:
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/duplication$ cat file1.txt 2012-3-1 a 2012-3-2 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-7 c 2012-3-3 c yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/duplication$ cat file2.txt 2012-3-1 b 2012-3-2 a 2012-3-3 b 2012-3-4 d 2012-3-5 a 2012-3-6 c 2012-3-7 d 2012-3-3 c
要求去除兩個文件中的重復行並輸出到文件中。
MapReduce程序
關於如何去重,思路已經在代碼註釋中有說明,不過需要註意的是,這裏使用了前面開發的Job工具類來開發驅動程序,程序代碼如下:
package com.uplooking.bigdata.mr.duplication; import com.uplooking.bigdata.common.utils.MapReduceJobUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /** * 數據去重 */ public class DuplicationJob { /** * 驅動程序,使用工具類來生成job * * @param args */ public static void main(String[] args) throws Exception { if (args == null || args.length < 2) { System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>"); System.exit(-1); } Job job = MapReduceJobUtil.buildJob(new Configuration(), DuplicationJob.class, args[0], TextInputFormat.class, DuplicationMapper.class, Text.class, NullWritable.class, new Path(args[1]), TextOutputFormat.class, DuplicationReducer.class, Text.class, NullWritable.class); job.setNumReduceTasks(1); job.waitForCompletion(true); } /** * 單詞去重Mapper操作,主要是將每一行的內容作為key輸出到reduce中 * 即map輸出的key為某一行的內容,value為NullWritable */ public static class DuplicationMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 直接將行作為key寫到context中 context.write(value, NullWritable.get()); } } /** * 因為shuffle會將map輸出中key相同的key-value對都拉取到同一個reducer中 * 所以數據到達reducer後,key就是唯一的key,而values則為空的集合 * 所以在reducer中也是直接將數據寫入到context中,讓reducer寫出數據即可 */ public static class DuplicationReducer extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } }
測試
這裏使用本地環境來運行MapReduce程序,輸入的參數如下:
/Users/yeyonghao/data/input/duplication /Users/yeyonghao/data/output/mr/duplication
也可以將其打包成jar包,然後上傳到Hadoop環境中運行。
運行程序後,查看輸出結果如下:
yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/duplication$ cat part-r-00000 2012-3-1 a 2012-3-1 b 2012-3-2 a 2012-3-2 b 2012-3-3 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-6 c 2012-3-7 c 2012-3-7 d
可以看到,使用我們的MapReduce程序,已經完成了數據去重的目的。
數據去重的另一種思路
除了前面的數據去重思路外,還有另外一種思路,雖然相對會復雜一些,不過這裏也提及一下。
Map處理時,可以將每一行數據處理成<2012-3-3, c>
的形式,然後輸出,作為reduce的輸入,比如經過shuffle之後,到達reducer的輸入為<2012-3-3, [c, c]>,那麽就可以對叠代列表中的數據通過Java中的set集合來進行去重,這也可以作為解決這個上面的數據去重案例的一種思路,但顯然這個方法沒有前面的方法好,不具有通用性,所以還是建議使用第一種方法。
MapReduce程序之數據去重