1. 程式人生 > >MapReduce程序之數據去重

MapReduce程序之數據去重

大數據 Hadoop MapReduce Java

[toc]


MapReduce程序之數據去重

需求

有下面兩個文件:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/duplication$ cat file1.txt
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/duplication$ cat file2.txt
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c

要求去除兩個文件中的重復行並輸出到文件中。

MapReduce程序

關於如何去重,思路已經在代碼註釋中有說明,不過需要註意的是,這裏使用了前面開發的Job工具類來開發驅動程序,程序代碼如下:

package com.uplooking.bigdata.mr.duplication;

import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * 數據去重
 */
public class DuplicationJob {

    /**
     * 驅動程序,使用工具類來生成job
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {

        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");
            System.exit(-1);
        }

        Job job = MapReduceJobUtil.buildJob(new Configuration(),
                DuplicationJob.class,
                args[0],
                TextInputFormat.class,
                DuplicationMapper.class,
                Text.class,
                NullWritable.class,
                new Path(args[1]),
                TextOutputFormat.class,
                DuplicationReducer.class,
                Text.class,
                NullWritable.class);

        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }

    /**
     * 單詞去重Mapper操作,主要是將每一行的內容作為key輸出到reduce中
     * 即map輸出的key為某一行的內容,value為NullWritable
     */
    public static class DuplicationMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 直接將行作為key寫到context中
            context.write(value, NullWritable.get());
        }
    }

    /**
     * 因為shuffle會將map輸出中key相同的key-value對都拉取到同一個reducer中
     * 所以數據到達reducer後,key就是唯一的key,而values則為空的集合
     * 所以在reducer中也是直接將數據寫入到context中,讓reducer寫出數據即可
     */
    public static class DuplicationReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}

測試

這裏使用本地環境來運行MapReduce程序,輸入的參數如下:

/Users/yeyonghao/data/input/duplication /Users/yeyonghao/data/output/mr/duplication

也可以將其打包成jar包,然後上傳到Hadoop環境中運行。

運行程序後,查看輸出結果如下:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/duplication$ cat part-r-00000
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d

可以看到,使用我們的MapReduce程序,已經完成了數據去重的目的。

數據去重的另一種思路

除了前面的數據去重思路外,還有另外一種思路,雖然相對會復雜一些,不過這裏也提及一下。

Map處理時,可以將每一行數據處理成&lt;2012-3-3, c&gt;的形式,然後輸出,作為reduce的輸入,比如經過shuffle之後,到達reducer的輸入為<2012-3-3, [c, c]>,那麽就可以對叠代列表中的數據通過Java中的set集合來進行去重,這也可以作為解決這個上面的數據去重案例的一種思路,但顯然這個方法沒有前面的方法好,不具有通用性,所以還是建議使用第一種方法。

MapReduce程序之數據去重