1. 程式人生 > >MapReduce程序之TopN問題(排行榜問題)

MapReduce程序之TopN問題(排行榜問題)

大數據 Hadoop MapReduce Java

[toc]


MapReduce程序之TopN問題(排行榜問題)

需求

有下面的文本文件:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/topn$ cat senventeen_a.txt
1,9819,100,121
2,8918,2000,111
3,2813,1234,22
4,9100,10,1101
5,3210,490,111
6,1298,28,1211
7,1010,281,90
8,1818,9000,20
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/topn$ cat senventeen_b.txt
100,3333,10,100
101,9321,1000,293
102,3881,701,20
103,6791,910,30
104,8888,11,39

以逗號作為分隔符,每一列分別為orderid,userid,payment,productid,現在需要按照payment從大到小求出TopN,比如top10,其輸出結果應該如下:

1   9000
2   2000
3   1234
4   1000
5   910
6   701
7   490
8   281
9   100
10  28

此外,TopN中的N應該是動態的,由輸入的參數來決定,根據引寫一個MapReduce程序來進行處理。

程序思路分析

如下:

Mapper:
/**
 * Mapper,因為Block中的每一個split都會交由一個Mapper Task來進行處理,對於TopN問題,可以考慮每一個Mapper Task的輸出
 * 可以為這個split中的前N個值,最後每個數據到達Reducer的時候,就可以大大減少原來需要比較的數據量,因為在Reducer處理之前
 * Map Task已經幫我們把的數據量大大減少了,比如,在MapReduce中,默認情況下一個Block就為一個split,當然這個是可以設置的
 * 而一個Block為128M,顯然128M能夠存儲的文本文件也是相當多的,假設現在我的數據有10個Block,即1280MB的數據,如果要求Top10
 * 的問題,此時,這些數據需要10個Mapper Task來進行處理,那麽在每個Mapper Task中先求出前10個數,最後這10個數再交由Reducer來進行處理
 * 也就是說,在我們的這個案例中,Reducer需要處理排序的數有100個,顯然經過Map處理之後,Reducer的壓力就大大減少了。
 * 那麽如何實現每個Mapper Task中都只輸出10個數呢?這時可以使用一個set來緩存數據,從而達到先緩存10個數的目的,詳細可以參考下面的代碼。
 */

 Reducer:
 /**
 * Reducer,將Mapper Task輸出的數據排序後再輸出
 * 處理思路與Mapper是類似的
 */

 TopN中的N值問題:
// 向conf中傳入參數
// 在MapReduce中,因為計算是分散到每個節點上進行的
// 也就是將我們的Maper和Reducer也是分散到每個節點進行的
// 所以不能在TopNJob中設置一個全局變量來對N進行設置(雖然在本地運行時是沒有問題的,但在集群運行時會有問題)
// 因此MapReduce提供了在Configuration對象中設置參數的方法
// 通過在Configuration對象中設置某些參數,可以保證每個節點的Mapper和Reducer都能夠讀取到N

MapReduce程序

關於如何處理TopN問題的思路已經在代碼註釋中有說明,不過需要註意的是,這裏使用了前面開發的Job工具類來開發驅動程序。

package com.uplooking.bigdata.mr.topn;

import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import com.uplooking.bigdata.mr.secondsort.AccessLogWritable;
import com.uplooking.bigdata.mr.secondsort.SecondSortJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

/**
 * MapReduce程序之TopN問題
 */
public class TopNJob {

    /**
     * 驅動程序,使用Job工具類來生成job
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 3) {
            System.err.println("Parameter Errors! Usages:<inputpath> <outputpath> <topN>");
            System.exit(-1);
        }

        // 向conf中傳入參數
        // 在MapReduce中,因為計算是分散到每個節點上進行的
        // 也就是將我們的Maper和Reducer也是分散到每個節點進行的
        // 所以不能在TopNJob中設置一個全局變量來對N進行設置(雖然在本地運行時是沒有問題的,但在集群運行時會有問題)
        // 因此MapReduce提供了在Configuration對象中設置參數的方法
        // 通過在Configuration對象中設置某些參數,可以保證每個節點的Mapper和Reducer都能夠讀取到N
        Configuration conf = new Configuration();
        conf.set("topN", args[2]);

        Job job = MapReduceJobUtil.buildJob(conf,
                TopNJob.class,
                args[0],
                TextInputFormat.class,
                TopNJobMapper.class,
                IntWritable.class,
                NullWritable.class,
                new Path(args[1]),
                TextOutputFormat.class,
                TopNReducer.class,
                IntWritable.class,
                IntWritable.class);

        // ReduceTask必須設置為1
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }

    /**
     * Mapper,因為Block中的每一個split都會交由一個Mapper Task來進行處理,對於TopN問題,可以考慮每一個Mapper Task的輸出
     * 可以為這個split中的前N個值,最後每個數據到達Reducer的時候,就可以大大減少原來需要比較的數據量,因為在Reducer處理之前
     * Map Task已經幫我們把的數據量大大減少了,比如,在MapReduce中,默認情況下一個Block就為一個split,當然這個是可以設置的
     * 而一個Block為128M,顯然128M能夠存儲的文本文件也是相當多的,假設現在我的數據有10個Block,即1280MB的數據,如果要求Top10
     * 的問題,此時,這些數據需要10個Mapper Task來進行處理,那麽在每個Mapper Task中先求出前10個數,最後這10個數再交由Reducer來進行處理
     * 也就是說,在我們的這個案例中,Reducer需要處理排序的數有100個,顯然經過Map處理之後,Reducer的壓力就大大減少了。
     * 那麽如何實現每個Mapper Task中都只輸出10個數呢?這時可以使用一個set來緩存數據,從而達到先緩存10個數的目的,詳細可以參考下面的代碼。
     */
    public static class TopNJobMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {

        TreeSet<Integer> cachedTopN = null;
        Integer N = null;

        /**
         * 每個Mapper Task執行前都會先執行setup函數
         * map函數是每行執行一次
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // TreeSet定義的排序規則為倒序,後面做數據的處理時只需要pollLast最後一個即可將
            // TreeSet中較小的數去掉
            cachedTopN = new TreeSet<Integer>(new Comparator<Integer>() {
                @Override
                public int compare(Integer o1, Integer o2) {
                    int ret = 0;
                    if (o1 > o2) {
                        ret = -1;
                    } else if (o1 < o2) {
                        ret = 1;
                    }

                    return ret;
                }
            });
            // 拿到傳入參數時的topN中的N值
            N = Integer.valueOf(context.getConfiguration().get("topN"));
        }

        /**
         * 將split中前N個數篩選出來
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 解析每一行
            String[] fields = value.toString().split(",");
            if (fields == null || fields.length < 3) {
                return;
            }
            // 轉換payment為數字,如果出現異常,終止當前map函數的執行
            Integer payment = null;
            try {
                payment = Integer.valueOf(fields[2]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
                return;
            }
            // 將數字寫入到TreeSet當中
            cachedTopN.add(payment);
            // 判斷cachedTopN中的元素個數是否已經達到N個,如果已經達到N個,則去掉最後一個
            if (cachedTopN.size() > N) {
                cachedTopN.pollLast();
            }
        }

        /**
         * 每個Mapper Task執行結束後才會執行cleanup函數
         * 將map函數篩選出來的前N個數寫入到context中作為輸出
         * 將
         * map函數是每行執行一次
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Integer num : cachedTopN) {
                context.write(new IntWritable(num), NullWritable.get());
            }
        }
    }

    /**
     * Reducer,將Mapper Task輸出的數據排序後再輸出
     * 處理思路與Mapper是類似的
     */
    public static class TopNReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {

        TreeSet<Integer> cachedTopN = null;
        Integer N = null;

        /**
         * 初始化一個TreeSet
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // TreeSet定義的排序規則為倒序,後面做數據的處理時只需要pollLast最後一個即可將
            // TreeSet中較小的數去掉
            cachedTopN = new TreeSet<Integer>(new Comparator<Integer>() {
                @Override
                public int compare(Integer o1, Integer o2) {
                    int ret = 0;
                    if (o1 > o2) {
                        ret = -1;
                    } else if (o1 < o2) {
                        ret = 1;
                    }
                    return ret;
                }
            });
            // 拿到傳入參數時的topN中的N值
            N = Integer.valueOf(context.getConfiguration().get("topN"));
        }

        /**
         * 篩選Reducer Task中的前10個數
         */
        @Override
        protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            cachedTopN.add(Integer.valueOf(key.toString()));
            // 判斷cachedTopN中的元素個數是否已經達到N個,如果已經達到N個,則去掉最後一個
            if (cachedTopN.size() > N) {
                cachedTopN.pollLast();
            }
        }

        /**
         * 將reduce函數篩選出來的前N個數寫入到context中作為輸出
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            int index = 1;
            for(Integer num : cachedTopN) {
                context.write(new IntWritable(index), new IntWritable(num));
                index++;
            }
        }
    }

}

測試

這裏使用本地環境來運行MapReduce程序,輸入的參數如下:

/Users/yeyonghao/data/input/topn /Users/yeyonghao/data/output/mr/topn 10

也可以將其打包成jar包,然後上傳到Hadoop環境中運行。

運行程序後,查看輸出結果如下:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/topn$ cat part-r-00000
1   9000
2   2000
3   1234
4   1000
5   910
6   701
7   490
8   281
9   100
10  28

可以看到,我們的MapReduce程序已經完成了TopN問題的處理,並且其中的N值是動態的,可以根據參數來動態確定。

MapReduce程序之TopN問題(排行榜問題)